From 35058cdfe0134c88f1aa8d23342d1d7b9d39e296 Mon Sep 17 00:00:00 2001 From: Alberto Compagno Date: Tue, 7 Jan 2020 11:46:02 +0100 Subject: [HICN-2] Added P2P confidential communication on hICN P2P confidential communications exploit the TLS 1.3 protocol to let a consumer to establish a secure communication on an hICN name. Currently we don't support the consumer authentication (mutual authentication in TLS) and the 0-rtt session establishment. Change-Id: I2be073847c08a17f28c837d444081920c5e57a07 Signed-off-by: Alberto Compagno Signed-off-by: Olivier Roques Signed-off-by: Mauro Sardara --- .../hicn/transport/interfaces/socket_producer.cc | 148 +++++++++++++-------- 1 file changed, 90 insertions(+), 58 deletions(-) (limited to 'libtransport/src/hicn/transport/interfaces/socket_producer.cc') diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 4fef5d1e2..26a7208b6 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -14,7 +14,7 @@ */ #include -#include +#include #include @@ -35,6 +35,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), output_buffer_(default_values::producer_socket_output_buffer_size), + async_thread_(), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), hash_algorithm_(HashAlgorithm::SHA_256), @@ -96,51 +97,47 @@ void ProducerSocket::listen() { void ProducerSocket::passContentObjectToCallbacks( const std::shared_ptr &content_object) { if (content_object) { - if (on_new_segment_) { - io_service_.dispatch([this, content_object]() { + io_service_.dispatch([this, content_object]() { + if (on_new_segment_) { on_new_segment_(*this, *content_object); - }); - } + } - if (on_content_object_to_sign_) { - io_service_.dispatch([this, content_object]() { + if (on_content_object_to_sign_) { on_content_object_to_sign_(*this, *content_object); - }); - } + } - if (on_content_object_in_output_buffer_) { - io_service_.dispatch([this, content_object]() { + if (on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_(*this, *content_object); - }); - } + } + }); output_buffer_.insert(content_object); - if (on_content_object_output_) { - io_service_.dispatch([this, content_object]() { + io_service_.dispatch([this, content_object]() { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); - }); - } + } + }); portal_->sendContentObject(*content_object); } } void ProducerSocket::produce(ContentObject &content_object) { - if (on_content_object_in_output_buffer_) { - io_service_.dispatch([this, &content_object]() { + io_service_.dispatch([this, &content_object]() { + if (on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_(*this, content_object); - }); - } + } + }); output_buffer_.insert(std::static_pointer_cast( content_object.shared_from_this())); - if (on_content_object_output_) { - io_service_.dispatch([this, &content_object]() { + io_service_.dispatch([this, &content_object]() { + if (on_content_object_output_) { on_content_object_output_(*this, content_object); - }); - } + } + }); portal_->sendContentObject(content_object); } @@ -160,8 +157,8 @@ uint32_t ProducerSocket::produce(Name content_name, bool making_manifest = making_manifest_; auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( suffix_strategy_, start_offset); - std::shared_ptr identity; - getSocketOption(GeneralTransportOptions::IDENTITY, identity); + std::shared_ptr signer; + getSocketOption(GeneralTransportOptions::SIGNER, signer); auto buffer_size = buffer->length(); int bytes_segmented = 0; @@ -176,7 +173,7 @@ uint32_t ProducerSocket::produce(Name content_name, bool is_last_manifest = false; // TODO Manifest may still be used for indexing - if (making_manifest && !identity) { + if (making_manifest && !signer) { TRANSPORT_LOGD("Making manifests without setting producer identity."); } @@ -197,11 +194,11 @@ uint32_t ProducerSocket::produce(Name content_name, format = hf_format; if (making_manifest) { manifest_header_size = core::Packet::getHeaderSizeFromFormat( - identity ? hf_format_ah : hf_format, - identity ? identity->getSignatureLength() : 0); - } else if (identity) { + signer ? hf_format_ah : hf_format, + signer ? signer->getSignatureLength() : 0); + } else if (signer) { format = hf_format_ah; - signature_length = identity->getSignatureLength(); + signature_length = signer->getSignatureLength(); } header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); @@ -227,7 +224,7 @@ uint32_t ProducerSocket::produce(Name content_name, content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, content_name, suffix_strategy_, - identity ? identity->getSignatureLength() : 0)); + signer ? signer->getSignatureLength() : 0)); manifest->setLifetime(content_object_expiry_time); if (is_last) { @@ -237,6 +234,7 @@ uint32_t ProducerSocket::produce(Name content_name, } } + TRANSPORT_LOGD("--------- START PRODUCE ----------"); for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { if (making_manifest) { @@ -246,15 +244,18 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->encode(); // If identity set, sign manifest - if (identity) { - identity->getSigner().sign(*manifest); + if (signer) { + signer->sign(*manifest); } passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %u", + content_queue_.front()->getName().getSuffix()); content_queue_.pop(); } @@ -266,7 +267,7 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, content_name, suffix_strategy_, - identity ? identity->getSignatureLength() : 0)); + signer ? signer->getSignatureLength() : 0)); manifest->setLifetime(content_object_expiry_time); manifest->setFinalBlockNumber( @@ -307,10 +308,11 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->addSuffixHash(content_suffix, hash); content_queue_.push(content_object); } else { - if (identity) { - identity->getSigner().sign(*content_object); + if (signer) { + signer->sign(*content_object); } passContentObjectToCallbacks(content_object); + TRANSPORT_LOGD("Send content %u", content_object->getName().getSuffix()); } } @@ -320,24 +322,28 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - if (identity) { - identity->getSigner().sign(*manifest); + if (signer) { + signer->sign(*manifest); } passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %u", + content_queue_.front()->getName().getSuffix()); content_queue_.pop(); } } - if (on_content_produced_) { - io_service_.dispatch([this, buffer_size]() { + io_service_.dispatch([this, buffer_size]() { + if (on_content_produced_) { on_content_produced_(*this, std::make_error_code(std::errc(0)), buffer_size); - }); - } + } + }); + TRANSPORT_LOGD("--------- END PRODUCE ------------"); return suffix_strategy->getTotalCount(); } @@ -346,16 +352,42 @@ void ProducerSocket::asyncProduce(ContentObject &content_object) { auto co_ptr = std::static_pointer_cast( content_object.shared_from_this()); async_thread_.add([this, content_object = std::move(co_ptr)]() { - produce(*content_object); + ProducerSocket::produce(*content_object); }); } } void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, - size_t buffer_size) { + size_t buffer_size, bool is_last, + uint32_t *start_offset) { if (!async_thread_.stopped()) { - async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() { - produce(suffix, buffer, size, 0, false); + async_thread_.add([this, suffix, buffer = buf, size = buffer_size, is_last, + start_offset]() { + if (start_offset != NULL) { + *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last, + *start_offset); + } else { + ProducerSocket::produce(suffix, buffer, size, is_last, 0); + } + }); + } +} + +void ProducerSocket::asyncProduce(Name content_name, + std::unique_ptr &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment) { + if (!async_thread_.stopped()) { + auto a = buffer.release(); + async_thread_.add([this, content_name, a, is_last, offset, last_segment]() { + auto buf = std::unique_ptr(a); + if (last_segment != NULL) { + **last_segment = + offset + ProducerSocket::produce(content_name, std::move(buf), + is_last, offset); + } else { + ProducerSocket::produce(content_name, std::move(buf), is_last, offset); + } }); } } @@ -392,8 +424,8 @@ int ProducerSocket::setSocketOption(int socket_option_key, if (socket_option_value < default_values::max_content_object_size && socket_option_value > 0) { data_packet_size_ = socket_option_value; - break; } + break; case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: output_buffer_.setLimit(socket_option_value); @@ -632,12 +664,12 @@ int ProducerSocket::setSocketOption(int socket_option_key, int ProducerSocket::setSocketOption( int socket_option_key, - const std::shared_ptr &socket_option_value) { + const std::shared_ptr &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: { - utils::SpinLock::Acquire locked(identity_lock_); - identity_.reset(); - identity_ = socket_option_value; + case GeneralTransportOptions::SIGNER: { + utils::SpinLock::Acquire locked(signer_lock_); + signer_.reset(); + signer_ = socket_option_value; } break; default: return SOCKET_OPTION_NOT_SET; @@ -844,11 +876,11 @@ int ProducerSocket::getSocketOption(int socket_option_key, int ProducerSocket::getSocketOption( int socket_option_key, - std::shared_ptr &socket_option_value) { + std::shared_ptr &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: { - utils::SpinLock::Acquire locked(identity_lock_); - socket_option_value = identity_; + case GeneralTransportOptions::SIGNER: { + utils::SpinLock::Acquire locked(signer_lock_); + socket_option_value = signer_; } break; default: return SOCKET_OPTION_NOT_GET; -- cgit 1.2.3-korg