diff options
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r-- | libtransport/src/implementation/CMakeLists.txt | 10 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_consumer.cc | 153 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_consumer.h | 20 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_producer.cc | 309 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_producer.h | 61 | ||||
-rw-r--r-- | libtransport/src/implementation/socket.cc | 26 | ||||
-rw-r--r-- | libtransport/src/implementation/socket.h | 52 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 209 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 642 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_rtc_socket_producer.cc | 33 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_rtc_socket_producer.h | 7 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_consumer.cc | 62 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_consumer.h | 26 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_producer.cc | 323 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_producer.h | 59 |
15 files changed, 672 insertions, 1320 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt index 5423a7697..392c99e15 100644 --- a/libtransport/src/implementation/CMakeLists.txt +++ b/libtransport/src/implementation/CMakeLists.txt @@ -13,13 +13,8 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) -list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc -) - list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h ) @@ -27,15 +22,16 @@ list(APPEND HEADER_FILES if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a") list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc + # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/socket.cc ) list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h - ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h + # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc index 40ab58161..0b5966e71 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -15,7 +15,6 @@ #include <implementation/p2psecure_socket_consumer.h> #include <interfaces/tls_socket_consumer.h> - #include <openssl/bio.h> #include <openssl/ssl.h> #include <openssl/tls1.h> @@ -33,10 +32,6 @@ void P2PSecureConsumerSocket::setInterestPayload( if (payload_ != NULL) int2.appendPayload(std::move(payload_)); } -// implement void readBufferAvailable(), size_t maxBufferSize() const override, -// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() -// must be implemented even if empty. - /* Return the number of read bytes in the return param */ int readOld(BIO *b, char *buf, int size) { if (size < 0) return size; @@ -51,11 +46,13 @@ int readOld(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -79,7 +76,7 @@ int readOld(BIO *b, char *buf, int size) { } } - return read; + return (int)read; } /* Return the number of read bytes in readbytes */ @@ -106,6 +103,7 @@ int writeOld(BIO *b, const char *buf, int num) { socket = (P2PSecureConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -173,10 +171,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, P2PSecureConsumerSocket::P2PSecureConsumerSocket( interface::ConsumerSocket *consumer, int handshake_protocol, int transport_protocol) - : ConsumerSocket(consumer, transport_protocol), + : ConsumerSocket(consumer, handshake_protocol), name_(), - tls_consumer_(), - buf_pool_(), + tls_consumer_(nullptr), decrypted_content_(), payload_(), head_(), @@ -224,12 +221,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket( BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -244,76 +235,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { SSL_shutdown(ssl_); } -int P2PSecureConsumerSocket::consume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } +int P2PSecureConsumerSocket::handshake() { + int result = 1; - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); + if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + return 1; } - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_); - tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer)); - ConsumerTimerCallback *stats_summary_callback = nullptr; - this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_summary_callback); + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - uint32_t lifetime; - this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); - tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - read_callback_decrypted_); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - *stats_summary_callback); - tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, - this->timer_interval_milliseconds_); - tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - tls_consumer.connect(); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - if (payload_ != NULL) - return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); - else - return tls_consumer.consume((prefix->mapName(name))); -} + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); -int P2PSecureConsumerSocket::asyncConsume(const Name &name) { - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); - TRANSPORT_LOGD("Handshake performed!"); - } + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + result = SSL_connect(this->ssl_); - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + return result; +} +void P2PSecureConsumerSocket::initSessionSocket() { tls_consumer_ = std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); tls_consumer_->setInterface( @@ -325,6 +269,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { uint32_t lifetime; this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, @@ -336,6 +281,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); tls_consumer_->connect(); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + + if (payload_ != nullptr) + return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer_->consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); if (payload_ != NULL) return tls_consumer_->asyncConsume((prefix->mapName(name)), @@ -399,5 +397,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h index e2ebaf94e..a35a50352 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.h +++ b/libtransport/src/implementation/p2psecure_socket_consumer.h @@ -16,7 +16,6 @@ #pragma once #include <hicn/transport/interfaces/socket_consumer.h> - #include <implementation/tls_socket_consumer.h> #include <openssl/bio.h> #include <openssl/ssl.h> @@ -69,39 +68,25 @@ class P2PSecureConsumerSocket : public ConsumerSocket, private: Name name_; std::shared_ptr<TLSConsumerSocket> tls_consumer_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; BIO_METHOD *bio_meth_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ - utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - ip_prefix_t secure_prefix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; /* Condition variable for the wait */ @@ -138,9 +123,12 @@ class P2PSecureConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; - int download_content(const Name &name); + int handshake(); + + void initSessionSocket(); }; } // namespace implementation diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc index d0852539a..aa14f9e37 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.cc +++ b/libtransport/src/implementation/p2psecure_socket_producer.cc @@ -14,13 +14,11 @@ */ #include <hicn/transport/core/interest.h> - #include <implementation/p2psecure_socket_producer.h> -#include <implementation/tls_rtc_socket_producer.h> +// #include <implementation/tls_rtc_socket_producer.h> #include <implementation/tls_socket_producer.h> #include <interfaces/tls_rtc_socket_producer.h> #include <interfaces/tls_socket_producer.h> - #include <openssl/bio.h> #include <openssl/rand.h> #include <openssl/ssl.h> @@ -34,33 +32,31 @@ namespace implementation { P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket) - : ProducerSocket(producer_socket), + : ProducerSocket(producer_socket, + ProductionProtocolAlgorithms::BYTE_STREAM), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() {} + map_producers(), + list_producers() {} P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<utils::Identity> &identity) - : ProducerSocket(producer_socket), + const std::shared_ptr<auth::Identity> &identity) + : ProducerSocket(producer_socket, + ProductionProtocolAlgorithms::BYTE_STREAM), rtc_(rtc), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() { - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + map_producers(), + list_producers() { + /* Setup SSL context (identity and parameter to use TLS 1.3) */ der_cert_ = parcKeyStore_GetDEREncodedCertificate( - (identity->getSigner()->getKeyStore())); + (identity->getSigner()->getParcKeyStore())); der_prk_ = parcKeyStore_GetDEREncodedPrivateKey( - (identity->getSigner()->getKeyStore())); + (identity->getSigner()->getParcKeyStore())); - int cert_size = parcBuffer_Limit(der_cert_); - int prk_size = parcBuffer_Limit(der_prk_); + int cert_size = (int)parcBuffer_Limit(der_cert_); + int prk_size = (int)parcBuffer_Limit(der_prk_); const uint8_t *cert = reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_cert_, cert_size)); const uint8_t *prk = @@ -68,10 +64,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( cert_509_ = d2i_X509(NULL, &cert, cert_size); pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ ProducerSocket::setSocketOption( ProducerCallbacksOptions::INTEREST_INPUT, (ProducerInterestCallback)std::bind( @@ -84,58 +78,76 @@ P2PSecureProducerSocket::~P2PSecureProducerSocket() { if (der_prk_) parcBuffer_Release(&der_prk_); } +void P2PSecureProducerSocket::initSessionSocket( + std::unique_ptr<TLSProducerSocket> &producer) { + producer->on_content_produced_application_ = + this->on_content_produced_application_; + producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + producer->setSocketOption(SIGNER, this->signer_); + producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + uint32_t output_buffer_size = 0; + this->getSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, + output_buffer_size); + producer->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, + output_buffer_size); + + if (!rtc_) { + producer->setInterface(new interface::TLSProducerSocket(producer.get())); + } else { + // TODO + // TLSRTCProducerSocket *rtc_producer = + // dynamic_cast<TLSRTCProducerSocket *>(producer.get()); + // rtc_producer->setInterface( + // new interface::TLSRTCProducerSocket(rtc_producer)); + } +} + void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, Interest &interest) { std::unique_lock<std::mutex> lck(mtx_); + std::unique_ptr<TLSProducerSocket> tls_producer; + auto it = map_producers.find(interest.getName()); + + if (it != map_producers.end()) { + return; + } + + if (!rtc_) { + tls_producer = + std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName()); + } else { + // TODO + // tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this, + // interest.getName()); + } + + initSessionSocket(tls_producer); + TLSProducerSocket *tls_producer_ptr = tls_producer.get(); + map_producers.insert({interest.getName(), move(tls_producer)}); TRANSPORT_LOGD("Start handshake at %s", interest.getName().toString().c_str()); + if (!rtc_) { - auto it = map_secure_producers.find(interest.getName()); - if (it != map_secure_producers.end()) return; - TLSProducerSocket *tls_producer = - new TLSProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer)); - - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_producers.insert( - {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + tls_producer_ptr->onInterest(*tls_producer_ptr, interest); + tls_producer_ptr->async_accept(); } else { - auto it = map_secure_rtc_producers.find(interest.getName()); - if (it != map_secure_rtc_producers.end()) return; - TLSRTCProducerSocket *tls_producer = - new TLSRTCProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface( - new interface::TLSRTCProducerSocket(tls_producer)); - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_rtc_producers.insert( - {interest.getName(), - std::unique_ptr<TLSRTCProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + // TODO + // TLSRTCProducerSocket *rtc_producer_ptr = + // dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr); + // rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest); + // rtc_producer_ptr->async_accept(); } } -void P2PSecureProducerSocket::produce(const uint8_t *buffer, - size_t buffer_size) { +uint32_t P2PSecureProducerSocket::produceDatagram( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { + // TODO + throw errors::NotImplementedException(); + if (!rtc_) { throw errors::RuntimeException( "RTC must be the transport protocol to start the production of current " @@ -143,17 +155,23 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer, } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_rtc_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_rtc_producers.cbegin(); - it != list_secure_rtc_producers.cend(); it++) { - (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); - } + if (list_producers.empty()) cv_.wait(lck); + + // TODO + // for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) + // { + // TLSRTCProducerSocket *rtc_producer = + // dynamic_cast<TLSRTCProducerSocket *>(it->get()); + // rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + // } + + return 0; } -uint32_t P2PSecureProducerSocket::produce( - Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, - uint32_t start_offset) { +uint32_t P2PSecureProducerSocket::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { if (rtc_) { throw errors::RuntimeException( "RTC transport protocol is not compatible with the production of " @@ -162,19 +180,21 @@ uint32_t P2PSecureProducerSocket::produce( std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) - segments += - (*it)->produce(content_name, buffer->clone(), is_last, start_offset); + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) + segments += (*it)->produceStream(content_name, buffer->clone(), is_last, + start_offset); + return segments; } -uint32_t P2PSecureProducerSocket::produce(Name content_name, - const uint8_t *buffer, - size_t buffer_size, bool is_last, - uint32_t start_offset) { +uint32_t P2PSecureProducerSocket::produceStream(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size, + bool is_last, + uint32_t start_offset) { if (rtc_) { throw errors::RuntimeException( "RTC transport protocol is not compatible with the production of " @@ -183,33 +203,34 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name, std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) + segments += (*it)->produceStream(content_name, buffer, buffer_size, is_last, + start_offset); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) - segments += (*it)->produce(content_name, buffer, buffer_size, is_last, - start_offset); return segments; } -void P2PSecureProducerSocket::asyncProduce(const Name &content_name, - const uint8_t *buf, - size_t buffer_size, bool is_last, - uint32_t *start_offset) { - if (rtc_) { - throw errors::RuntimeException( - "RTC transport protocol is not compatible with the production of " - "current data. Aborting."); - } - - std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); - - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { - (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset); - } -} +// void P2PSecureProducerSocket::asyncProduce(const Name &content_name, +// const uint8_t *buf, +// size_t buffer_size, bool is_last, +// uint32_t *start_offset) { +// if (rtc_) { +// throw errors::RuntimeException( +// "RTC transport protocol is not compatible with the production of " +// "current data. Aborting."); +// } + +// std::unique_lock<std::mutex> lck(mtx_); +// if (list_producers.empty()) cv_.wait(lck); + +// for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) +// { +// (*it)->asyncProduce(content_name, buf, buffer_size, is_last, +// start_offset); +// } +// } void P2PSecureProducerSocket::asyncProduce( Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, @@ -221,22 +242,19 @@ void P2PSecureProducerSocket::asyncProduce( } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, last_segment); } } -// Socket Option Redefinition to avoid name hiding - +/* Redefinition of socket options to avoid name hiding */ int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerInterestCallback socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } @@ -268,10 +286,9 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Signer> &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + const std::shared_ptr<auth::Signer> &socket_option_value) { + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -288,9 +305,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption(int socket_option_key, uint32_t socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } switch (socket_option_key) { @@ -305,9 +321,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, bool socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -316,20 +331,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, Name *socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, std::list<Prefix> socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -338,9 +341,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentObjectCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -349,9 +351,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -367,21 +368,9 @@ int P2PSecureProducerSocket::setSocketOption( } int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, utils::CryptoHashType socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, utils::CryptoSuite socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + int socket_option_key, auth::CryptoHashType socket_option_value) { + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -390,9 +379,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, const std::string &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -400,5 +388,4 @@ int P2PSecureProducerSocket::setSocketOption( } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h index 33339deba..b7c3d1958 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.h +++ b/libtransport/src/implementation/p2psecure_socket_producer.h @@ -15,15 +15,14 @@ #pragma once -#include <hicn/transport/security/identity.h> -#include <hicn/transport/security/signer.h> - +#include <hicn/transport/auth/identity.h> +#include <hicn/transport/auth/signer.h> #include <implementation/socket_producer.h> -#include <implementation/tls_rtc_socket_producer.h> +// #include <implementation/tls_rtc_socket_producer.h> #include <implementation/tls_socket_producer.h> +#include <openssl/ssl.h> #include <utils/content_store.h> -#include <openssl/ssl.h> #include <condition_variable> #include <forward_list> #include <mutex> @@ -33,37 +32,40 @@ namespace implementation { class P2PSecureProducerSocket : public ProducerSocket { friend class TLSProducerSocket; - friend class TLSRTCProducerSocket; + // TODO + // friend class TLSRTCProducerSocket; public: explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); + explicit P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<utils::Identity> &identity); + const std::shared_ptr<auth::Identity> &identity); + ~P2PSecureProducerSocket(); - void produce(const uint8_t *buffer, size_t buffer_size) override; + uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) override; - uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, - bool is_last = true, uint32_t start_offset = 0) override; + uint32_t produceStream(const Name &content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) override; - uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, uint32_t start_offset = 0) override; + uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) override; void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, uint32_t offset, uint32_t **last_segment = nullptr) override; - void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, - bool is_last = true, - uint32_t *start_offset = nullptr) override; - int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) override; int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Signer> &socket_option_value) override; + const std::shared_ptr<auth::Signer> &socket_option_value) override; int setSocketOption(int socket_option_key, uint32_t socket_option_value) override; @@ -73,9 +75,6 @@ class P2PSecureProducerSocket : public ProducerSocket { int setSocketOption(int socket_option_key, Name *socket_option_value) override; - int setSocketOption(int socket_option_key, - std::list<Prefix> socket_option_value) override; - int setSocketOption( int socket_option_key, ProducerContentObjectCallback socket_option_value) override; @@ -84,19 +83,15 @@ class P2PSecureProducerSocket : public ProducerSocket { ProducerContentCallback socket_option_value) override; int setSocketOption(int socket_option_key, - utils::CryptoHashType socket_option_value) override; - - int setSocketOption(int socket_option_key, - utils::CryptoSuite socket_option_value) override; + auth::CryptoHashType socket_option_value) override; int setSocketOption(int socket_option_key, const std::string &socket_option_value) override; using ProducerSocket::getSocketOption; - using ProducerSocket::onInterest; + // using ProducerSocket::onInterest; protected: - bool rtc_; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; @@ -104,27 +99,23 @@ class P2PSecureProducerSocket : public ProducerSocket { ProducerContentCallback on_content_produced_application_; private: + bool rtc_; std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - PARCBuffer *der_cert_; PARCBuffer *der_prk_; X509 *cert_509_; EVP_PKEY *pkey_rsa_; std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>, core::hash<core::Name>, core::compare2<core::Name>> - map_secure_producers; - std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>, - core::hash<core::Name>, core::compare2<core::Name>> - map_secure_rtc_producers; - std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers; - std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers; + map_producers; + std::list<std::unique_ptr<TLSProducerSocket>> list_producers; void onInterestCallback(interface::ProducerSocket &p, Interest &interest); + + void initSessionSocket(std::unique_ptr<TLSProducerSocket> &producer); }; } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/socket.cc b/libtransport/src/implementation/socket.cc new file mode 100644 index 000000000..2e21f2bc3 --- /dev/null +++ b/libtransport/src/implementation/socket.cc @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/global_configuration.h> +#include <implementation/socket.h> + +namespace transport { +namespace implementation { + +Socket::Socket(std::shared_ptr<core::Portal> &&portal) + : portal_(std::move(portal)), is_async_(false) {} + +} // namespace implementation +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h index 2e51f3027..cf22c03e1 100644 --- a/libtransport/src/implementation/socket.h +++ b/libtransport/src/implementation/socket.h @@ -15,13 +15,12 @@ #pragma once +#include <core/facade.h> #include <hicn/transport/config.h> #include <hicn/transport/interfaces/callbacks.h> #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/socket_options_keys.h> -#include <core/facade.h> - #define SOCKET_OPTION_GET 0 #define SOCKET_OPTION_NOT_GET 1 #define SOCKET_OPTION_SET 2 @@ -32,56 +31,23 @@ namespace transport { namespace implementation { // Forward Declarations -template <typename PortalType> class Socket; -// Define the portal and its connector, depending on the compilation options -// passed by the build tool. -using HicnForwarderPortal = core::HicnForwarderPortal; - -#ifdef __linux__ -#ifndef __ANDROID__ -using RawSocketPortal = core::RawSocketPortal; -#endif -#endif - -#ifdef __vpp__ -using VPPForwarderPortal = core::VPPForwarderPortal; -using BaseSocket = Socket<VPPForwarderPortal>; -using BasePortal = VPPForwarderPortal; -#else -using BaseSocket = Socket<HicnForwarderPortal>; -using BasePortal = HicnForwarderPortal; -#endif - -template <typename PortalType> class Socket { - static_assert(std::is_same<PortalType, HicnForwarderPortal>::value -#ifdef __linux__ -#ifndef __ANDROID__ - || std::is_same<PortalType, RawSocketPortal>::value -#ifdef __vpp__ - || std::is_same<PortalType, VPPForwarderPortal>::value -#endif -#endif - , -#else - , - -#endif - "This class is not allowed as Portal"); - public: - using Portal = PortalType; - - virtual asio::io_service &getIoService() = 0; - virtual void connect() = 0; - virtual bool isRunning() = 0; + virtual asio::io_service &getIoService() { return portal_->getIoService(); } + protected: + Socket(std::shared_ptr<core::Portal> &&portal); + virtual ~Socket(){}; + + protected: + std::shared_ptr<core::Portal> portal_; + bool is_async_; }; } // namespace implementation diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 488f238ba..a7b6ac4e7 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -13,16 +13,17 @@ * limitations under the License. */ +#pragma once + #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> -#include <hicn/transport/security/verifier.h> - +#include <hicn/transport/auth/verifier.h> +#include <hicn/transport/utils/event_thread.h> #include <protocols/cbr.h> -#include <protocols/protocol.h> #include <protocols/raaqm.h> -#include <protocols/rtc.h> -#include <utils/event_thread.h> +#include <protocols/rtc/rtc.h> +#include <protocols/transport_protocol.h> namespace transport { namespace implementation { @@ -31,11 +32,12 @@ using namespace core; using namespace interface; using ReadCallback = interface::ConsumerSocket::ReadCallback; -class ConsumerSocket : public Socket<BasePortal> { - public: - ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) - : consumer_interface_(consumer), - portal_(std::make_shared<Portal>()), +class ConsumerSocket : public Socket { + private: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + std::shared_ptr<core::Portal> &&portal) + : Socket(std::move(portal)), + consumer_interface_(consumer), async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), @@ -54,14 +56,13 @@ class ConsumerSocket : public Socket<BasePortal> { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), - verifier_(std::make_shared<utils::Verifier>()), + verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), - key_content_(false), + reset_window_(false), on_interest_output_(VOID_HANDLER), on_interest_timeout_(VOID_HANDLER), on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), - on_content_object_verification_(VOID_HANDLER), stats_summary_(VOID_HANDLER), read_callback_(nullptr), timer_interval_milliseconds_(0), @@ -73,7 +74,7 @@ class ConsumerSocket : public Socket<BasePortal> { break; case TransportProtocolAlgorithms::RTC: transport_protocol_ = - std::make_unique<protocol::RTCTransportProtocol>(this); + std::make_unique<protocol::rtc::RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: @@ -83,6 +84,17 @@ class ConsumerSocket : public Socket<BasePortal> { } } + public: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {} + + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + asio::io_service &io_service) + : ConsumerSocket(consumer, protocol, + std::make_shared<core::Portal>(io_service)) { + is_async_ = true; + } + ~ConsumerSocket() { stop(); async_downloader_.stop(); @@ -110,7 +122,7 @@ class ConsumerSocket : public Socket<BasePortal> { transport_protocol_->start(); - return CONSUMER_FINISHED; + return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; } virtual int asyncConsume(const Name &name) { @@ -125,8 +137,6 @@ class ConsumerSocket : public Socket<BasePortal> { return CONSUMER_RUNNING; } - bool verifyKeyPackets() { return transport_protocol_->verifyKeyPackets(); } - void stop() { if (transport_protocol_->isRunning()) { transport_protocol_->stop(); @@ -139,8 +149,6 @@ class ConsumerSocket : public Socket<BasePortal> { } } - asio::io_service &getIoService() { return portal_->getIoService(); } - virtual int setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -303,12 +311,6 @@ class ConsumerSocket : public Socket<BasePortal> { break; } - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_verification_ = VOID_HANDLER; - break; - } - default: return SOCKET_OPTION_NOT_SET; } @@ -321,13 +323,8 @@ class ConsumerSocket : public Socket<BasePortal> { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { - case GeneralTransportOptions::VERIFY_SIGNATURE: - verify_signature_ = socket_option_value; - result = SOCKET_OPTION_SET; - break; - - case GeneralTransportOptions::KEY_CONTENT: - key_content_ = socket_option_value; + case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET: + reset_window_ = socket_option_value; result = SOCKET_OPTION_SET; break; @@ -359,29 +356,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int setSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - on_content_object_verification_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); - } - int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -415,51 +389,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int setSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this]( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::VERIFICATION_FAILED: - verification_failed_callback_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); - } - - // int setSocketOption( - // int socket_option_key, - // ConsumerContentObjectVerificationFailedCallback socket_option_value) { - // return rescheduleOnIOService( - // socket_option_key, socket_option_value, - // [this]( - // int socket_option_key, - // ConsumerContentObjectVerificationFailedCallback - // socket_option_value) - // -> int { - // switch (socket_option_key) { - // case ConsumerCallbacksOptions::VERIFICATION_FAILED: - // verification_failed_callback_ = socket_option_value; - // break; - - // default: - // return SOCKET_OPTION_NOT_SET; - // } - - // return SOCKET_OPTION_SET; - // }); - // } - int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -476,7 +405,7 @@ class ConsumerSocket : public Socket<BasePortal> { int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Verifier> &socket_option_value) { + const std::shared_ptr<auth::Verifier> &socket_option_value) { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { @@ -498,14 +427,6 @@ class ConsumerSocket : public Socket<BasePortal> { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { - case GeneralTransportOptions::CERTIFICATE: - key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - result = SOCKET_OPTION_SET; - } - break; - case DataLinkOptions::OUTPUT_INTERFACE: output_interface_ = socket_option_value; portal_->setOutputInterface(output_interface_); @@ -624,12 +545,12 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = transport_protocol_->isRunning(); break; - case GeneralTransportOptions::VERIFY_SIGNATURE: - socket_option_value = verify_signature_; + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; break; - case GeneralTransportOptions::KEY_CONTENT: - socket_option_value = key_content_; + case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET: + socket_option_value = reset_window_; break; default: @@ -673,29 +594,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int getSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - *socket_option_value = &on_content_object_verification_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); - } - int getSocketOption(int socket_option_key, ConsumerInterestCallback **socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -729,30 +627,8 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int getSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback **socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationFailedCallback * - *socket_option_value) -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::VERIFICATION_FAILED: - *socket_option_value = &verification_failed_callback_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); - } - int getSocketOption(int socket_option_key, - std::shared_ptr<Portal> &socket_option_value) { + std::shared_ptr<core::Portal> &socket_option_value) { switch (socket_option_key) { case PORTAL: socket_option_value = portal_; @@ -781,7 +657,7 @@ class ConsumerSocket : public Socket<BasePortal> { } int getSocketOption(int socket_option_key, - std::shared_ptr<utils::Verifier> &socket_option_value) { + std::shared_ptr<auth::Verifier> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: socket_option_value = verifier_; @@ -845,7 +721,7 @@ class ConsumerSocket : public Socket<BasePortal> { // To enforce type check std::function<int(int, arg2)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (transport_protocol_->isRunning()) { + if (transport_protocol_ && transport_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; @@ -872,7 +748,6 @@ class ConsumerSocket : public Socket<BasePortal> { protected: interface::ConsumerSocket *consumer_interface_; - std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; // No need to protect from multiple accesses in the async consumer @@ -900,22 +775,18 @@ class ConsumerSocket : public Socket<BasePortal> { int rate_estimation_batching_parameter_; int rate_estimation_choice_; - bool is_async_; - // Verification parameters - std::shared_ptr<utils::Verifier> verifier_; + std::shared_ptr<auth::Verifier> verifier_; PARCKeyId *key_id_; std::atomic_bool verify_signature_; - bool key_content_; + bool reset_window_; ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; ConsumerInterestCallback on_interest_timeout_; ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; - ConsumerContentObjectVerificationCallback on_content_object_verification_; ConsumerTimerCallback stats_summary_; - ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; ReadCallback *read_callback_; @@ -932,4 +803,4 @@ class ConsumerSocket : public Socket<BasePortal> { }; } // namespace implementation -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 9ccc59d9e..f511f7743 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -15,12 +15,12 @@ #pragma once -#include <hicn/transport/security/signer.h> - +#include <hicn/transport/auth/signer.h> +#include <hicn/transport/utils/event_thread.h> #include <implementation/socket.h> - +#include <protocols/prod_protocol_bytestream.h> +#include <protocols/prod_protocol_rtc.h> #include <utils/content_store.h> -#include <utils/event_thread.h> #include <utils/suffix_strategy.h> #include <atomic> @@ -41,21 +41,17 @@ namespace implementation { using namespace core; using namespace interface; -class ProducerSocket : public Socket<BasePortal>, - public BasePortal::ProducerCallback { - static constexpr uint32_t burst_size = 256; - - public: - explicit ProducerSocket(interface::ProducerSocket *producer_socket) - : producer_interface_(producer_socket), - portal_(std::make_shared<Portal>(io_service_)), +class ProducerSocket : public Socket { + private: + ProducerSocket(interface::ProducerSocket *producer_socket, int protocol, + std::shared_ptr<core::Portal> &&portal) + : Socket(std::move(portal)), + producer_interface_(producer_socket), data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), - output_buffer_(default_values::producer_socket_output_buffer_size), async_thread_(), - registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), - hash_algorithm_(utils::CryptoHashType::SHA_256), + hash_algorithm_(auth::CryptoHashType::SHA_256), suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), @@ -67,15 +63,33 @@ class ProducerSocket : public Socket<BasePortal>, on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) {} - - virtual ~ProducerSocket() { - stop(); - if (listening_thread_.joinable()) { - listening_thread_.join(); + on_content_produced_(VOID_HANDLER) { + switch (protocol) { + case ProductionProtocolAlgorithms::RTC_PROD: + production_protocol_ = + std::make_unique<protocol::RTCProductionProtocol>(this); + break; + case ProductionProtocolAlgorithms::BYTE_STREAM: + default: + production_protocol_ = + std::make_unique<protocol::ByteStreamProductionProtocol>(this); + break; } } + public: + ProducerSocket(interface::ProducerSocket *producer, int protocol) + : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {} + + ProducerSocket(interface::ProducerSocket *producer, int protocol, + asio::io_service &io_service) + : ProducerSocket(producer, protocol, + std::make_shared<core::Portal>(io_service)) { + is_async_ = true; + } + + virtual ~ProducerSocket() {} + interface::ProducerSocket *getInterface() { return producer_interface_; } @@ -86,293 +100,10 @@ class ProducerSocket : public Socket<BasePortal>, void connect() override { portal_->connect(false); - listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this)); - } - - bool isRunning() override { return !io_service_.stopped(); }; - - virtual uint32_t produce(Name content_name, const uint8_t *buffer, - size_t buffer_size, bool is_last = true, - uint32_t start_offset = 0) { - return ProducerSocket::produce( - content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, - start_offset); - } - - virtual uint32_t produce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, uint32_t start_offset = 0) { - if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) { - return 0; - } - - // Copy the atomic variables to ensure they keep the same value - // during the production - std::size_t data_packet_size = data_packet_size_; - uint32_t content_object_expiry_time = content_object_expiry_time_; - utils::CryptoHashType hash_algo = hash_algorithm_; - bool making_manifest = making_manifest_; - auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( - suffix_strategy_, start_offset); - std::shared_ptr<utils::Signer> signer; - getSocketOption(GeneralTransportOptions::SIGNER, signer); - - auto buffer_size = buffer->length(); - int bytes_segmented = 0; - std::size_t header_size; - std::size_t manifest_header_size = 0; - std::size_t signature_length = 0; - std::uint32_t final_block_number = start_offset; - uint64_t free_space_for_content = 0; - - core::Packet::Format format; - std::shared_ptr<ContentObjectManifest> manifest; - bool is_last_manifest = false; - - // TODO Manifest may still be used for indexing - if (making_manifest && !signer) { - TRANSPORT_LOGD("Making manifests without setting producer identity."); - } - - core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; - core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; - if (content_name.getType() == HNT_CONTIGUOUS_V4 || - content_name.getType() == HNT_IOV_V4) { - hf_format = core::Packet::Format::HF_INET_TCP; - hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; - } else if (content_name.getType() == HNT_CONTIGUOUS_V6 || - content_name.getType() == HNT_IOV_V6) { - hf_format = core::Packet::Format::HF_INET6_TCP; - hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; - } else { - throw errors::RuntimeException("Unknown name format."); - } - - format = hf_format; - if (making_manifest) { - manifest_header_size = core::Packet::getHeaderSizeFromFormat( - signer ? hf_format_ah : hf_format, - signer ? signer->getSignatureLength() : 0); - } else if (signer) { - format = hf_format_ah; - signature_length = signer->getSignatureLength(); - } - - header_size = - core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size - header_size; - uint32_t number_of_segments = uint32_t( - std::ceil(double(buffer_size) / double(free_space_for_content))); - if (free_space_for_content * number_of_segments < buffer_size) { - number_of_segments++; - } - - // TODO allocate space for all the headers - if (making_manifest) { - uint32_t segment_in_manifest = static_cast<uint32_t>( - std::floor(double(data_packet_size - manifest_header_size - - ContentObjectManifest::getManifestHeaderSize()) / - ContentObjectManifest::getManifestEntrySize()) - - 1.0); - uint32_t number_of_manifests = static_cast<uint32_t>( - std::ceil(float(number_of_segments) / segment_in_manifest)); - final_block_number += number_of_segments + number_of_manifests - 1; - - manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algo, is_last_manifest, content_name, suffix_strategy_, - signer ? signer->getSignatureLength() : 0)); - manifest->setLifetime(content_object_expiry_time); - - if (is_last) { - manifest->setFinalBlockNumber(final_block_number); - } else { - manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); - } - } - - TRANSPORT_LOGD("--------- START PRODUCE ----------"); - for (unsigned int packaged_segments = 0; - packaged_segments < number_of_segments; packaged_segments++) { - if (making_manifest) { - if (manifest->estimateManifestSize(2) > - data_packet_size - manifest_header_size) { - // Send the current manifest - manifest->encode(); - - // If identity set, sign manifest - if (signer) { - signer->sign(*manifest); - } - - passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); - - // Send content objects stored in the queue - while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); - content_queue_.pop(); - } - - // Create new manifest. The reference to the last manifest has been - // acquired in the passContentObjectToCallbacks function, so we can - // safely release this reference - manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, - content_name, suffix_strategy_, - signer ? signer->getSignatureLength() : 0)); - - manifest->setLifetime(content_object_expiry_time); - manifest->setFinalBlockNumber( - is_last ? final_block_number - : utils::SuffixStrategy::INVALID_SUFFIX); - } - } - - auto content_suffix = suffix_strategy->getNextContentSuffix(); - auto content_object = std::make_shared<ContentObject>( - content_name.setSuffix(content_suffix), format); - content_object->setLifetime(content_object_expiry_time); - - auto b = buffer->cloneOne(); - b->trimStart(free_space_for_content * packaged_segments); - b->trimEnd(b->length()); - - if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { - b->append(buffer_size - bytes_segmented); - bytes_segmented += (int)(buffer_size - bytes_segmented); - - if (is_last && making_manifest) { - is_last_manifest = true; - } else if (is_last) { - content_object->setRst(); - } - - } else { - b->append(free_space_for_content); - bytes_segmented += (int)(free_space_for_content); - } - - content_object->appendPayload(std::move(b)); - - if (making_manifest) { - using namespace std::chrono_literals; - utils::CryptoHash hash = content_object->computeDigest(hash_algo); - manifest->addSuffixHash(content_suffix, hash); - content_queue_.push(content_object); - } else { - if (signer) { - signer->sign(*content_object); - } - passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %u", - content_object->getName().getSuffix()); - } - } - - if (making_manifest) { - if (is_last_manifest) { - manifest->setFinalManifest(is_last_manifest); - } - - manifest->encode(); - if (signer) { - signer->sign(*manifest); - } - - passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); - while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); - content_queue_.pop(); - } - } - - io_service_.post([this]() { - std::shared_ptr<ContentObject> co; - while (object_queue_for_callbacks_.pop(co)) { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *co); - } - - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *co); - } - - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, *co); - } - - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *co); - } - } - }); - - io_service_.dispatch([this, buffer_size]() { - if (on_content_produced_) { - on_content_produced_(*producer_interface_, - std::make_error_code(std::errc(0)), buffer_size); - } - }); - - return suffix_strategy->getTotalCount(); - } - - virtual void produce(ContentObject &content_object) { - io_service_.dispatch([this, &content_object]() { - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, - content_object); - } - }); - - output_buffer_.insert(std::static_pointer_cast<ContentObject>( - content_object.shared_from_this())); - - io_service_.dispatch([this, &content_object]() { - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, content_object); - } - }); - - portal_->sendContentObject(content_object); - } - - virtual void produce(const uint8_t *buffer, size_t buffer_size) { - produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); - } - - virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) { - // This API is meant to be used just with the RTC producer. - // Here it cannot be used since no name for the content is specified. - throw errors::NotImplementedException(); + production_protocol_->start(); } - virtual void asyncProduce(const Name &suffix, const uint8_t *buf, - size_t buffer_size, bool is_last = true, - uint32_t *start_offset = nullptr) { - if (!async_thread_.stopped()) { - async_thread_.add([this, suffix, buffer = buf, size = buffer_size, - is_last, start_offset]() { - if (start_offset != nullptr) { - *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last, - *start_offset); - } else { - ProducerSocket::produce(suffix, buffer, size, is_last, 0); - } - }); - } - } - - void asyncProduce(const Name &suffix); + bool isRunning() override { return !production_protocol_->isRunning(); }; virtual void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, @@ -380,75 +111,56 @@ class ProducerSocket : public Socket<BasePortal>, uint32_t **last_segment = nullptr) { if (!async_thread_.stopped()) { auto a = buffer.release(); - async_thread_.add( - [this, content_name, a, is_last, offset, last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - **last_segment = - offset + ProducerSocket::produce(content_name, std::move(buf), - is_last, offset); - } else { - ProducerSocket::produce(content_name, std::move(buf), is_last, - offset); - } - }); - } - } - - virtual void asyncProduce(ContentObject &content_object) { - if (!async_thread_.stopped()) { - auto co_ptr = std::static_pointer_cast<ContentObject>( - content_object.shared_from_this()); - async_thread_.add([this, content_object = std::move(co_ptr)]() { - ProducerSocket::produce(*content_object); + async_thread_.add([this, content_name, a, is_last, offset, + last_segment]() { + auto buf = std::unique_ptr<utils::MemBuf>(a); + if (last_segment != NULL) { + **last_segment = offset + produceStream(content_name, std::move(buf), + is_last, offset); + } else { + produceStream(content_name, std::move(buf), is_last, offset); + } }); } } - virtual void registerPrefix(const Prefix &producer_namespace) { - served_namespaces_.push_back(producer_namespace); + virtual uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) { + return production_protocol_->produceStream(content_name, std::move(buffer), + is_last, start_offset); } - void serveForever() { - if (listening_thread_.joinable()) { - listening_thread_.join(); - } + virtual uint32_t produceStream(const Name &content_name, + const uint8_t *buffer, size_t buffer_size, + bool is_last = true, + uint32_t start_offset = 0) { + return production_protocol_->produceStream( + content_name, buffer, buffer_size, is_last, start_offset); } - void stop() { portal_->stopEventsLoop(); } - - asio::io_service &getIoService() override { return portal_->getIoService(); }; - - virtual void onInterest(Interest &interest) { - if (on_interest_input_) { - on_interest_input_(*producer_interface_, interest); - } - - const std::shared_ptr<ContentObject> content_object = - output_buffer_.find(interest); - - if (content_object) { - if (on_interest_satisfied_output_buffer_) { - on_interest_satisfied_output_buffer_(*producer_interface_, interest); - } + virtual uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) { + return production_protocol_->produceDatagram(content_name, + std::move(buffer)); + } - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *content_object); - } + virtual uint32_t produceDatagram(const Name &content_name, + const uint8_t *buffer, size_t buffer_size) { + return production_protocol_->produceDatagram(content_name, buffer, + buffer_size); + } - portal_->sendContentObject(*content_object); - } else { - if (on_interest_process_) { - on_interest_process_(*producer_interface_, interest); - } - } + void produce(ContentObject &content_object) { + production_protocol_->produce(content_object); } - virtual void onInterest(Interest::Ptr &&interest) override { - onInterest(*interest); - }; + void registerPrefix(const Prefix &producer_namespace) { + production_protocol_->registerNamespaceWithNetwork(producer_namespace); + } - virtual void onError(std::error_code ec) override {} + void stop() { production_protocol_->stop(); } virtual int setSocketOption(int socket_option_key, uint32_t socket_option_value) { @@ -461,7 +173,7 @@ class ProducerSocket : public Socket<BasePortal>, break; case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_.setLimit(socket_option_value); + production_protocol_->setOutputBufferSize(socket_option_value); break; case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: @@ -532,6 +244,12 @@ class ProducerSocket : public Socket<BasePortal>, break; } + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + default: return SOCKET_OPTION_NOT_SET; } @@ -558,19 +276,6 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_NOT_SET; } - virtual int setSocketOption(int socket_option_key, - std::list<Prefix> socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - served_namespaces_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - virtual int setSocketOption( int socket_option_key, interface::ProducerContentObjectCallback socket_option_value) { @@ -593,6 +298,10 @@ class ProducerSocket : public Socket<BasePortal>, on_content_object_output_ = socket_option_value; break; + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -662,7 +371,7 @@ class ProducerSocket : public Socket<BasePortal>, } virtual int setSocketOption(int socket_option_key, - utils::CryptoHashType socket_option_value) { + auth::CryptoHashType socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::HASH_ALGORITHM: hash_algorithm_ = socket_option_value; @@ -674,11 +383,12 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_SET; } - virtual int setSocketOption(int socket_option_key, - utils::CryptoSuite socket_option_value) { + virtual int setSocketOption( + int socket_option_key, + core::NextSegmentCalculationStrategy socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::CRYPTO_SUITE: - crypto_suite_ = socket_option_value; + case GeneralTransportOptions::SUFFIX_STRATEGY: + suffix_strategy_ = socket_option_value; break; default: return SOCKET_OPTION_NOT_SET; @@ -689,7 +399,7 @@ class ProducerSocket : public Socket<BasePortal>, virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Signer> &socket_option_value) { + const std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SIGNER: { utils::SpinLock::Acquire locked(signer_lock_); @@ -707,7 +417,7 @@ class ProducerSocket : public Socket<BasePortal>, uint32_t &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_.getLimit(); + socket_option_value = (uint32_t)production_protocol_->getOutputBufferSize(); break; case GeneralTransportOptions::DATA_PACKET_SIZE: @@ -732,18 +442,8 @@ class ProducerSocket : public Socket<BasePortal>, socket_option_value = making_manifest_; break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - virtual int getSocketOption(int socket_option_key, - std::list<Prefix> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - socket_option_value = served_namespaces_; + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; break; default: @@ -775,6 +475,10 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_content_object_output_; break; + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -827,11 +531,11 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_interest_inserted_input_buffer_; break; - case CACHE_HIT: + case ProducerCallbacksOptions::CACHE_HIT: *socket_option_value = &on_interest_satisfied_output_buffer_; break; - case CACHE_MISS: + case ProducerCallbacksOptions::CACHE_MISS: *socket_option_value = &on_interest_process_; break; @@ -843,8 +547,9 @@ class ProducerSocket : public Socket<BasePortal>, }); } - virtual int getSocketOption(int socket_option_key, - std::shared_ptr<Portal> &socket_option_value) { + virtual int getSocketOption( + int socket_option_key, + std::shared_ptr<core::Portal> &socket_option_value) { switch (socket_option_key) { case PORTAL: socket_option_value = portal_; @@ -858,7 +563,7 @@ class ProducerSocket : public Socket<BasePortal>, } virtual int getSocketOption(int socket_option_key, - utils::CryptoHashType &socket_option_value) { + auth::CryptoHashType &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::HASH_ALGORITHM: socket_option_value = hash_algorithm_; @@ -870,22 +575,22 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_GET; } - virtual int getSocketOption(int socket_option_key, - utils::CryptoSuite &socket_option_value) { + virtual int getSocketOption( + int socket_option_key, + core::NextSegmentCalculationStrategy &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = crypto_suite_; + case GeneralTransportOptions::SUFFIX_STRATEGY: + socket_option_value = suffix_strategy_; break; default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } virtual int getSocketOption( int socket_option_key, - std::shared_ptr<utils::Signer> &socket_option_value) { + std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SIGNER: { utils::SpinLock::Acquire locked(signer_lock_); @@ -906,19 +611,21 @@ class ProducerSocket : public Socket<BasePortal>, // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it template <typename Lambda, typename arg2> - int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func) { + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func) { // To enforce type check - std::function<int(int, arg2)> func = lambda_func; + std::function<int(int, arg2 &)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != listening_thread_.get_id()) { + if (production_protocol_ && production_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; + bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -938,29 +645,28 @@ class ProducerSocket : public Socket<BasePortal>, // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it template <typename Lambda, typename arg2> - int rescheduleOnIOServiceWithReference(int socket_option_key, - arg2 &socket_option_value, - Lambda lambda_func) { + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { // To enforce type check - std::function<int(int, arg2 &)> func = lambda_func; + std::function<int(int, arg2)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != this->listening_thread_.get_id()) { + if (production_protocol_ && production_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; - std::unique_lock<std::mutex> lck(mtx); bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); - - if (!done) { - cv.wait(lck); - } + cv.notify_all(); }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } } else { result = func(socket_option_key, socket_option_value); } @@ -971,39 +677,20 @@ class ProducerSocket : public Socket<BasePortal>, // Threads protected: interface::ProducerSocket *producer_interface_; - std::thread listening_thread_; asio::io_service io_service_; - std::shared_ptr<Portal> portal_; std::atomic<size_t> data_packet_size_; - std::list<Prefix> - served_namespaces_; // No need to be threadsafe, this is always modified - // by the application thread std::atomic<uint32_t> content_object_expiry_time_; - utils::CircularFifo<std::shared_ptr<ContentObject>, 2048> - object_queue_for_callbacks_; - - // buffers - // ContentStore is thread-safe - utils::ContentStore output_buffer_; - utils::EventThread async_thread_; - int registration_status_; std::atomic<bool> making_manifest_; - - // map for storing sequence numbers for several calls of the publish - // function - std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_; - - std::atomic<utils::CryptoHashType> hash_algorithm_; - std::atomic<utils::CryptoSuite> crypto_suite_; + std::atomic<auth::CryptoHashType> hash_algorithm_; + std::atomic<auth::CryptoSuite> crypto_suite_; utils::SpinLock signer_lock_; - std::shared_ptr<utils::Signer> signer_; + std::shared_ptr<auth::Signer> signer_; core::NextSegmentCalculationStrategy suffix_strategy_; - // While manifests are being built, contents are stored in a queue - std::queue<std::shared_ptr<ContentObject>> content_queue_; + std::unique_ptr<protocol::ProductionProtocol> production_protocol_; // callbacks ProducerInterestCallback on_interest_input_; @@ -1019,63 +706,6 @@ class ProducerSocket : public Socket<BasePortal>, ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; ProducerContentCallback on_content_produced_; - - private: - void listen() { - bool first = true; - - for (core::Prefix &producer_namespace : served_namespaces_) { - if (first) { - core::BindConfig bind_config(producer_namespace, 1000); - portal_->bind(bind_config); - portal_->setProducerCallback(this); - first = !first; - } else { - portal_->registerRoute(producer_namespace); - } - } - - portal_->runEventsLoop(); - } - - void scheduleSendBurst() { - io_service_.post([this]() { - std::shared_ptr<ContentObject> co; - - for (uint32_t i = 0; i < burst_size; i++) { - if (object_queue_for_callbacks_.pop(co)) { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *co); - } - - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *co); - } - - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, *co); - } - - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *co); - } - } else { - break; - } - } - }); - } - - void passContentObjectToCallbacks( - const std::shared_ptr<ContentObject> &content_object) { - output_buffer_.insert(content_object); - portal_->sendContentObject(*content_object); - object_queue_for_callbacks_.push(std::move(content_object)); - - if (object_queue_for_callbacks_.size() >= burst_size) { - scheduleSendBurst(); - } - } }; } // namespace implementation diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc index 3b3152993..9a62c8683 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.cc +++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc @@ -15,10 +15,8 @@ #include <hicn/transport/core/interest.h> #include <hicn/transport/interfaces/p2psecure_socket_producer.h> - #include <implementation/p2psecure_socket_producer.h> #include <implementation/tls_rtc_socket_producer.h> - #include <openssl/bio.h> #include <openssl/rand.h> #include <openssl/ssl.h> @@ -53,7 +51,7 @@ int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) { (socket->cv_).wait(lck); } - utils::MemBuf *membuf = socket->packet_->next(); + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; if ((int)membuf->length() > size) { @@ -91,10 +89,10 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSRTCProducerSocket *socket; socket = (TLSRTCProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { - socket->tls_chunks_--; + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { bool making_manifest = socket->parent_->making_manifest_; + + socket->tls_chunks_--; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); socket->parent_->ProducerSocket::produce( @@ -107,13 +105,17 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { socket->to_call_oncontentproduced_--; auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->RTCProducerSocket::produce(std::move(mbuf)); + ProducerContentCallback on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { on_content_produced_application( @@ -144,24 +146,28 @@ TLSRTCProducerSocket::TLSRTCProducerSocket( } void TLSRTCProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_rtc_producers.push_front( - std::move(parent_->map_secure_rtc_producers[handshake_name_])); - parent_->map_secure_rtc_producers.erase(handshake_name_); + + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); ProducerInterestCallback on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, on_interest_process_decrypted); if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); + Interest inter(std::move(handshake_packet_)); on_interest_process_decrypted( (transport::interface::ProducerSocket &)(*getInterface()), inter); } @@ -181,7 +187,9 @@ int TLSRTCProducerSocket::async_accept() { } void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } @@ -197,5 +205,4 @@ void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h index 685c91244..92c657afc 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.h +++ b/libtransport/src/implementation/tls_rtc_socket_producer.h @@ -15,7 +15,6 @@ #pragma once -#include <implementation/rtc_socket_producer.h> #include <implementation/tls_socket_producer.h> namespace transport { @@ -23,8 +22,7 @@ namespace implementation { class P2PSecureProducerSocket; -class TLSRTCProducerSocket : public RTCProducerSocket, - public TLSProducerSocket { +class TLSRTCProducerSocket : public TLSProducerSocket { friend class P2PSecureProducerSocket; public: @@ -34,7 +32,8 @@ class TLSRTCProducerSocket : public RTCProducerSocket, ~TLSRTCProducerSocket() = default; - void produce(std::unique_ptr<utils::MemBuf> &&buffer) override; + uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) override; void accept() override; diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 95b287aa6..65472b41d 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -14,7 +14,6 @@ */ #include <implementation/tls_socket_consumer.h> - #include <openssl/bio.h> #include <openssl/ssl.h> #include <openssl/tls1.h> @@ -46,11 +45,13 @@ int readOldTLS(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -74,7 +75,7 @@ int readOldTLS(BIO *b, char *buf, int size) { } } - return read; + return (int)read; } /* Return the number of read bytes in readbytes */ @@ -101,6 +102,7 @@ int writeOldTLS(BIO *b, const char *buf, int num) { socket = (TLSConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -134,7 +136,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, int protocol, SSL *ssl) : ConsumerSocket(consumer_socket, protocol), name_(), - buf_pool_(), decrypted_content_(), payload_(), head_(), @@ -176,12 +177,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -191,10 +186,8 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, this); }; -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; } int TLSConsumerSocket::consume(const Name &name, @@ -228,22 +221,16 @@ int TLSConsumerSocket::download_content(const Name &name) { something_to_read_ = false; content_downloaded_ = false; - decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH); - uint8_t *buf = decrypted_content_->writableData(); - size_t size = 0; + std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize(); + std::size_t buffer_size = + read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH; + decrypted_content_ = utils::MemBuf::createCombined(buffer_size); int result = -1; + std::size_t size = 0; while (!content_downloaded_ || something_to_read_) { - if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) { - decrypted_content_->appendChain( - utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH)); - // decrypted_content_->computeChainDataLength(); - buf = decrypted_content_->prev()->writableData(); - } else { - buf = decrypted_content_->writableTail(); - } - - result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH); + result = SSL_read(this->ssl_, decrypted_content_->writableTail(), + SSL3_RT_MAX_PLAIN_LENGTH); /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of * the data has been fully downloaded */ @@ -253,20 +240,20 @@ int TLSConsumerSocket::download_content(const Name &name) { if (result >= 0) { size += result; - decrypted_content_->prepend(result); - } else + decrypted_content_->append(result); + } else { throw errors::RuntimeException("Unable to download content"); + } - if (size >= read_callback_decrypted_->maxBufferSize()) { + if (decrypted_content_->length() >= max_buffer_size) { if (read_callback_decrypted_->isBufferMovable()) { - // No need to perform an additional copy. The whole buffer will be - // tranferred to the application. - + /* No need to perform an additional copy. The whole buffer will be + * tranferred to the application. */ read_callback_decrypted_->readBufferAvailable( std::move(decrypted_content_)); - decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH); + decrypted_content_ = utils::MemBuf::create(buffer_size); } else { - // The buffer will be copied into the application-provided buffer + /* The buffer will be copied into the application-provided buffer */ uint8_t *buffer; std::size_t length; std::size_t total_length = decrypted_content_->length(); @@ -316,10 +303,7 @@ int TLSConsumerSocket::asyncConsume(const Name &name) { } if (!async_downloader_tls_.stopped()) { - async_downloader_tls_.add([this, name]() { - is_async_ = true; - download_content(name); - }); + async_downloader_tls_.add([this, name]() { download_content(name); }); } return CONSUMER_RUNNING; @@ -358,6 +342,7 @@ size_t TLSConsumerSocket::maxBufferSize() const { void TLSConsumerSocket::readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept { std::unique_lock<std::mutex> lck(this->mtx_); + if (head_) { head_->prependChain(std::move(buffer)); } else { @@ -380,5 +365,4 @@ void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool TLSConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h index 2e88dc47e..be08ec47d 100644 --- a/libtransport/src/implementation/tls_socket_consumer.h +++ b/libtransport/src/implementation/tls_socket_consumer.h @@ -16,9 +16,7 @@ #pragma once #include <hicn/transport/interfaces/socket_consumer.h> - #include <implementation/socket_consumer.h> - #include <openssl/ssl.h> namespace transport { @@ -69,42 +67,24 @@ class TLSConsumerSocket : public ConsumerSocket, private: Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ - utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - - /* Chain of MemBuf holding the payload to be written into interest or data - */ + /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - - double old_max_win_; - - double old_current_win_; - uint32_t random_suffix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - utils::EventThread async_downloader_tls_; void setInterestPayload(interface::ConsumerSocket &c, @@ -123,11 +103,11 @@ class TLSConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; int download_content(const Name &name); }; } // namespace implementation - -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc index 9a5b94a1c..dd92e58cf 100644 --- a/libtransport/src/implementation/tls_socket_producer.cc +++ b/libtransport/src/implementation/tls_socket_producer.cc @@ -14,10 +14,8 @@ */ #include <hicn/transport/interfaces/socket_producer.h> - #include <implementation/p2psecure_socket_producer.h> #include <implementation/tls_socket_producer.h> - #include <openssl/bio.h> #include <openssl/rand.h> #include <openssl/ssl.h> @@ -48,22 +46,25 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - /* take a lock on the mutex. It will be unlocked by */ std::unique_lock<std::mutex> lck(socket->mtx_); + + TRANSPORT_LOGD("Start wait on the CV."); + if (!socket->something_to_read_) { (socket->cv_).wait(lck); } - /* Either there already is something to read, or the thread has been waken up - */ - /* must return the payload in the interest */ + TRANSPORT_LOGD("CV unlocked."); - utils::MemBuf *membuf = socket->packet_->next(); + /* Either there already is something to read, or the thread has been waken up. + * We must return the payload in the interest anyway */ + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; + if ((int)membuf->length() > size) { size_to_read = size; } else { - size_to_read = membuf->length(); + size_to_read = (int)membuf->length(); socket->something_to_read_ = false; } @@ -97,14 +98,14 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { + bool making_manifest = socket->parent_->making_manifest_; + //! socket->tls_chunks_ corresponds to is_last socket->tls_chunks_--; - bool making_manifest = socket->parent_->making_manifest_; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); - socket->parent_->ProducerSocket::produce( + socket->parent_->ProducerSocket::produceStream( socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, socket->last_segment_); socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, @@ -116,20 +117,25 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->tls_chunks_--; socket->to_call_oncontentproduced_--; - auto mbuf = std::unique_ptr<utils::MemBuf>(a); - socket->last_segment_ += socket->ProducerSocket::produce( + + socket->last_segment_ += socket->ProducerSocket::produceStream( socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, socket->last_segment_); - ProducerContentCallback on_content_produced_application; + + ProducerContentCallback *on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - on_content_produced_application); + &on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { - on_content_produced_application(*socket->getInterface(), - std::error_code(), 0); + on_content_produced_application->operator()(*socket->getInterface(), + std::error_code(), 0); } }); } @@ -140,12 +146,15 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, P2PSecureProducerSocket *parent, const Name &handshake_name) - : ProducerSocket(producer_socket), + : ProducerSocket(producer_socket, + ProductionProtocolAlgorithms::BYTE_STREAM), on_content_produced_application_(), mtx_(), cv_(), - something_to_read_(), + something_to_read_(false), + handshake_state_(UNINITIATED), name_(), + handshake_packet_(), last_segment_(0), parent_(parent), first_(true), @@ -157,9 +166,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, const SSL_METHOD *meth = TLS_server_method(); ctx_ = SSL_CTX_new(meth); - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + /* Setup SSL context (identity and parameter to use TLS 1.3) */ SSL_CTX_use_certificate(ctx_, parent->cert_509_); SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_); @@ -167,6 +174,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, SSL_CTX_set_ciphersuites(ctx_, "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { throw errors::RuntimeException( "Unable to set cipher list on TLS subsystem. Aborting."); @@ -184,10 +192,9 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, this, TLSProducerSocket::parseHicnKeyIdCb, NULL); ssl_ = SSL_new(ctx_); - /* - * Setup this producer socker as the bio that TLS will use to write and read - * data (in stream mode) - */ + + /* Setup this producer socker as the bio that TLS will use to write and read + * data (in stream mode) */ BIO_METHOD *bio_meth = BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket"); BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld); @@ -197,15 +204,15 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, BIO_set_init(bio, 1); BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CACHE_MISS, (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this, std::placeholders::_1, std::placeholders::_2)); + this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CONTENT_PRODUCED, (ProducerContentCallback)bind( @@ -213,35 +220,40 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, std::placeholders::_2, std::placeholders::_3)); } -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; } void TLSProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } - TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_producers.push_front( - std::move(parent_->map_secure_producers[handshake_name_])); - parent_->map_secure_producers.erase(handshake_name_); - ProducerInterestCallback on_interest_process_decrypted; + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); + + ProducerInterestCallback *on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, - on_interest_process_decrypted); + &on_interest_process_decrypted); - if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); - on_interest_process_decrypted(*getInterface(), inter); + if (*on_interest_process_decrypted) { + Interest inter(std::move(*handshake_packet_)); + handshake_packet_.reset(); + on_interest_process_decrypted->operator()(*getInterface(), inter); } else { throw errors::RuntimeException( - "On interest process unset. Unable to perform handshake"); + "On interest process unset: unable to perform handshake"); } + + handshake_state_ = SERVER_FINISHED; + TRANSPORT_LOGD("Handshake performed!"); } int TLSProducerSocket::async_accept() { @@ -249,87 +261,106 @@ int TLSProducerSocket::async_accept() { async_thread_.add([this]() { this->accept(); }); } else { throw errors::RuntimeException( - "Async thread not running, impossible to perform handshake"); + "Async thread not running: unable to perform handshake"); } return 1; } void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { - /* Based on the state machine of (D)TLS, we know what action to do */ - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); + name_ = interest.getName(); + // interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - if (head_) { - payload_->prependChain(interest.getPayload()); - } else { - payload_ = interest.getPayload(); // std::move(interest.getPayload()); - } + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + return; + } else if (handshake_state == SERVER_FINISHED) { + // interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), - interest.getPayload()->length()); - } + (int)interest.getPayload()->length()); + } + + ProducerInterestCallback *on_interest_input_decrypted; + getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + &on_interest_input_decrypted); - ProducerInterestCallback on_interest_input_decrypted; - getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, - on_interest_input_decrypted); - if (on_interest_input_decrypted) - (on_interest_input_decrypted)(*getInterface(), interest); + if (*on_interest_input_decrypted) + (*on_interest_input_decrypted)(*getInterface(), interest); + } } void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p, Interest &interest) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + TRANSPORT_LOGD("On cache miss in TLS socket producer."); + + if (handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); - name_ = interest.getName(); + + // interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + handshake_state_ = CLIENT_FINISHED; + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + } else if (handshake_state == SERVER_FINISHED) { + // interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), - interest.getPayload()->length()); + (int)interest.getPayload()->length()); + } if (on_interest_process_decrypted_ != VOID_HANDLER) on_interest_process_decrypted_(*getInterface(), interest); } } +TLSProducerSocket::HandshakeState TLSProducerSocket::getHandshakeState() { + if (SSL_in_before(ssl_)) { + handshake_state_ = UNINITIATED; + } + + if (SSL_in_init(ssl_) && handshake_state_ == UNINITIATED) { + handshake_state_ = CLIENT_HELLO; + } + + return handshake_state_; +} + void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written) {} -uint32_t TLSProducerSocket::produce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t start_offset) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { +uint32_t TLSProducerSocket::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { + if (getHandshakeState() != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } - size_t buf_size = buffer->length(); - name_ = served_namespaces_.front().mapName(content_name); + size_t buf_size = buffer->length(); + name_ = production_protocol_->getNamespaces().front().mapName(content_name); tls_chunks_ = to_call_oncontentproduced_ = - ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); + (int)ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); if (!is_last) { tls_chunks_++; @@ -337,7 +368,7 @@ uint32_t TLSProducerSocket::produce(Name content_name, last_segment_ = start_offset; - SSL_write(ssl_, buffer->data(), buf_size); + SSL_write(ssl_, buffer->data(), (int)buf_size); BIO *wbio = SSL_get_wbio(ssl_); int i = BIO_flush(wbio); (void)i; // To shut up gcc 5 @@ -345,49 +376,10 @@ uint32_t TLSProducerSocket::produce(Name content_name, return 0; } -void TLSProducerSocket::asyncProduce(const Name &content_name, - const uint8_t *buf, size_t buffer_size, - bool is_last, uint32_t *start_offset) { - if (!encryption_thread_.stopped()) { - encryption_thread_.add([this, content_name, buffer = buf, - size = buffer_size, is_last, start_offset]() { - if (start_offset != NULL) { - produce(content_name, buffer, size, is_last, *start_offset); - } else { - produce(content_name, buffer, size, is_last, 0); - } - }); - } -} - -void TLSProducerSocket::asyncProduce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment) { - if (!encryption_thread_.stopped()) { - auto a = buffer.release(); - encryption_thread_.add( - [this, content_name, a, is_last, offset, last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - *last_segment = &last_segment_; - } - produce(content_name, std::move(buf), is_last, offset); - }); - } -} - -void TLSProducerSocket::asyncProduce(ContentObject &content_object) { - throw errors::RuntimeException("API not supported"); -} - -void TLSProducerSocket::produce(ContentObject &content_object) { - throw errors::RuntimeException("API not supported"); -} - long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { if (cmd == BIO_CTRL_FLUSH) { } + return 1; } @@ -397,13 +389,15 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, X509 *x, size_t chainidx, int *al, void *add_arg) { TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg); + + TRANSPORT_LOGD("On addHicnKeyIdCb, for the prefix registration."); + if (ext_type == 100) { - ip_prefix_t ip_prefix = - socket->parent_->served_namespaces_.front().toIpPrefixStruct(); - int inet_family = - socket->parent_->served_namespaces_.front().getAddressFamily(); - uint16_t prefix_len_bits = - socket->parent_->served_namespaces_.front().getPrefixLength(); + auto &prefix = + socket->parent_->production_protocol_->getNamespaces().front(); + const ip_prefix_t &ip_prefix = prefix.toIpPrefixStruct(); + int inet_family = prefix.getAddressFamily(); + uint16_t prefix_len_bits = prefix.getPrefixLength(); uint8_t prefix_len_bytes = prefix_len_bits / 8; uint8_t prefix_len_u32 = prefix_len_bits / 32; @@ -425,6 +419,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, ip_address_t keyId_component = {}; u32 *mask_buf; u32 *keyId_component_buf; + switch (inet_family) { case AF_INET: mask_buf = &(mask.v4.as_u32); @@ -451,10 +446,9 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, socket->parent_->on_interest_process_decrypted_; socket->registerPrefix( - Prefix(socket->parent_->served_namespaces_.front().getName( - Name(inet_family, (uint8_t *)&mask), - Name(inet_family, (uint8_t *)&keyId_component), - socket->parent_->served_namespaces_.front().getName()), + Prefix(prefix.getName(Name(inet_family, (uint8_t *)&mask), + Name(inet_family, (uint8_t *)&keyId_component), + prefix.getName()), out_ip->len)); socket->connect(); } @@ -483,6 +477,7 @@ int TLSProducerSocket::setSocketOption( [this](int socket_option_key, ProducerInterestCallback socket_option_value) -> int { int result = SOCKET_OPTION_SET; + switch (socket_option_key) { case ProducerCallbacksOptions::INTEREST_INPUT: on_interest_input_decrypted_ = socket_option_value; @@ -508,6 +503,7 @@ int TLSProducerSocket::setSocketOption( result = SOCKET_OPTION_NOT_SET; break; } + return result; }); } @@ -550,62 +546,5 @@ int TLSProducerSocket::getSocketOption( }); } -int TLSProducerSocket::getSocketOption( - int socket_option_key, ProducerContentCallback &socket_option_value) { - return rescheduleOnIOServiceWithReference( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ProducerContentCallback &socket_option_value) -> int { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - socket_option_value = on_content_produced_application_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); -} - -int TLSProducerSocket::getSocketOption( - int socket_option_key, ProducerInterestCallback &socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in case - // setSocketOption is called while the io_service is running. - return rescheduleOnIOServiceWithReference( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ProducerInterestCallback &socket_option_value) -> int { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - socket_option_value = on_interest_input_decrypted_; - break; - - case ProducerCallbacksOptions::INTEREST_DROP: - socket_option_value = on_interest_dropped_input_buffer_; - break; - - case ProducerCallbacksOptions::INTEREST_PASS: - socket_option_value = on_interest_inserted_input_buffer_; - break; - - case ProducerCallbacksOptions::CACHE_HIT: - socket_option_value = on_interest_satisfied_output_buffer_; - break; - - case ProducerCallbacksOptions::CACHE_MISS: - socket_option_value = on_interest_process_decrypted_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); -} - } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h index e910c8259..a542a4d9f 100644 --- a/libtransport/src/implementation/tls_socket_producer.h +++ b/libtransport/src/implementation/tls_socket_producer.h @@ -16,8 +16,8 @@ #pragma once #include <implementation/socket_producer.h> - #include <openssl/ssl.h> + #include <condition_variable> #include <mutex> @@ -36,26 +36,18 @@ class TLSProducerSocket : virtual public ProducerSocket { ~TLSProducerSocket(); - uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, - bool is_last = true, uint32_t start_offset = 0) override { - return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), - is_last, start_offset); + uint32_t produceStream(const Name &content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) override { + return produceStream(content_name, + utils::MemBuf::copyBuffer(buffer, buffer_size), + is_last, start_offset); } - uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, uint32_t start_offset = 0) override; - - void produce(ContentObject &content_object) override; - - void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, - bool is_last = true, - uint32_t *start_offset = nullptr) override; - - void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment = nullptr) override; - - void asyncProduce(ContentObject &content_object) override; + uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) override; virtual void accept(); @@ -80,51 +72,49 @@ class TLSProducerSocket : virtual public ProducerSocket { ProducerInterestCallback &socket_option_value); using ProducerSocket::getSocketOption; - using ProducerSocket::onInterest; + // using ProducerSocket::onInterest; using ProducerSocket::setSocketOption; protected: + enum HandshakeState { + UNINITIATED, + CLIENT_HELLO, // when CLIENT_HELLO interest has been received + CLIENT_FINISHED, // when CLIENT_FINISHED interest has been received + SERVER_FINISHED, // when handshake is done + }; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; ProducerInterestCallback on_interest_process_decrypted_; ProducerContentCallback on_content_produced_application_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - /* Bool variable, true if there is something to read (an interest arrived) */ bool something_to_read_; - + /* Bool variable, true if CLIENT_FINISHED interest has been received */ + HandshakeState handshake_state_; /* First interest that open a secure connection */ transport::core::Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - - Packet::MemBufPtr packet_; - + Packet::MemBufPtr handshake_packet_; std::unique_ptr<utils::MemBuf> head_; std::uint32_t last_segment_; - std::shared_ptr<utils::MemBuf> payload_; std::uint32_t key_id_; - std::thread *handshake; P2PSecureProducerSocket *parent_; - bool first_; Name handshake_name_; int tls_chunks_; int to_call_oncontentproduced_; - bool still_writing_; - utils::EventThread encryption_thread_; + utils::EventThread async_thread_; void onInterest(ProducerSocket &p, Interest &interest); + void cacheMiss(interface::ProducerSocket &p, Interest &interest); /* Return the number of read bytes in readbytes */ @@ -156,8 +146,9 @@ class TLSProducerSocket : virtual public ProducerSocket { void onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written); + + HandshakeState getHandshakeState(); }; } // namespace implementation - } // end namespace transport |