diff options
Diffstat (limited to 'libtransport')
20 files changed, 369 insertions, 377 deletions
diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h index e58e7962d..2efd7439d 100644 --- a/libtransport/includes/hicn/transport/core/packet.h +++ b/libtransport/includes/hicn/transport/core/packet.h @@ -148,15 +148,15 @@ class Packet : public std::enable_shared_from_this<Packet> { std::pair<const uint8_t *, std::size_t> getPayloadReference() const { int signature_size = 0; + if (_is_ah(format_)) { signature_size = (uint32_t)getSignatureSize(); } auto header_size = getHeaderSizeFromFormat(format_, signature_size); - auto payload_length = packet_->length() - header_size; + auto payload_length = payloadSize(); - return std::make_pair(packet_->data() + header_size, - payload_length); + return std::make_pair(packet_->data() + header_size, payload_length); } Packet &updateLength(std::size_t length = 0); @@ -229,6 +229,7 @@ class Packet : public std::enable_shared_from_this<Packet> { Packet &setTTL(uint8_t hops); uint8_t getTTL() const; + void separateHeaderPayload(); void resetPayload(); private: @@ -248,7 +249,6 @@ class Packet : public std::enable_shared_from_this<Packet> { } uint8_t *getSignature() const; - void separateHeaderPayload(); protected: Name name_; diff --git a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h index 097b0a8c0..224493f00 100644 --- a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h +++ b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h @@ -23,10 +23,10 @@ namespace interface { class P2PSecureConsumerSocket : public ConsumerSocket { public: - P2PSecureConsumerSocket(int handshake_protocol, int protocol); + P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol); ~P2PSecureConsumerSocket() = default; + void registerPrefix(const Prefix &producer_namespace); }; } // namespace interface - } // end namespace transport diff --git a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h index 0a6e9a43a..73cbb78b0 100644 --- a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h @@ -106,7 +106,7 @@ class ConsumerSocket { /** * This method will be called by the transport for understanding how many - * bytes it should read (at most) before notifying the application. + * bytes it should read before notifying the application. * * By default it reads 64 KB. */ diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc index 30f780461..eea4aeb8b 100644 --- a/libtransport/src/core/prefix.cc +++ b/libtransport/src/core/prefix.cc @@ -211,8 +211,8 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components, } } - if (this->contains(name_ip)) - throw errors::RuntimeException("Mask overrides the prefix"); + // if (this->contains(name_ip)) + // throw errors::RuntimeException("Mask overrides the prefix"); return Name(ip_prefix_.family, (uint8_t *)&name_ip); } diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc index 40ab58161..9b79850d6 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -33,10 +33,6 @@ void P2PSecureConsumerSocket::setInterestPayload( if (payload_ != NULL) int2.appendPayload(std::move(payload_)); } -// implement void readBufferAvailable(), size_t maxBufferSize() const override, -// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() -// must be implemented even if empty. - /* Return the number of read bytes in the return param */ int readOld(BIO *b, char *buf, int size) { if (size < 0) return size; @@ -51,11 +47,13 @@ int readOld(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -106,6 +104,7 @@ int writeOld(BIO *b, const char *buf, int num) { socket = (P2PSecureConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -173,9 +172,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, P2PSecureConsumerSocket::P2PSecureConsumerSocket( interface::ConsumerSocket *consumer, int handshake_protocol, int transport_protocol) - : ConsumerSocket(consumer, transport_protocol), + : ConsumerSocket(consumer, handshake_protocol), name_(), - tls_consumer_(), + tls_consumer_(nullptr), buf_pool_(), decrypted_content_(), payload_(), @@ -224,12 +223,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket( BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -244,76 +237,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { SSL_shutdown(ssl_); } -int P2PSecureConsumerSocket::consume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } +int P2PSecureConsumerSocket::handshake() { + int result = 1; - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); + if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + return 1; } - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_); - tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer)); - ConsumerTimerCallback *stats_summary_callback = nullptr; - this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_summary_callback); + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - uint32_t lifetime; - this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); - tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - read_callback_decrypted_); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - *stats_summary_callback); - tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, - this->timer_interval_milliseconds_); - tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - tls_consumer.connect(); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - if (payload_ != NULL) - return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); - else - return tls_consumer.consume((prefix->mapName(name))); -} + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); -int P2PSecureConsumerSocket::asyncConsume(const Name &name) { - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); - TRANSPORT_LOGD("Handshake performed!"); - } + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + result = SSL_connect(this->ssl_); - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + return result; +} +void P2PSecureConsumerSocket::initSessionSocket() { tls_consumer_ = std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); tls_consumer_->setInterface( @@ -325,6 +271,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { uint32_t lifetime; this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, @@ -336,6 +283,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); tls_consumer_->connect(); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + + if (payload_ != nullptr) + return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer_->consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); if (payload_ != NULL) return tls_consumer_->asyncConsume((prefix->mapName(name)), @@ -399,5 +399,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h index e2ebaf94e..d4c3b26c2 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.h +++ b/libtransport/src/implementation/p2psecure_socket_consumer.h @@ -69,39 +69,26 @@ class P2PSecureConsumerSocket : public ConsumerSocket, private: Name name_; std::shared_ptr<TLSConsumerSocket> tls_consumer_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; BIO_METHOD *bio_meth_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - ip_prefix_t secure_prefix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; /* Condition variable for the wait */ @@ -138,9 +125,12 @@ class P2PSecureConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; - int download_content(const Name &name); + int handshake(); + + void initSessionSocket(); }; } // namespace implementation diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc index d0852539a..15c7d25cd 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.cc +++ b/libtransport/src/implementation/p2psecure_socket_producer.cc @@ -37,9 +37,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( : ProducerSocket(producer_socket), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() {} + map_producers(), + list_producers() {} P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, @@ -48,12 +47,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( rtc_(rtc), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() { - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + map_producers(), + list_producers() { + /* Setup SSL context (identity and parameter to use TLS 1.3) */ der_cert_ = parcKeyStore_GetDEREncodedCertificate( (identity->getSigner()->getKeyStore())); der_prk_ = parcKeyStore_GetDEREncodedPrivateKey( @@ -68,10 +64,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( cert_509_ = d2i_X509(NULL, &cert, cert_size); pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ ProducerSocket::setSocketOption( ProducerCallbacksOptions::INTEREST_INPUT, (ProducerInterestCallback)std::bind( @@ -84,53 +78,61 @@ P2PSecureProducerSocket::~P2PSecureProducerSocket() { if (der_prk_) parcBuffer_Release(&der_prk_); } +void P2PSecureProducerSocket::initSessionSocket( + std::unique_ptr<TLSProducerSocket> &producer) { + producer->on_content_produced_application_ = + this->on_content_produced_application_; + producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + producer->setSocketOption(SIGNER, this->signer_); + producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + + if (!rtc_) { + producer->setInterface(new interface::TLSProducerSocket(producer.get())); + } else { + TLSRTCProducerSocket *rtc_producer = + dynamic_cast<TLSRTCProducerSocket *>(producer.get()); + rtc_producer->setInterface( + new interface::TLSRTCProducerSocket(rtc_producer)); + } +} + void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, Interest &interest) { std::unique_lock<std::mutex> lck(mtx_); + std::unique_ptr<TLSProducerSocket> tls_producer; + auto it = map_producers.find(interest.getName()); + + if (it != map_producers.end()) { + return; + } + + if (!rtc_) { + tls_producer = + std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName()); + } else { + tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this, + interest.getName()); + } + + initSessionSocket(tls_producer); + TLSProducerSocket *tls_producer_ptr = tls_producer.get(); + map_producers.insert({interest.getName(), move(tls_producer)}); TRANSPORT_LOGD("Start handshake at %s", interest.getName().toString().c_str()); + if (!rtc_) { - auto it = map_secure_producers.find(interest.getName()); - if (it != map_secure_producers.end()) return; - TLSProducerSocket *tls_producer = - new TLSProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer)); - - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_producers.insert( - {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + tls_producer_ptr->onInterest(*tls_producer_ptr, interest); + tls_producer_ptr->async_accept(); } else { - auto it = map_secure_rtc_producers.find(interest.getName()); - if (it != map_secure_rtc_producers.end()) return; - TLSRTCProducerSocket *tls_producer = - new TLSRTCProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface( - new interface::TLSRTCProducerSocket(tls_producer)); - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_rtc_producers.insert( - {interest.getName(), - std::unique_ptr<TLSRTCProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + TLSRTCProducerSocket *rtc_producer_ptr = + dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr); + rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest); + rtc_producer_ptr->async_accept(); } } @@ -143,11 +145,13 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer, } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_rtc_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_rtc_producers.cbegin(); - it != list_secure_rtc_producers.cend(); it++) { - (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { + TLSRTCProducerSocket *rtc_producer = + dynamic_cast<TLSRTCProducerSocket *>(it->get()); + rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); } } @@ -162,12 +166,13 @@ uint32_t P2PSecureProducerSocket::produce( std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) segments += (*it)->produce(content_name, buffer->clone(), is_last, start_offset); + return segments; } @@ -183,12 +188,12 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name, std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) segments += (*it)->produce(content_name, buffer, buffer_size, is_last, start_offset); + return segments; } @@ -203,10 +208,9 @@ void P2PSecureProducerSocket::asyncProduce(const Name &content_name, } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset); } } @@ -221,22 +225,19 @@ void P2PSecureProducerSocket::asyncProduce( } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, last_segment); } } -// Socket Option Redefinition to avoid name hiding - +/* Redefinition of socket options to avoid name hiding */ int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerInterestCallback socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } @@ -269,9 +270,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, const std::shared_ptr<utils::Signer> &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -288,9 +288,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption(int socket_option_key, uint32_t socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } switch (socket_option_key) { @@ -305,9 +304,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, bool socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -316,9 +314,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, Name *socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -327,9 +324,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption( int socket_option_key, std::list<Prefix> socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -338,9 +334,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentObjectCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -349,9 +344,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -368,9 +362,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, utils::CryptoHashType socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -379,9 +372,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, utils::CryptoSuite socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -390,9 +382,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, const std::string &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -400,5 +391,4 @@ int P2PSecureProducerSocket::setSocketOption( } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h index 33339deba..bfc9fc2c1 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.h +++ b/libtransport/src/implementation/p2psecure_socket_producer.h @@ -37,9 +37,11 @@ class P2PSecureProducerSocket : public ProducerSocket { public: explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); + explicit P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, const std::shared_ptr<utils::Identity> &identity); + ~P2PSecureProducerSocket(); void produce(const uint8_t *buffer, size_t buffer_size) override; @@ -96,7 +98,6 @@ class P2PSecureProducerSocket : public ProducerSocket { using ProducerSocket::onInterest; protected: - bool rtc_; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; @@ -104,27 +105,23 @@ class P2PSecureProducerSocket : public ProducerSocket { ProducerContentCallback on_content_produced_application_; private: + bool rtc_; std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - PARCBuffer *der_cert_; PARCBuffer *der_prk_; X509 *cert_509_; EVP_PKEY *pkey_rsa_; std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>, core::hash<core::Name>, core::compare2<core::Name>> - map_secure_producers; - std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>, - core::hash<core::Name>, core::compare2<core::Name>> - map_secure_rtc_producers; - std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers; - std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers; + map_producers; + std::list<std::unique_ptr<TLSProducerSocket>> list_producers; void onInterestCallback(interface::ProducerSocket &p, Interest &interest); + + void initSessionSocket(std::unique_ptr<TLSProducerSocket> &producer); }; } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 9ccc59d9e..8c5c453fc 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -131,7 +131,7 @@ class ProducerSocket : public Socket<BasePortal>, // TODO Manifest may still be used for indexing if (making_manifest && !signer) { - TRANSPORT_LOGD("Making manifests without setting producer identity."); + TRANSPORT_LOGE("Making manifests without setting producer identity."); } core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; @@ -192,7 +192,6 @@ class ProducerSocket : public Socket<BasePortal>, } } - TRANSPORT_LOGD("--------- START PRODUCE ----------"); for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { if (making_manifest) { @@ -207,13 +206,12 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } @@ -270,8 +268,7 @@ class ProducerSocket : public Socket<BasePortal>, signer->sign(*content_object); } passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %u", - content_object->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str()); } } @@ -286,11 +283,11 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } } @@ -949,18 +946,19 @@ class ProducerSocket : public Socket<BasePortal>, std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; - std::unique_lock<std::mutex> lck(mtx); + bool done = false; io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); - - if (!done) { - cv.wait(lck); - } + cv.notify_all(); }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } } else { result = func(socket_option_key, socket_option_value); } diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc index 3b3152993..9ef79ca23 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.cc +++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc @@ -53,7 +53,7 @@ int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) { (socket->cv_).wait(lck); } - utils::MemBuf *membuf = socket->packet_->next(); + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; if ((int)membuf->length() > size) { @@ -91,10 +91,10 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSRTCProducerSocket *socket; socket = (TLSRTCProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { - socket->tls_chunks_--; + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { bool making_manifest = socket->parent_->making_manifest_; + + socket->tls_chunks_--; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); socket->parent_->ProducerSocket::produce( @@ -107,13 +107,17 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { socket->to_call_oncontentproduced_--; auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->RTCProducerSocket::produce(std::move(mbuf)); + ProducerContentCallback on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { on_content_produced_application( @@ -144,24 +148,28 @@ TLSRTCProducerSocket::TLSRTCProducerSocket( } void TLSRTCProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_rtc_producers.push_front( - std::move(parent_->map_secure_rtc_producers[handshake_name_])); - parent_->map_secure_rtc_producers.erase(handshake_name_); + + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); ProducerInterestCallback on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, on_interest_process_decrypted); if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); + Interest inter(std::move(handshake_packet_)); on_interest_process_decrypted( (transport::interface::ProducerSocket &)(*getInterface()), inter); } @@ -181,7 +189,9 @@ int TLSRTCProducerSocket::async_accept() { } void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } @@ -197,5 +207,4 @@ void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 95b287aa6..7cf653848 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -46,11 +46,13 @@ int readOldTLS(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -101,6 +103,7 @@ int writeOldTLS(BIO *b, const char *buf, int num) { socket = (TLSConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -176,12 +179,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -191,10 +188,8 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, this); }; -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; } int TLSConsumerSocket::consume(const Name &name, @@ -228,22 +223,15 @@ int TLSConsumerSocket::download_content(const Name &name) { something_to_read_ = false; content_downloaded_ = false; - decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH); - uint8_t *buf = decrypted_content_->writableData(); - size_t size = 0; + std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize(); + std::size_t buffer_size = read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH; + decrypted_content_ = utils::MemBuf::createCombined(buffer_size); int result = -1; + std::size_t size = 0; while (!content_downloaded_ || something_to_read_) { - if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) { - decrypted_content_->appendChain( - utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH)); - // decrypted_content_->computeChainDataLength(); - buf = decrypted_content_->prev()->writableData(); - } else { - buf = decrypted_content_->writableTail(); - } - - result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH); + result = SSL_read( + this->ssl_, decrypted_content_->writableTail(), SSL3_RT_MAX_PLAIN_LENGTH); /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of * the data has been fully downloaded */ @@ -253,20 +241,20 @@ int TLSConsumerSocket::download_content(const Name &name) { if (result >= 0) { size += result; - decrypted_content_->prepend(result); - } else + decrypted_content_->append(result); + } else { throw errors::RuntimeException("Unable to download content"); + } - if (size >= read_callback_decrypted_->maxBufferSize()) { + if (decrypted_content_->length() >= max_buffer_size) { if (read_callback_decrypted_->isBufferMovable()) { - // No need to perform an additional copy. The whole buffer will be - // tranferred to the application. - + /* No need to perform an additional copy. The whole buffer will be + * tranferred to the application. */ read_callback_decrypted_->readBufferAvailable( std::move(decrypted_content_)); - decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH); + decrypted_content_ = utils::MemBuf::create(buffer_size); } else { - // The buffer will be copied into the application-provided buffer + /* The buffer will be copied into the application-provided buffer */ uint8_t *buffer; std::size_t length; std::size_t total_length = decrypted_content_->length(); @@ -358,6 +346,7 @@ size_t TLSConsumerSocket::maxBufferSize() const { void TLSConsumerSocket::readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept { std::unique_lock<std::mutex> lck(this->mtx_); + if (head_) { head_->prependChain(std::move(buffer)); } else { @@ -380,5 +369,4 @@ void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool TLSConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h index 2e88dc47e..6931a7a8a 100644 --- a/libtransport/src/implementation/tls_socket_consumer.h +++ b/libtransport/src/implementation/tls_socket_consumer.h @@ -69,42 +69,27 @@ class TLSConsumerSocket : public ConsumerSocket, private: Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - - /* Chain of MemBuf holding the payload to be written into interest or data - */ + /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - utils::EventThread async_downloader_tls_; void setInterestPayload(interface::ConsumerSocket &c, @@ -123,11 +108,11 @@ class TLSConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; int download_content(const Name &name); }; } // namespace implementation - -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc index 9a5b94a1c..339a1ad58 100644 --- a/libtransport/src/implementation/tls_socket_producer.cc +++ b/libtransport/src/implementation/tls_socket_producer.cc @@ -48,18 +48,17 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - /* take a lock on the mutex. It will be unlocked by */ std::unique_lock<std::mutex> lck(socket->mtx_); + if (!socket->something_to_read_) { (socket->cv_).wait(lck); } - /* Either there already is something to read, or the thread has been waken up - */ - /* must return the payload in the interest */ - - utils::MemBuf *membuf = socket->packet_->next(); + /* Either there already is something to read, or the thread has been waken up. + * We must return the payload in the interest anyway */ + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; + if ((int)membuf->length() > size) { size_to_read = size; } else { @@ -97,11 +96,11 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { + bool making_manifest = socket->parent_->making_manifest_; + //! socket->tls_chunks_ corresponds to is_last socket->tls_chunks_--; - bool making_manifest = socket->parent_->making_manifest_; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); socket->parent_->ProducerSocket::produce( @@ -116,16 +115,21 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->tls_chunks_--; socket->to_call_oncontentproduced_--; - auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->last_segment_ += socket->ProducerSocket::produce( socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, socket->last_segment_); + ProducerContentCallback on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { on_content_produced_application(*socket->getInterface(), @@ -144,8 +148,10 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, on_content_produced_application_(), mtx_(), cv_(), - something_to_read_(), + something_to_read_(false), + handshake_state_(UNINITIATED), name_(), + handshake_packet_(), last_segment_(0), parent_(parent), first_(true), @@ -157,9 +163,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, const SSL_METHOD *meth = TLS_server_method(); ctx_ = SSL_CTX_new(meth); - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + /* Setup SSL context (identity and parameter to use TLS 1.3) */ SSL_CTX_use_certificate(ctx_, parent->cert_509_); SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_); @@ -167,6 +171,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, SSL_CTX_set_ciphersuites(ctx_, "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { throw errors::RuntimeException( "Unable to set cipher list on TLS subsystem. Aborting."); @@ -184,10 +189,9 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, this, TLSProducerSocket::parseHicnKeyIdCb, NULL); ssl_ = SSL_new(ctx_); - /* - * Setup this producer socker as the bio that TLS will use to write and read - * data (in stream mode) - */ + + /* Setup this producer socker as the bio that TLS will use to write and read + * data (in stream mode) */ BIO_METHOD *bio_meth = BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket"); BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld); @@ -197,15 +201,15 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, BIO_set_init(bio, 1); BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CACHE_MISS, (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this, std::placeholders::_1, std::placeholders::_2)); + this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CONTENT_PRODUCED, (ProducerContentCallback)bind( @@ -213,35 +217,39 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, std::placeholders::_2, std::placeholders::_3)); } -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; } void TLSProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } - TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_producers.push_front( - std::move(parent_->map_secure_producers[handshake_name_])); - parent_->map_secure_producers.erase(handshake_name_); + + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); ProducerInterestCallback on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, on_interest_process_decrypted); if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); + Interest inter(std::move(handshake_packet_)); on_interest_process_decrypted(*getInterface(), inter); } else { throw errors::RuntimeException( - "On interest process unset. Unable to perform handshake"); + "On interest process unset: unable to perform handshake"); } + + handshake_state_ = SERVER_FINISHED; + TRANSPORT_LOGD("Handshake performed!"); } int TLSProducerSocket::async_accept() { @@ -249,71 +257,88 @@ int TLSProducerSocket::async_accept() { async_thread_.add([this]() { this->accept(); }); } else { throw errors::RuntimeException( - "Async thread not running, impossible to perform handshake"); + "Async thread not running: unable to perform handshake"); } return 1; } void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { - /* Based on the state machine of (D)TLS, we know what action to do */ - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); + name_ = interest.getName(); + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - if (head_) { - payload_->prependChain(interest.getPayload()); - } else { - payload_ = interest.getPayload(); // std::move(interest.getPayload()); - } + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + return; + } else if (handshake_state == SERVER_FINISHED) { + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), interest.getPayload()->length()); - } + } + + ProducerInterestCallback on_interest_input_decrypted; + getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + on_interest_input_decrypted); - ProducerInterestCallback on_interest_input_decrypted; - getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, - on_interest_input_decrypted); - if (on_interest_input_decrypted) - (on_interest_input_decrypted)(*getInterface(), interest); + if (on_interest_input_decrypted) + (on_interest_input_decrypted)(*getInterface(), interest); + } } void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p, Interest &interest) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); - name_ = interest.getName(); + + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + handshake_state_ = CLIENT_FINISHED; + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + } else if (handshake_state == SERVER_FINISHED) { + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), interest.getPayload()->length()); + } if (on_interest_process_decrypted_ != VOID_HANDLER) on_interest_process_decrypted_(*getInterface(), interest); } } +TLSProducerSocket::HandshakeState TLSProducerSocket::getHandshakeState() { + if (SSL_in_before(ssl_)) { + handshake_state_ = UNINITIATED; + } + + if (SSL_in_init(ssl_) && handshake_state_ == UNINITIATED) { + handshake_state_ = CLIENT_HELLO; + } + + return handshake_state_; +} + void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written) {} @@ -321,13 +346,13 @@ void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, uint32_t TLSProducerSocket::produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, uint32_t start_offset) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + if (getHandshakeState() != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } + size_t buf_size = buffer->length(); name_ = served_namespaces_.front().mapName(content_name); - tls_chunks_ = to_call_oncontentproduced_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); @@ -388,6 +413,7 @@ void TLSProducerSocket::produce(ContentObject &content_object) { long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { if (cmd == BIO_CTRL_FLUSH) { } + return 1; } @@ -397,6 +423,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, X509 *x, size_t chainidx, int *al, void *add_arg) { TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg); + if (ext_type == 100) { ip_prefix_t ip_prefix = socket->parent_->served_namespaces_.front().toIpPrefixStruct(); @@ -425,6 +452,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, ip_address_t keyId_component = {}; u32 *mask_buf; u32 *keyId_component_buf; + switch (inet_family) { case AF_INET: mask_buf = &(mask.v4.as_u32); @@ -483,6 +511,7 @@ int TLSProducerSocket::setSocketOption( [this](int socket_option_key, ProducerInterestCallback socket_option_value) -> int { int result = SOCKET_OPTION_SET; + switch (socket_option_key) { case ProducerCallbacksOptions::INTEREST_INPUT: on_interest_input_decrypted_ = socket_option_value; @@ -508,6 +537,7 @@ int TLSProducerSocket::setSocketOption( result = SOCKET_OPTION_NOT_SET; break; } + return result; }); } @@ -607,5 +637,4 @@ int TLSProducerSocket::getSocketOption( } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h index e910c8259..2382e8695 100644 --- a/libtransport/src/implementation/tls_socket_producer.h +++ b/libtransport/src/implementation/tls_socket_producer.h @@ -84,47 +84,44 @@ class TLSProducerSocket : virtual public ProducerSocket { using ProducerSocket::setSocketOption; protected: + enum HandshakeState { + UNINITIATED, + CLIENT_HELLO, // when CLIENT_HELLO interest has been received + CLIENT_FINISHED, // when CLIENT_FINISHED interest has been received + SERVER_FINISHED, // when handshake is done + }; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; ProducerInterestCallback on_interest_process_decrypted_; ProducerContentCallback on_content_produced_application_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - /* Bool variable, true if there is something to read (an interest arrived) */ bool something_to_read_; - + /* Bool variable, true if CLIENT_FINISHED interest has been received */ + HandshakeState handshake_state_; /* First interest that open a secure connection */ transport::core::Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - - Packet::MemBufPtr packet_; - + Packet::MemBufPtr handshake_packet_; std::unique_ptr<utils::MemBuf> head_; std::uint32_t last_segment_; - std::shared_ptr<utils::MemBuf> payload_; std::uint32_t key_id_; - std::thread *handshake; P2PSecureProducerSocket *parent_; - bool first_; Name handshake_name_; int tls_chunks_; int to_call_oncontentproduced_; - bool still_writing_; - utils::EventThread encryption_thread_; void onInterest(ProducerSocket &p, Interest &interest); + void cacheMiss(interface::ProducerSocket &p, Interest &interest); /* Return the number of read bytes in readbytes */ @@ -156,8 +153,9 @@ class TLSProducerSocket : virtual public ProducerSocket { void onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written); + + HandshakeState getHandshakeState(); }; } // namespace implementation - } // end namespace transport diff --git a/libtransport/src/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/interfaces/p2psecure_socket_consumer.cc index 2fa8bb6e3..038441dfc 100644 --- a/libtransport/src/interfaces/p2psecure_socket_consumer.cc +++ b/libtransport/src/interfaces/p2psecure_socket_consumer.cc @@ -21,11 +21,17 @@ namespace transport { namespace interface { P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol, - int protocol) + int transport_protocol) : ConsumerSocket() { socket_ = std::unique_ptr<implementation::ConsumerSocket>( new implementation::P2PSecureConsumerSocket(this, handshake_protocol, - protocol)); + transport_protocol)); +} + +void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + implementation::P2PSecureConsumerSocket &secure_consumer_socket = + *(static_cast<implementation::P2PSecureConsumerSocket *>(socket_.get())); + secure_consumer_socket.registerPrefix(producer_namespace); } } // namespace interface diff --git a/libtransport/src/interfaces/tls_rtc_socket_producer.h b/libtransport/src/interfaces/tls_rtc_socket_producer.h index 434edb522..3ea84095b 100644 --- a/libtransport/src/interfaces/tls_rtc_socket_producer.h +++ b/libtransport/src/interfaces/tls_rtc_socket_producer.h @@ -28,9 +28,9 @@ namespace interface { class TLSRTCProducerSocket : public ProducerSocket { public: TLSRTCProducerSocket(implementation::TLSRTCProducerSocket *implementation); + ~TLSRTCProducerSocket(); }; } // namespace interface - -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/protocols/consumer.conf b/libtransport/src/protocols/consumer.conf index 1a366f32f..d0eab75ac 100644 --- a/libtransport/src/protocols/consumer.conf +++ b/libtransport/src/protocols/consumer.conf @@ -1,4 +1,4 @@ -; this file contais the parameters for RAAQM +; This file contains the parameters for RAAQM autotune = no lifetime = 500 retransmissions = 128 diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc index e590b4fee..0872c4554 100644 --- a/libtransport/src/protocols/incremental_indexer.cc +++ b/libtransport/src/protocols/incremental_indexer.cc @@ -25,6 +25,8 @@ void IncrementalIndexer::onContentObject( core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { using namespace interface; + TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { final_suffix_ = content_object->getName().getSuffix(); } @@ -50,4 +52,4 @@ void IncrementalIndexer::onContentObject( } } // namespace protocol -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc index 1a2f9dec3..da835b577 100644 --- a/libtransport/src/protocols/manifest_incremental_indexer.cc +++ b/libtransport/src/protocols/manifest_incremental_indexer.cc @@ -37,10 +37,12 @@ ManifestIncrementalIndexer::ManifestIncrementalIndexer( void ManifestIncrementalIndexer::onContentObject( core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - // Check if mainfiest or not + // Check if manifest or not if (content_object->getPayloadType() == PayloadType::MANIFEST) { + TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); onUntrustedManifest(std::move(interest), std::move(content_object)); } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + TRANSPORT_LOGD("Receive manifest %s", content_object->getName().toString().c_str()); onUntrustedContentObject(std::move(interest), std::move(content_object)); } } diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 0a93dec44..f8da69ceb 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -460,7 +460,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { // send at least one interest if there are retransmissions to perform and // there is no space left in the window sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Window full, retransmit one content interest"); interest_to_retransmit_.pop(); } @@ -470,7 +469,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { while (interests_in_flight_ < current_window_size_) { if (interest_to_retransmit_.size() > 0) { sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Retransmit content interest"); interest_to_retransmit_.pop(); } else { index = index_manager_->getNextSuffix(); @@ -479,7 +477,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { } sendInterest(index); - TRANSPORT_LOGD("Send content interest %u", index); } } } @@ -508,6 +505,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); + sendInterest(std::move(interest)); return true; @@ -517,6 +515,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { interests_in_flight_++; interest_retransmissions_[interest->getName().getSuffix() & mask]++; + TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str()); portal_->sendInterest(std::move(interest)); } |