diff options
author | Luca Muscariello <muscariello@ieee.org> | 2020-05-13 21:27:38 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@fd.io> | 2020-05-13 21:27:38 +0000 |
commit | dc5a4297aa5b329dc2536de1f6b4a6e7edc36693 (patch) | |
tree | 150e9a386a36e219285283341e202b6373cdcdb8 | |
parent | 76c0e9a9ac88ffc3df287cfb936d23cccad79a6e (diff) | |
parent | eb9119968cfc53f41526981924e5c8d44612f98a (diff) |
Merge "[HICN-595] Bring TLS up to date"
21 files changed, 600 insertions, 625 deletions
diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h index e58e7962d..2efd7439d 100644 --- a/libtransport/includes/hicn/transport/core/packet.h +++ b/libtransport/includes/hicn/transport/core/packet.h @@ -148,15 +148,15 @@ class Packet : public std::enable_shared_from_this<Packet> { std::pair<const uint8_t *, std::size_t> getPayloadReference() const { int signature_size = 0; + if (_is_ah(format_)) { signature_size = (uint32_t)getSignatureSize(); } auto header_size = getHeaderSizeFromFormat(format_, signature_size); - auto payload_length = packet_->length() - header_size; + auto payload_length = payloadSize(); - return std::make_pair(packet_->data() + header_size, - payload_length); + return std::make_pair(packet_->data() + header_size, payload_length); } Packet &updateLength(std::size_t length = 0); @@ -229,6 +229,7 @@ class Packet : public std::enable_shared_from_this<Packet> { Packet &setTTL(uint8_t hops); uint8_t getTTL() const; + void separateHeaderPayload(); void resetPayload(); private: @@ -248,7 +249,6 @@ class Packet : public std::enable_shared_from_this<Packet> { } uint8_t *getSignature() const; - void separateHeaderPayload(); protected: Name name_; diff --git a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h index 097b0a8c0..224493f00 100644 --- a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h +++ b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h @@ -23,10 +23,10 @@ namespace interface { class P2PSecureConsumerSocket : public ConsumerSocket { public: - P2PSecureConsumerSocket(int handshake_protocol, int protocol); + P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol); ~P2PSecureConsumerSocket() = default; + void registerPrefix(const Prefix &producer_namespace); }; } // namespace interface - } // end namespace transport diff --git a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h index 0a6e9a43a..73cbb78b0 100644 --- a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h @@ -106,7 +106,7 @@ class ConsumerSocket { /** * This method will be called by the transport for understanding how many - * bytes it should read (at most) before notifying the application. + * bytes it should read before notifying the application. * * By default it reads 64 KB. */ diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc index 30f780461..eea4aeb8b 100644 --- a/libtransport/src/core/prefix.cc +++ b/libtransport/src/core/prefix.cc @@ -211,8 +211,8 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components, } } - if (this->contains(name_ip)) - throw errors::RuntimeException("Mask overrides the prefix"); + // if (this->contains(name_ip)) + // throw errors::RuntimeException("Mask overrides the prefix"); return Name(ip_prefix_.family, (uint8_t *)&name_ip); } diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc index 40ab58161..9b79850d6 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -33,10 +33,6 @@ void P2PSecureConsumerSocket::setInterestPayload( if (payload_ != NULL) int2.appendPayload(std::move(payload_)); } -// implement void readBufferAvailable(), size_t maxBufferSize() const override, -// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() -// must be implemented even if empty. - /* Return the number of read bytes in the return param */ int readOld(BIO *b, char *buf, int size) { if (size < 0) return size; @@ -51,11 +47,13 @@ int readOld(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -106,6 +104,7 @@ int writeOld(BIO *b, const char *buf, int num) { socket = (P2PSecureConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -173,9 +172,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, P2PSecureConsumerSocket::P2PSecureConsumerSocket( interface::ConsumerSocket *consumer, int handshake_protocol, int transport_protocol) - : ConsumerSocket(consumer, transport_protocol), + : ConsumerSocket(consumer, handshake_protocol), name_(), - tls_consumer_(), + tls_consumer_(nullptr), buf_pool_(), decrypted_content_(), payload_(), @@ -224,12 +223,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket( BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -244,76 +237,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { SSL_shutdown(ssl_); } -int P2PSecureConsumerSocket::consume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } +int P2PSecureConsumerSocket::handshake() { + int result = 1; - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); + if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + return 1; } - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_); - tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer)); - ConsumerTimerCallback *stats_summary_callback = nullptr; - this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_summary_callback); + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - uint32_t lifetime; - this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); - tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - read_callback_decrypted_); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - *stats_summary_callback); - tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, - this->timer_interval_milliseconds_); - tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - tls_consumer.connect(); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - if (payload_ != NULL) - return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); - else - return tls_consumer.consume((prefix->mapName(name))); -} + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); -int P2PSecureConsumerSocket::asyncConsume(const Name &name) { - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); - TRANSPORT_LOGD("Handshake performed!"); - } + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + result = SSL_connect(this->ssl_); - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + return result; +} +void P2PSecureConsumerSocket::initSessionSocket() { tls_consumer_ = std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); tls_consumer_->setInterface( @@ -325,6 +271,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { uint32_t lifetime; this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, @@ -336,6 +283,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); tls_consumer_->connect(); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + + if (payload_ != nullptr) + return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer_->consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); if (payload_ != NULL) return tls_consumer_->asyncConsume((prefix->mapName(name)), @@ -399,5 +399,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h index e2ebaf94e..d4c3b26c2 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.h +++ b/libtransport/src/implementation/p2psecure_socket_consumer.h @@ -69,39 +69,26 @@ class P2PSecureConsumerSocket : public ConsumerSocket, private: Name name_; std::shared_ptr<TLSConsumerSocket> tls_consumer_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; BIO_METHOD *bio_meth_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - ip_prefix_t secure_prefix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; /* Condition variable for the wait */ @@ -138,9 +125,12 @@ class P2PSecureConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; - int download_content(const Name &name); + int handshake(); + + void initSessionSocket(); }; } // namespace implementation diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc index d0852539a..15c7d25cd 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.cc +++ b/libtransport/src/implementation/p2psecure_socket_producer.cc @@ -37,9 +37,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( : ProducerSocket(producer_socket), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() {} + map_producers(), + list_producers() {} P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, @@ -48,12 +47,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( rtc_(rtc), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() { - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + map_producers(), + list_producers() { + /* Setup SSL context (identity and parameter to use TLS 1.3) */ der_cert_ = parcKeyStore_GetDEREncodedCertificate( (identity->getSigner()->getKeyStore())); der_prk_ = parcKeyStore_GetDEREncodedPrivateKey( @@ -68,10 +64,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( cert_509_ = d2i_X509(NULL, &cert, cert_size); pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ ProducerSocket::setSocketOption( ProducerCallbacksOptions::INTEREST_INPUT, (ProducerInterestCallback)std::bind( @@ -84,53 +78,61 @@ P2PSecureProducerSocket::~P2PSecureProducerSocket() { if (der_prk_) parcBuffer_Release(&der_prk_); } +void P2PSecureProducerSocket::initSessionSocket( + std::unique_ptr<TLSProducerSocket> &producer) { + producer->on_content_produced_application_ = + this->on_content_produced_application_; + producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + producer->setSocketOption(SIGNER, this->signer_); + producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + + if (!rtc_) { + producer->setInterface(new interface::TLSProducerSocket(producer.get())); + } else { + TLSRTCProducerSocket *rtc_producer = + dynamic_cast<TLSRTCProducerSocket *>(producer.get()); + rtc_producer->setInterface( + new interface::TLSRTCProducerSocket(rtc_producer)); + } +} + void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, Interest &interest) { std::unique_lock<std::mutex> lck(mtx_); + std::unique_ptr<TLSProducerSocket> tls_producer; + auto it = map_producers.find(interest.getName()); + + if (it != map_producers.end()) { + return; + } + + if (!rtc_) { + tls_producer = + std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName()); + } else { + tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this, + interest.getName()); + } + + initSessionSocket(tls_producer); + TLSProducerSocket *tls_producer_ptr = tls_producer.get(); + map_producers.insert({interest.getName(), move(tls_producer)}); TRANSPORT_LOGD("Start handshake at %s", interest.getName().toString().c_str()); + if (!rtc_) { - auto it = map_secure_producers.find(interest.getName()); - if (it != map_secure_producers.end()) return; - TLSProducerSocket *tls_producer = - new TLSProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer)); - - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_producers.insert( - {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + tls_producer_ptr->onInterest(*tls_producer_ptr, interest); + tls_producer_ptr->async_accept(); } else { - auto it = map_secure_rtc_producers.find(interest.getName()); - if (it != map_secure_rtc_producers.end()) return; - TLSRTCProducerSocket *tls_producer = - new TLSRTCProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface( - new interface::TLSRTCProducerSocket(tls_producer)); - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_rtc_producers.insert( - {interest.getName(), - std::unique_ptr<TLSRTCProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + TLSRTCProducerSocket *rtc_producer_ptr = + dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr); + rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest); + rtc_producer_ptr->async_accept(); } } @@ -143,11 +145,13 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer, } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_rtc_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_rtc_producers.cbegin(); - it != list_secure_rtc_producers.cend(); it++) { - (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { + TLSRTCProducerSocket *rtc_producer = + dynamic_cast<TLSRTCProducerSocket *>(it->get()); + rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); } } @@ -162,12 +166,13 @@ uint32_t P2PSecureProducerSocket::produce( std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) segments += (*it)->produce(content_name, buffer->clone(), is_last, start_offset); + return segments; } @@ -183,12 +188,12 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name, std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) segments += (*it)->produce(content_name, buffer, buffer_size, is_last, start_offset); + return segments; } @@ -203,10 +208,9 @@ void P2PSecureProducerSocket::asyncProduce(const Name &content_name, } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset); } } @@ -221,22 +225,19 @@ void P2PSecureProducerSocket::asyncProduce( } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, last_segment); } } -// Socket Option Redefinition to avoid name hiding - +/* Redefinition of socket options to avoid name hiding */ int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerInterestCallback socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } @@ -269,9 +270,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, const std::shared_ptr<utils::Signer> &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -288,9 +288,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption(int socket_option_key, uint32_t socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } switch (socket_option_key) { @@ -305,9 +304,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, bool socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -316,9 +314,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, Name *socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -327,9 +324,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption( int socket_option_key, std::list<Prefix> socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -338,9 +334,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentObjectCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -349,9 +344,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -368,9 +362,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, utils::CryptoHashType socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -379,9 +372,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, utils::CryptoSuite socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -390,9 +382,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, const std::string &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -400,5 +391,4 @@ int P2PSecureProducerSocket::setSocketOption( } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h index 33339deba..bfc9fc2c1 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.h +++ b/libtransport/src/implementation/p2psecure_socket_producer.h @@ -37,9 +37,11 @@ class P2PSecureProducerSocket : public ProducerSocket { public: explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); + explicit P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, const std::shared_ptr<utils::Identity> &identity); + ~P2PSecureProducerSocket(); void produce(const uint8_t *buffer, size_t buffer_size) override; @@ -96,7 +98,6 @@ class P2PSecureProducerSocket : public ProducerSocket { using ProducerSocket::onInterest; protected: - bool rtc_; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; @@ -104,27 +105,23 @@ class P2PSecureProducerSocket : public ProducerSocket { ProducerContentCallback on_content_produced_application_; private: + bool rtc_; std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - PARCBuffer *der_cert_; PARCBuffer *der_prk_; X509 *cert_509_; EVP_PKEY *pkey_rsa_; std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>, core::hash<core::Name>, core::compare2<core::Name>> - map_secure_producers; - std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>, - core::hash<core::Name>, core::compare2<core::Name>> - map_secure_rtc_producers; - std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers; - std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers; + map_producers; + std::list<std::unique_ptr<TLSProducerSocket>> list_producers; void onInterestCallback(interface::ProducerSocket &p, Interest &interest); + + void initSessionSocket(std::unique_ptr<TLSProducerSocket> &producer); }; } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 9ccc59d9e..8c5c453fc 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -131,7 +131,7 @@ class ProducerSocket : public Socket<BasePortal>, // TODO Manifest may still be used for indexing if (making_manifest && !signer) { - TRANSPORT_LOGD("Making manifests without setting producer identity."); + TRANSPORT_LOGE("Making manifests without setting producer identity."); } core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; @@ -192,7 +192,6 @@ class ProducerSocket : public Socket<BasePortal>, } } - TRANSPORT_LOGD("--------- START PRODUCE ----------"); for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { if (making_manifest) { @@ -207,13 +206,12 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } @@ -270,8 +268,7 @@ class ProducerSocket : public Socket<BasePortal>, signer->sign(*content_object); } passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %u", - content_object->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str()); } } @@ -286,11 +283,11 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } } @@ -949,18 +946,19 @@ class ProducerSocket : public Socket<BasePortal>, std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; - std::unique_lock<std::mutex> lck(mtx); + bool done = false; io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); - - if (!done) { - cv.wait(lck); - } + cv.notify_all(); }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } } else { result = func(socket_option_key, socket_option_value); } diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc index 3b3152993..9ef79ca23 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.cc +++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc @@ -53,7 +53,7 @@ int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) { (socket->cv_).wait(lck); } - utils::MemBuf *membuf = socket->packet_->next(); + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; if ((int)membuf->length() > size) { @@ -91,10 +91,10 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSRTCProducerSocket *socket; socket = (TLSRTCProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { - socket->tls_chunks_--; + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { bool making_manifest = socket->parent_->making_manifest_; + + socket->tls_chunks_--; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); socket->parent_->ProducerSocket::produce( @@ -107,13 +107,17 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { socket->to_call_oncontentproduced_--; auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->RTCProducerSocket::produce(std::move(mbuf)); + ProducerContentCallback on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { on_content_produced_application( @@ -144,24 +148,28 @@ TLSRTCProducerSocket::TLSRTCProducerSocket( } void TLSRTCProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_rtc_producers.push_front( - std::move(parent_->map_secure_rtc_producers[handshake_name_])); - parent_->map_secure_rtc_producers.erase(handshake_name_); + + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); ProducerInterestCallback on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, on_interest_process_decrypted); if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); + Interest inter(std::move(handshake_packet_)); on_interest_process_decrypted( (transport::interface::ProducerSocket &)(*getInterface()), inter); } @@ -181,7 +189,9 @@ int TLSRTCProducerSocket::async_accept() { } void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } @@ -197,5 +207,4 @@ void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 95b287aa6..7cf653848 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -46,11 +46,13 @@ int readOldTLS(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -101,6 +103,7 @@ int writeOldTLS(BIO *b, const char *buf, int num) { socket = (TLSConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -176,12 +179,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -191,10 +188,8 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, this); }; -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; } int TLSConsumerSocket::consume(const Name &name, @@ -228,22 +223,15 @@ int TLSConsumerSocket::download_content(const Name &name) { something_to_read_ = false; content_downloaded_ = false; - decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH); - uint8_t *buf = decrypted_content_->writableData(); - size_t size = 0; + std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize(); + std::size_t buffer_size = read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH; + decrypted_content_ = utils::MemBuf::createCombined(buffer_size); int result = -1; + std::size_t size = 0; while (!content_downloaded_ || something_to_read_) { - if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) { - decrypted_content_->appendChain( - utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH)); - // decrypted_content_->computeChainDataLength(); - buf = decrypted_content_->prev()->writableData(); - } else { - buf = decrypted_content_->writableTail(); - } - - result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH); + result = SSL_read( + this->ssl_, decrypted_content_->writableTail(), SSL3_RT_MAX_PLAIN_LENGTH); /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of * the data has been fully downloaded */ @@ -253,20 +241,20 @@ int TLSConsumerSocket::download_content(const Name &name) { if (result >= 0) { size += result; - decrypted_content_->prepend(result); - } else + decrypted_content_->append(result); + } else { throw errors::RuntimeException("Unable to download content"); + } - if (size >= read_callback_decrypted_->maxBufferSize()) { + if (decrypted_content_->length() >= max_buffer_size) { if (read_callback_decrypted_->isBufferMovable()) { - // No need to perform an additional copy. The whole buffer will be - // tranferred to the application. - + /* No need to perform an additional copy. The whole buffer will be + * tranferred to the application. */ read_callback_decrypted_->readBufferAvailable( std::move(decrypted_content_)); - decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH); + decrypted_content_ = utils::MemBuf::create(buffer_size); } else { - // The buffer will be copied into the application-provided buffer + /* The buffer will be copied into the application-provided buffer */ uint8_t *buffer; std::size_t length; std::size_t total_length = decrypted_content_->length(); @@ -358,6 +346,7 @@ size_t TLSConsumerSocket::maxBufferSize() const { void TLSConsumerSocket::readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept { std::unique_lock<std::mutex> lck(this->mtx_); + if (head_) { head_->prependChain(std::move(buffer)); } else { @@ -380,5 +369,4 @@ void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool TLSConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h index 2e88dc47e..6931a7a8a 100644 --- a/libtransport/src/implementation/tls_socket_consumer.h +++ b/libtransport/src/implementation/tls_socket_consumer.h @@ -69,42 +69,27 @@ class TLSConsumerSocket : public ConsumerSocket, private: Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - - /* Chain of MemBuf holding the payload to be written into interest or data - */ + /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - utils::EventThread async_downloader_tls_; void setInterestPayload(interface::ConsumerSocket &c, @@ -123,11 +108,11 @@ class TLSConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; int download_content(const Name &name); }; } // namespace implementation - -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc index 9a5b94a1c..339a1ad58 100644 --- a/libtransport/src/implementation/tls_socket_producer.cc +++ b/libtransport/src/implementation/tls_socket_producer.cc @@ -48,18 +48,17 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - /* take a lock on the mutex. It will be unlocked by */ std::unique_lock<std::mutex> lck(socket->mtx_); + if (!socket->something_to_read_) { (socket->cv_).wait(lck); } - /* Either there already is something to read, or the thread has been waken up - */ - /* must return the payload in the interest */ - - utils::MemBuf *membuf = socket->packet_->next(); + /* Either there already is something to read, or the thread has been waken up. + * We must return the payload in the interest anyway */ + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; + if ((int)membuf->length() > size) { size_to_read = size; } else { @@ -97,11 +96,11 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { + bool making_manifest = socket->parent_->making_manifest_; + //! socket->tls_chunks_ corresponds to is_last socket->tls_chunks_--; - bool making_manifest = socket->parent_->making_manifest_; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); socket->parent_->ProducerSocket::produce( @@ -116,16 +115,21 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->tls_chunks_--; socket->to_call_oncontentproduced_--; - auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->last_segment_ += socket->ProducerSocket::produce( socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, socket->last_segment_); + ProducerContentCallback on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { on_content_produced_application(*socket->getInterface(), @@ -144,8 +148,10 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, on_content_produced_application_(), mtx_(), cv_(), - something_to_read_(), + something_to_read_(false), + handshake_state_(UNINITIATED), name_(), + handshake_packet_(), last_segment_(0), parent_(parent), first_(true), @@ -157,9 +163,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, const SSL_METHOD *meth = TLS_server_method(); ctx_ = SSL_CTX_new(meth); - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + /* Setup SSL context (identity and parameter to use TLS 1.3) */ SSL_CTX_use_certificate(ctx_, parent->cert_509_); SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_); @@ -167,6 +171,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, SSL_CTX_set_ciphersuites(ctx_, "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { throw errors::RuntimeException( "Unable to set cipher list on TLS subsystem. Aborting."); @@ -184,10 +189,9 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, this, TLSProducerSocket::parseHicnKeyIdCb, NULL); ssl_ = SSL_new(ctx_); - /* - * Setup this producer socker as the bio that TLS will use to write and read - * data (in stream mode) - */ + + /* Setup this producer socker as the bio that TLS will use to write and read + * data (in stream mode) */ BIO_METHOD *bio_meth = BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket"); BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld); @@ -197,15 +201,15 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, BIO_set_init(bio, 1); BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CACHE_MISS, (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this, std::placeholders::_1, std::placeholders::_2)); + this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CONTENT_PRODUCED, (ProducerContentCallback)bind( @@ -213,35 +217,39 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, std::placeholders::_2, std::placeholders::_3)); } -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; } void TLSProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } - TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_producers.push_front( - std::move(parent_->map_secure_producers[handshake_name_])); - parent_->map_secure_producers.erase(handshake_name_); + + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); ProducerInterestCallback on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, on_interest_process_decrypted); if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); + Interest inter(std::move(handshake_packet_)); on_interest_process_decrypted(*getInterface(), inter); } else { throw errors::RuntimeException( - "On interest process unset. Unable to perform handshake"); + "On interest process unset: unable to perform handshake"); } + + handshake_state_ = SERVER_FINISHED; + TRANSPORT_LOGD("Handshake performed!"); } int TLSProducerSocket::async_accept() { @@ -249,71 +257,88 @@ int TLSProducerSocket::async_accept() { async_thread_.add([this]() { this->accept(); }); } else { throw errors::RuntimeException( - "Async thread not running, impossible to perform handshake"); + "Async thread not running: unable to perform handshake"); } return 1; } void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { - /* Based on the state machine of (D)TLS, we know what action to do */ - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); + name_ = interest.getName(); + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - if (head_) { - payload_->prependChain(interest.getPayload()); - } else { - payload_ = interest.getPayload(); // std::move(interest.getPayload()); - } + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + return; + } else if (handshake_state == SERVER_FINISHED) { + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), interest.getPayload()->length()); - } + } + + ProducerInterestCallback on_interest_input_decrypted; + getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + on_interest_input_decrypted); - ProducerInterestCallback on_interest_input_decrypted; - getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, - on_interest_input_decrypted); - if (on_interest_input_decrypted) - (on_interest_input_decrypted)(*getInterface(), interest); + if (on_interest_input_decrypted) + (on_interest_input_decrypted)(*getInterface(), interest); + } } void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p, Interest &interest) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); - name_ = interest.getName(); + + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + handshake_state_ = CLIENT_FINISHED; + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + } else if (handshake_state == SERVER_FINISHED) { + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), interest.getPayload()->length()); + } if (on_interest_process_decrypted_ != VOID_HANDLER) on_interest_process_decrypted_(*getInterface(), interest); } } +TLSProducerSocket::HandshakeState TLSProducerSocket::getHandshakeState() { + if (SSL_in_before(ssl_)) { + handshake_state_ = UNINITIATED; + } + + if (SSL_in_init(ssl_) && handshake_state_ == UNINITIATED) { + handshake_state_ = CLIENT_HELLO; + } + + return handshake_state_; +} + void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written) {} @@ -321,13 +346,13 @@ void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, uint32_t TLSProducerSocket::produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, uint32_t start_offset) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + if (getHandshakeState() != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } + size_t buf_size = buffer->length(); name_ = served_namespaces_.front().mapName(content_name); - tls_chunks_ = to_call_oncontentproduced_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); @@ -388,6 +413,7 @@ void TLSProducerSocket::produce(ContentObject &content_object) { long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { if (cmd == BIO_CTRL_FLUSH) { } + return 1; } @@ -397,6 +423,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, X509 *x, size_t chainidx, int *al, void *add_arg) { TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg); + if (ext_type == 100) { ip_prefix_t ip_prefix = socket->parent_->served_namespaces_.front().toIpPrefixStruct(); @@ -425,6 +452,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, ip_address_t keyId_component = {}; u32 *mask_buf; u32 *keyId_component_buf; + switch (inet_family) { case AF_INET: mask_buf = &(mask.v4.as_u32); @@ -483,6 +511,7 @@ int TLSProducerSocket::setSocketOption( [this](int socket_option_key, ProducerInterestCallback socket_option_value) -> int { int result = SOCKET_OPTION_SET; + switch (socket_option_key) { case ProducerCallbacksOptions::INTEREST_INPUT: on_interest_input_decrypted_ = socket_option_value; @@ -508,6 +537,7 @@ int TLSProducerSocket::setSocketOption( result = SOCKET_OPTION_NOT_SET; break; } + return result; }); } @@ -607,5 +637,4 @@ int TLSProducerSocket::getSocketOption( } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h index e910c8259..2382e8695 100644 --- a/libtransport/src/implementation/tls_socket_producer.h +++ b/libtransport/src/implementation/tls_socket_producer.h @@ -84,47 +84,44 @@ class TLSProducerSocket : virtual public ProducerSocket { using ProducerSocket::setSocketOption; protected: + enum HandshakeState { + UNINITIATED, + CLIENT_HELLO, // when CLIENT_HELLO interest has been received + CLIENT_FINISHED, // when CLIENT_FINISHED interest has been received + SERVER_FINISHED, // when handshake is done + }; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; ProducerInterestCallback on_interest_process_decrypted_; ProducerContentCallback on_content_produced_application_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - /* Bool variable, true if there is something to read (an interest arrived) */ bool something_to_read_; - + /* Bool variable, true if CLIENT_FINISHED interest has been received */ + HandshakeState handshake_state_; /* First interest that open a secure connection */ transport::core::Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - - Packet::MemBufPtr packet_; - + Packet::MemBufPtr handshake_packet_; std::unique_ptr<utils::MemBuf> head_; std::uint32_t last_segment_; - std::shared_ptr<utils::MemBuf> payload_; std::uint32_t key_id_; - std::thread *handshake; P2PSecureProducerSocket *parent_; - bool first_; Name handshake_name_; int tls_chunks_; int to_call_oncontentproduced_; - bool still_writing_; - utils::EventThread encryption_thread_; void onInterest(ProducerSocket &p, Interest &interest); + void cacheMiss(interface::ProducerSocket &p, Interest &interest); /* Return the number of read bytes in readbytes */ @@ -156,8 +153,9 @@ class TLSProducerSocket : virtual public ProducerSocket { void onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written); + + HandshakeState getHandshakeState(); }; } // namespace implementation - } // end namespace transport diff --git a/libtransport/src/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/interfaces/p2psecure_socket_consumer.cc index 2fa8bb6e3..038441dfc 100644 --- a/libtransport/src/interfaces/p2psecure_socket_consumer.cc +++ b/libtransport/src/interfaces/p2psecure_socket_consumer.cc @@ -21,11 +21,17 @@ namespace transport { namespace interface { P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol, - int protocol) + int transport_protocol) : ConsumerSocket() { socket_ = std::unique_ptr<implementation::ConsumerSocket>( new implementation::P2PSecureConsumerSocket(this, handshake_protocol, - protocol)); + transport_protocol)); +} + +void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + implementation::P2PSecureConsumerSocket &secure_consumer_socket = + *(static_cast<implementation::P2PSecureConsumerSocket *>(socket_.get())); + secure_consumer_socket.registerPrefix(producer_namespace); } } // namespace interface diff --git a/libtransport/src/interfaces/tls_rtc_socket_producer.h b/libtransport/src/interfaces/tls_rtc_socket_producer.h index 434edb522..3ea84095b 100644 --- a/libtransport/src/interfaces/tls_rtc_socket_producer.h +++ b/libtransport/src/interfaces/tls_rtc_socket_producer.h @@ -28,9 +28,9 @@ namespace interface { class TLSRTCProducerSocket : public ProducerSocket { public: TLSRTCProducerSocket(implementation::TLSRTCProducerSocket *implementation); + ~TLSRTCProducerSocket(); }; } // namespace interface - -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/protocols/consumer.conf b/libtransport/src/protocols/consumer.conf index 1a366f32f..d0eab75ac 100644 --- a/libtransport/src/protocols/consumer.conf +++ b/libtransport/src/protocols/consumer.conf @@ -1,4 +1,4 @@ -; this file contais the parameters for RAAQM +; This file contains the parameters for RAAQM autotune = no lifetime = 500 retransmissions = 128 diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc index e590b4fee..0872c4554 100644 --- a/libtransport/src/protocols/incremental_indexer.cc +++ b/libtransport/src/protocols/incremental_indexer.cc @@ -25,6 +25,8 @@ void IncrementalIndexer::onContentObject( core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { using namespace interface; + TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { final_suffix_ = content_object->getName().getSuffix(); } @@ -50,4 +52,4 @@ void IncrementalIndexer::onContentObject( } } // namespace protocol -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc index 1a2f9dec3..da835b577 100644 --- a/libtransport/src/protocols/manifest_incremental_indexer.cc +++ b/libtransport/src/protocols/manifest_incremental_indexer.cc @@ -37,10 +37,12 @@ ManifestIncrementalIndexer::ManifestIncrementalIndexer( void ManifestIncrementalIndexer::onContentObject( core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - // Check if mainfiest or not + // Check if manifest or not if (content_object->getPayloadType() == PayloadType::MANIFEST) { + TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); onUntrustedManifest(std::move(interest), std::move(content_object)); } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + TRANSPORT_LOGD("Receive manifest %s", content_object->getName().toString().c_str()); onUntrustedContentObject(std::move(interest), std::move(content_object)); } } diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 0a93dec44..f8da69ceb 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -460,7 +460,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { // send at least one interest if there are retransmissions to perform and // there is no space left in the window sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Window full, retransmit one content interest"); interest_to_retransmit_.pop(); } @@ -470,7 +469,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { while (interests_in_flight_ < current_window_size_) { if (interest_to_retransmit_.size() > 0) { sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Retransmit content interest"); interest_to_retransmit_.pop(); } else { index = index_manager_->getNextSuffix(); @@ -479,7 +477,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { } sendInterest(index); - TRANSPORT_LOGD("Send content interest %u", index); } } } @@ -508,6 +505,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); + sendInterest(std::move(interest)); return true; @@ -517,6 +515,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { interests_in_flight_++; interest_retransmissions_[interest->getName().getSuffix() & mask]++; + TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str()); portal_->sendInterest(std::move(interest)); } diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 151e4df3d..0b1578b6f 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -64,25 +64,15 @@ namespace interface { */ struct ClientConfiguration { ClientConfiguration() - : name("b001::abcd", 0), - verify(false), - beta(-1.f), - drop_factor(-1.f), - window(-1), - producer_certificate(""), - passphrase(""), - receive_buffer(nullptr), - receive_buffer_size_(128 * 1024), - download_size(0), - report_interval_milliseconds_(1000), - transport_protocol_(CBR), - rtc_(false), - test_mode_(false), + : name("b001::abcd", 0), verify(false), beta(-1.f), drop_factor(-1.f), + window(-1), producer_certificate(""), passphrase(""), + receive_buffer(nullptr), receive_buffer_size_(128 * 1024), + download_size(0), report_interval_milliseconds_(1000), + transport_protocol_(CBR), rtc_(false), test_mode_(false), #ifdef SECURE_HICNTRANSPORT secure_(false), #endif - producer_prefix_(), - interest_lifetime_(500) { + producer_prefix_(), interest_lifetime_(500) { } Name name; @@ -110,7 +100,7 @@ struct ClientConfiguration { * Class for handling the production rate for the RTC producer. */ class Rate { - public: +public: Rate() : rate_kbps_(0) {} Rate(const std::string &rate) { @@ -140,7 +130,7 @@ class Rate { (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_)); } - private: +private: float rate_kbps_; }; @@ -149,23 +139,13 @@ class Rate { */ struct ServerConfiguration { ServerConfiguration() - : name("b001::abcd/64"), - virtual_producer(true), - manifest(false), - live_production(false), - sign(false), - content_lifetime(600000000_U32), - content_object_size(1440), - download_size(20 * 1024 * 1024), - hash_algorithm(utils::CryptoHashType::SHA_256), - keystore_name(""), - passphrase(""), - keystore_password("cisco"), - multiphase_produce_(false), - rtc_(false), - interactive_(false), - production_rate_(std::string("2048kbps")), - payload_size_(1440) + : name("b001::abcd/64"), virtual_producer(true), manifest(false), + live_production(false), sign(false), content_lifetime(600000000_U32), + content_object_size(1440), download_size(20 * 1024 * 1024), + hash_algorithm(utils::CryptoHashType::SHA_256), keystore_name(""), + passphrase(""), keystore_password("cisco"), multiphase_produce_(false), + rtc_(false), interactive_(false), + production_rate_(std::string("2048kbps")), payload_size_(1440) #ifdef SECURE_HICNTRANSPORT , secure_(false) @@ -214,13 +194,10 @@ class HIperfClient { friend class KeyCallback; friend class RTCCallback; - public: +public: HIperfClient(const ClientConfiguration &conf) - : configuration_(conf), - total_duration_milliseconds_(0), - old_bytes_value_(0), - signals_(io_service_, SIGINT), - expected_seg_(0), + : configuration_(conf), total_duration_milliseconds_(0), + old_bytes_value_(0), signals_(io_service_, SIGINT), expected_seg_(0), lost_packets_(std::unordered_set<uint32_t>()), rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr), callback_(configuration_.rtc_ ? nullptr : new Callback(*this)), @@ -234,13 +211,14 @@ class HIperfClient { void checkReceivedRtcContent(ConsumerSocket &c, const ContentObject &contentObject) { - if (!configuration_.test_mode_) return; + if (!configuration_.test_mode_) + return; uint32_t receivedSeg = contentObject.getName().getSuffix(); auto payload = contentObject.getPayload(); - if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK - // payload + if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK + // payload uint32_t *payloadPtr = (uint32_t *)payload->data(); uint32_t productionSeg = *(payloadPtr); uint32_t productionRate = *(++payloadPtr); @@ -299,7 +277,8 @@ class HIperfClient { void handleTimerExpiration(ConsumerSocket &c, const TransportStatistics &stats) { - if (configuration_.rtc_) return; + if (configuration_.rtc_) + return; const char separator = ' '; const int width = 20; @@ -361,7 +340,7 @@ class HIperfClient { configuration_.transport_protocol_ = CBR; } -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT if (configuration_.secure_) { consumer_socket_ = std::make_shared<P2PSecureConsumerSocket>( RAAQM, configuration_.transport_protocol_); @@ -378,7 +357,7 @@ class HIperfClient { #endif consumer_socket_ = std::make_shared<ConsumerSocket>(configuration_.transport_protocol_); -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT } #endif @@ -431,13 +410,15 @@ class HIperfClient { if (!configuration_.producer_certificate.empty()) { key_id_ = verifier->addKeyFromCertificate( configuration_.producer_certificate); - if (key_id_ == nullptr) return ERROR_SETUP; + if (key_id_ == nullptr) + return ERROR_SETUP; } if (!configuration_.passphrase.empty()) { key_id_ = verifier->addKeyFromPassphrase( configuration_.passphrase, utils::CryptoSuite::HMAC_SHA256); - if (key_id_ == nullptr) return ERROR_SETUP; + if (key_id_ == nullptr) + return ERROR_SETUP; } if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, @@ -528,11 +509,11 @@ class HIperfClient { return ERROR_SUCCESS; } - private: +private: class RTCCallback : public ConsumerSocket::ReadCallback { static constexpr std::size_t mtu = 1500; - public: + public: RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) { client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); } @@ -559,12 +540,12 @@ class HIperfClient { std::cout << "Data successfully read" << std::endl; } - private: + private: HIperfClient &client_; }; class Callback : public ConsumerSocket::ReadCallback { - public: + public: Callback(HIperfClient &hiperf_client) : client_(hiperf_client) { client_.configuration_.receive_buffer = utils::MemBuf::create(client_.configuration_.receive_buffer_size_); @@ -610,14 +591,14 @@ class HIperfClient { client_.io_service_.stop(); } - private: + private: HIperfClient &client_; }; class KeyCallback : public ConsumerSocket::ReadCallback { static constexpr std::size_t read_size = 16 * 1024; - public: + public: KeyCallback(HIperfClient &hiperf_client) : client_(hiperf_client), key_(nullptr) {} @@ -643,14 +624,13 @@ class HIperfClient { client_.io_service_.stop(); } - bool verifyKey() { return !key_->empty(); } + bool validateKey() { return !key_->empty(); } void readSuccess(std::size_t total_size) noexcept override { std::cout << "Key size: " << total_size << " bytes" << std::endl; - afterRead(); } - void afterRead() { + void readKey() { std::shared_ptr<utils::Verifier> verifier = std::make_shared<utils::Verifier>(); verifier->addKeyFromPassphrase(*key_, utils::CryptoSuite::HMAC_SHA256); @@ -661,26 +641,30 @@ class HIperfClient { consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, verifier); } else { - std::cout << "Could not set verifier" << std::endl; + std::cout << "Consumer socket not set" << std::endl; return; } - if (consumer_socket_->verifyKeyPackets()) { - std::cout << "Verification of packet signatures successful" - << std::endl; + if (validateKey()) { + std::cout << "Key has been authenticated" << std::endl; } else { - std::cout << "Could not verify packet signatures" << std::endl; + std::cout << "Key could not be authenticated" << std::endl; return; } - std::cout << "Key retrieval done" << std::endl; + if (consumer_socket_->verifyKeyPackets()) { + std::cout << "Signatures of key packets are valid" << std::endl; + } else { + std::cout << "Signatures of key packets are not valid" << std::endl; + return; + } } void setConsumer(std::shared_ptr<ConsumerSocket> consumer_socket) { consumer_socket_ = consumer_socket; } - private: + private: HIperfClient &client_; std::unique_ptr<std::string> key_; std::shared_ptr<ConsumerSocket> consumer_socket_; @@ -699,7 +683,7 @@ class HIperfClient { RTCCallback *rtc_callback_; Callback *callback_; KeyCallback *key_callback_; -}; // namespace interface +}; // namespace interface /** * Hiperf server class: configure and setup an hicn producer following the @@ -708,19 +692,16 @@ class HIperfClient { class HIperfServer { const std::size_t log2_content_object_buffer_size = 8; - public: +public: HIperfServer(ServerConfiguration &conf) - : configuration_(conf), - signals_(io_service_, SIGINT), - rtc_timer_(io_service_), - unsatisfied_interests_(), + : configuration_(conf), signals_(io_service_, SIGINT), + rtc_timer_(io_service_), unsatisfied_interests_(), content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), last_segment_(0), #ifndef _WIN32 - ptr_last_segment_(&last_segment_), - input_(io_service_), + ptr_last_segment_(&last_segment_), input_(io_service_), rtc_running_(false) #else ptr_last_segment_(&last_segment_) @@ -843,9 +824,10 @@ class HIperfServer { std::placeholders::_1, std::placeholders::_2)); } - std::shared_ptr<utils::Identity> getProducerIdentity( - std::string &keystore_name, std::string &keystore_password, - utils::CryptoHashType &hash_algorithm) { + std::shared_ptr<utils::Identity> + getProducerIdentity(std::string &keystore_name, + std::string &keystore_password, + utils::CryptoHashType &hash_algorithm) { if (access(keystore_name.c_str(), F_OK) != -1) { return std::make_shared<utils::Identity>(keystore_name, keystore_password, hash_algorithm); @@ -859,7 +841,7 @@ class HIperfServer { int setup() { int ret; -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT if (configuration_.secure_) { auto identity = getProducerIdentity(configuration_.keystore_name, configuration_.keystore_password, @@ -873,7 +855,7 @@ class HIperfServer { } else { producer_socket_ = std::make_unique<ProducerSocket>(); } -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT } #endif @@ -974,7 +956,8 @@ class HIperfServer { } void sendRTCContentObjectCallback(std::error_code ec) { - if (ec) return; + if (ec) + return; rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( configuration_.payload_size_)); @@ -1007,11 +990,11 @@ class HIperfServer { std::placeholders::_1)); } - input_buffer_.consume(length); // Remove newline from input. - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); + input_buffer_.consume(length); // Remove newline from input. + asio::async_read_until(input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, + std::placeholders::_1, + std::placeholders::_2)); } #endif @@ -1027,10 +1010,10 @@ class HIperfServer { if (configuration_.rtc_) { #ifndef _WIN32 if (configuration_.interactive_) { - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); + asio::async_read_until(input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, + std::placeholders::_1, + std::placeholders::_2)); } else { rtc_running_ = true; rtc_timer_.expires_from_now( @@ -1055,7 +1038,7 @@ class HIperfServer { return ERROR_SUCCESS; } - private: +private: ServerConfiguration configuration_; asio::io_service io_service_; asio::signal_set signals_; @@ -1072,7 +1055,7 @@ class HIperfServer { asio::streambuf input_buffer_; bool rtc_running_; #endif -}; // namespace interface +}; // namespace interface void usage() { std::cerr << "HIPERF - A tool for performing network throughput " @@ -1211,174 +1194,174 @@ int main(int argc, char *argv[]) { "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) != -1) { switch (opt) { - // Common - case 'D': { - daemon = true; - break; - } - case 'I': { - server_configuration.interactive_ = true; - break; - } + // Common + case 'D': { + daemon = true; + break; + } + case 'I': { + server_configuration.interactive_ = true; + break; + } #else while ((opt = getopt(argc, argv, "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) { switch (opt) { #endif - case 'f': { - log_file = optarg; - break; - } - case 'R': { - client_configuration.rtc_ = true; - server_configuration.rtc_ = true; - break; - } + case 'f': { + log_file = optarg; + break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } - // Server or Client - case 'S': { - role -= 1; - break; - } - case 'C': { - role += 1; - break; - } - case 'k': { - server_configuration.passphrase = std::string(optarg); - client_configuration.passphrase = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } + // Server or Client + case 'S': { + role -= 1; + break; + } + case 'C': { + role += 1; + break; + } + case 'k': { + server_configuration.passphrase = std::string(optarg); + client_configuration.passphrase = std::string(optarg); + server_configuration.sign = true; + options = -1; + break; + } - // Client specifc - case 'b': { - client_configuration.beta = std::stod(optarg); - options = 1; - break; - } - case 'd': { - client_configuration.drop_factor = std::stod(optarg); - options = 1; - break; - } - case 'W': { - client_configuration.window = std::stod(optarg); - options = 1; - break; - } - case 'M': { - client_configuration.receive_buffer_size_ = std::stoull(optarg); - options = 1; - break; - } + // Client specifc + case 'b': { + client_configuration.beta = std::stod(optarg); + options = 1; + break; + } + case 'd': { + client_configuration.drop_factor = std::stod(optarg); + options = 1; + break; + } + case 'W': { + client_configuration.window = std::stod(optarg); + options = 1; + break; + } + case 'M': { + client_configuration.receive_buffer_size_ = std::stoull(optarg); + options = 1; + break; + } #ifdef SECURE_HICNTRANSPORT - case 'P': { - client_configuration.producer_prefix_ = Prefix(optarg); - client_configuration.secure_ = true; - break; - } + case 'P': { + client_configuration.producer_prefix_ = Prefix(optarg); + client_configuration.secure_ = true; + break; + } #endif - case 'c': { - client_configuration.producer_certificate = std::string(optarg); - options = 1; - break; - } - case 'v': { - client_configuration.verify = true; - options = 1; - break; - } - case 'i': { - client_configuration.report_interval_milliseconds_ = std::stoul(optarg); - options = 1; - break; - } - case 't': { - client_configuration.test_mode_ = true; - options = 1; - break; - } - case 'L': { - client_configuration.interest_lifetime_ = std::stoul(optarg); - options = 1; - break; - } - // Server specific - case 'A': { - server_configuration.download_size = std::stoul(optarg); - options = -1; - break; - } - case 's': { - server_configuration.payload_size_ = std::stoul(optarg); - options = -1; - break; - } - case 'r': { - server_configuration.virtual_producer = false; - options = -1; - break; - } - case 'm': { - server_configuration.manifest = true; - options = -1; - break; - } - case 'l': { - server_configuration.live_production = true; - options = -1; - break; - } - case 'K': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } - case 'y': { - if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256; - } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512; - } else if (strncasecmp(optarg, "crc32", 5) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C; - } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; - } - options = -1; - break; - } - case 'p': { - server_configuration.keystore_password = std::string(optarg); - options = -1; - break; - } - case 'x': { - server_configuration.multiphase_produce_ = true; - options = -1; - break; - } - case 'B': { - auto str = std::string(optarg); - std::transform(str.begin(), str.end(), str.begin(), ::tolower); - server_configuration.production_rate_ = str; - options = -1; - break; + case 'c': { + client_configuration.producer_certificate = std::string(optarg); + options = 1; + break; + } + case 'v': { + client_configuration.verify = true; + options = 1; + break; + } + case 'i': { + client_configuration.report_interval_milliseconds_ = std::stoul(optarg); + options = 1; + break; + } + case 't': { + client_configuration.test_mode_ = true; + options = 1; + break; + } + case 'L': { + client_configuration.interest_lifetime_ = std::stoul(optarg); + options = 1; + break; + } + // Server specific + case 'A': { + server_configuration.download_size = std::stoul(optarg); + options = -1; + break; + } + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { + server_configuration.virtual_producer = false; + options = -1; + break; + } + case 'm': { + server_configuration.manifest = true; + options = -1; + break; + } + case 'l': { + server_configuration.live_production = true; + options = -1; + break; + } + case 'K': { + server_configuration.keystore_name = std::string(optarg); + server_configuration.sign = true; + options = -1; + break; + } + case 'y': { + if (strncasecmp(optarg, "sha256", 6) == 0) { + server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256; + } else if (strncasecmp(optarg, "sha512", 6) == 0) { + server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512; + } else if (strncasecmp(optarg, "crc32", 5) == 0) { + server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C; + } else { + std::cerr << "Ignored unknown hash algorithm. Using SHA 256." + << std::endl; } + options = -1; + break; + } + case 'p': { + server_configuration.keystore_password = std::string(optarg); + options = -1; + break; + } + case 'x': { + server_configuration.multiphase_produce_ = true; + options = -1; + break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + server_configuration.production_rate_ = str; + options = -1; + break; + } #ifdef SECURE_HICNTRANSPORT - case 'E': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.secure_ = true; - break; - } + case 'E': { + server_configuration.keystore_name = std::string(optarg); + server_configuration.secure_ = true; + break; + } #endif - case 'h': - default: - usage(); - return EXIT_FAILURE; + case 'h': + default: + usage(); + return EXIT_FAILURE; } } @@ -1457,9 +1440,9 @@ int main(int argc, char *argv[]) { return 0; } -} // end namespace interface +} // end namespace interface -} // end namespace transport +} // end namespace transport int main(int argc, char *argv[]) { return transport::interface::main(argc, argv); |