diff options
author | Olivier Roques <oroques+fdio@cisco.com> | 2020-04-08 15:29:55 +0200 |
---|---|---|
committer | Olivier Roques <oroques+fdio@cisco.com> | 2020-04-11 17:25:30 +0200 |
commit | eb9119968cfc53f41526981924e5c8d44612f98a (patch) | |
tree | 065b282b91e48fc62a01f5de5a5fe1bd29092c5c | |
parent | 0ea5735b98f38beacf92dfdca74b7a6d5b3f7182 (diff) |
[HICN-595] Bring TLS up to date
HICN-2 would enable TLS only if OpenSSL 1.1.1 was present.
However the mechanism to do so was broken and hiperf always
ended up using normal consumer and producer sockets.
This patch fixes that by updating the build files. It also fixes
various bugs in the TLS implementation that went unnoticed and
cleans up the code.
Change-Id: Ifda75a9929e14460af43fe79d737d0c926bb671e
Signed-off-by: Olivier Roques <oroques+fdio@cisco.com>
Signed-off-by: Mauro Sardara <msardara@cisco.com>
21 files changed, 600 insertions, 625 deletions
diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h index e58e7962d..2efd7439d 100644 --- a/libtransport/includes/hicn/transport/core/packet.h +++ b/libtransport/includes/hicn/transport/core/packet.h @@ -148,15 +148,15 @@ class Packet : public std::enable_shared_from_this<Packet> { std::pair<const uint8_t *, std::size_t> getPayloadReference() const { int signature_size = 0; + if (_is_ah(format_)) { signature_size = (uint32_t)getSignatureSize(); } auto header_size = getHeaderSizeFromFormat(format_, signature_size); - auto payload_length = packet_->length() - header_size; + auto payload_length = payloadSize(); - return std::make_pair(packet_->data() + header_size, - payload_length); + return std::make_pair(packet_->data() + header_size, payload_length); } Packet &updateLength(std::size_t length = 0); @@ -229,6 +229,7 @@ class Packet : public std::enable_shared_from_this<Packet> { Packet &setTTL(uint8_t hops); uint8_t getTTL() const; + void separateHeaderPayload(); void resetPayload(); private: @@ -248,7 +249,6 @@ class Packet : public std::enable_shared_from_this<Packet> { } uint8_t *getSignature() const; - void separateHeaderPayload(); protected: Name name_; diff --git a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h index 097b0a8c0..224493f00 100644 --- a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h +++ b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h @@ -23,10 +23,10 @@ namespace interface { class P2PSecureConsumerSocket : public ConsumerSocket { public: - P2PSecureConsumerSocket(int handshake_protocol, int protocol); + P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol); ~P2PSecureConsumerSocket() = default; + void registerPrefix(const Prefix &producer_namespace); }; } // namespace interface - } // end namespace transport diff --git a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h index 0a6e9a43a..73cbb78b0 100644 --- a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h @@ -106,7 +106,7 @@ class ConsumerSocket { /** * This method will be called by the transport for understanding how many - * bytes it should read (at most) before notifying the application. + * bytes it should read before notifying the application. * * By default it reads 64 KB. */ diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc index 30f780461..eea4aeb8b 100644 --- a/libtransport/src/core/prefix.cc +++ b/libtransport/src/core/prefix.cc @@ -211,8 +211,8 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components, } } - if (this->contains(name_ip)) - throw errors::RuntimeException("Mask overrides the prefix"); + // if (this->contains(name_ip)) + // throw errors::RuntimeException("Mask overrides the prefix"); return Name(ip_prefix_.family, (uint8_t *)&name_ip); } diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc index 40ab58161..9b79850d6 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -33,10 +33,6 @@ void P2PSecureConsumerSocket::setInterestPayload( if (payload_ != NULL) int2.appendPayload(std::move(payload_)); } -// implement void readBufferAvailable(), size_t maxBufferSize() const override, -// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() -// must be implemented even if empty. - /* Return the number of read bytes in the return param */ int readOld(BIO *b, char *buf, int size) { if (size < 0) return size; @@ -51,11 +47,13 @@ int readOld(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -106,6 +104,7 @@ int writeOld(BIO *b, const char *buf, int num) { socket = (P2PSecureConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -173,9 +172,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, P2PSecureConsumerSocket::P2PSecureConsumerSocket( interface::ConsumerSocket *consumer, int handshake_protocol, int transport_protocol) - : ConsumerSocket(consumer, transport_protocol), + : ConsumerSocket(consumer, handshake_protocol), name_(), - tls_consumer_(), + tls_consumer_(nullptr), buf_pool_(), decrypted_content_(), payload_(), @@ -224,12 +223,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket( BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -244,76 +237,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { SSL_shutdown(ssl_); } -int P2PSecureConsumerSocket::consume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } +int P2PSecureConsumerSocket::handshake() { + int result = 1; - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); + if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + return 1; } - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_); - tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer)); - ConsumerTimerCallback *stats_summary_callback = nullptr; - this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_summary_callback); + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - uint32_t lifetime; - this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); - tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - read_callback_decrypted_); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - *stats_summary_callback); - tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, - this->timer_interval_milliseconds_); - tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - tls_consumer.connect(); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - if (payload_ != NULL) - return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); - else - return tls_consumer.consume((prefix->mapName(name))); -} + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); -int P2PSecureConsumerSocket::asyncConsume(const Name &name) { - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); - TRANSPORT_LOGD("Handshake performed!"); - } + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + result = SSL_connect(this->ssl_); - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + return result; +} +void P2PSecureConsumerSocket::initSessionSocket() { tls_consumer_ = std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); tls_consumer_->setInterface( @@ -325,6 +271,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { uint32_t lifetime; this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, @@ -336,6 +283,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); tls_consumer_->connect(); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + + if (payload_ != nullptr) + return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer_->consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); if (payload_ != NULL) return tls_consumer_->asyncConsume((prefix->mapName(name)), @@ -399,5 +399,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h index e2ebaf94e..d4c3b26c2 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.h +++ b/libtransport/src/implementation/p2psecure_socket_consumer.h @@ -69,39 +69,26 @@ class P2PSecureConsumerSocket : public ConsumerSocket, private: Name name_; std::shared_ptr<TLSConsumerSocket> tls_consumer_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; BIO_METHOD *bio_meth_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - ip_prefix_t secure_prefix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; /* Condition variable for the wait */ @@ -138,9 +125,12 @@ class P2PSecureConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; - int download_content(const Name &name); + int handshake(); + + void initSessionSocket(); }; } // namespace implementation diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc index d0852539a..15c7d25cd 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.cc +++ b/libtransport/src/implementation/p2psecure_socket_producer.cc @@ -37,9 +37,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( : ProducerSocket(producer_socket), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() {} + map_producers(), + list_producers() {} P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, @@ -48,12 +47,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( rtc_(rtc), mtx_(), cv_(), - map_secure_producers(), - map_secure_rtc_producers(), - list_secure_producers() { - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + map_producers(), + list_producers() { + /* Setup SSL context (identity and parameter to use TLS 1.3) */ der_cert_ = parcKeyStore_GetDEREncodedCertificate( (identity->getSigner()->getKeyStore())); der_prk_ = parcKeyStore_GetDEREncodedPrivateKey( @@ -68,10 +64,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( cert_509_ = d2i_X509(NULL, &cert, cert_size); pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ ProducerSocket::setSocketOption( ProducerCallbacksOptions::INTEREST_INPUT, (ProducerInterestCallback)std::bind( @@ -84,53 +78,61 @@ P2PSecureProducerSocket::~P2PSecureProducerSocket() { if (der_prk_) parcBuffer_Release(&der_prk_); } +void P2PSecureProducerSocket::initSessionSocket( + std::unique_ptr<TLSProducerSocket> &producer) { + producer->on_content_produced_application_ = + this->on_content_produced_application_; + producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + producer->setSocketOption(SIGNER, this->signer_); + producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + + if (!rtc_) { + producer->setInterface(new interface::TLSProducerSocket(producer.get())); + } else { + TLSRTCProducerSocket *rtc_producer = + dynamic_cast<TLSRTCProducerSocket *>(producer.get()); + rtc_producer->setInterface( + new interface::TLSRTCProducerSocket(rtc_producer)); + } +} + void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, Interest &interest) { std::unique_lock<std::mutex> lck(mtx_); + std::unique_ptr<TLSProducerSocket> tls_producer; + auto it = map_producers.find(interest.getName()); + + if (it != map_producers.end()) { + return; + } + + if (!rtc_) { + tls_producer = + std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName()); + } else { + tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this, + interest.getName()); + } + + initSessionSocket(tls_producer); + TLSProducerSocket *tls_producer_ptr = tls_producer.get(); + map_producers.insert({interest.getName(), move(tls_producer)}); TRANSPORT_LOGD("Start handshake at %s", interest.getName().toString().c_str()); + if (!rtc_) { - auto it = map_secure_producers.find(interest.getName()); - if (it != map_secure_producers.end()) return; - TLSProducerSocket *tls_producer = - new TLSProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer)); - - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_producers.insert( - {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + tls_producer_ptr->onInterest(*tls_producer_ptr, interest); + tls_producer_ptr->async_accept(); } else { - auto it = map_secure_rtc_producers.find(interest.getName()); - if (it != map_secure_rtc_producers.end()) return; - TLSRTCProducerSocket *tls_producer = - new TLSRTCProducerSocket(nullptr, this, interest.getName()); - tls_producer->setInterface( - new interface::TLSRTCProducerSocket(tls_producer)); - tls_producer->on_content_produced_application_ = - this->on_content_produced_application_; - tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - tls_producer->setSocketOption(SIGNER, this->signer_); - tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - tls_producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); - map_secure_rtc_producers.insert( - {interest.getName(), - std::unique_ptr<TLSRTCProducerSocket>(tls_producer)}); - tls_producer->onInterest(*tls_producer, interest); - tls_producer->async_accept(); + TLSRTCProducerSocket *rtc_producer_ptr = + dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr); + rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest); + rtc_producer_ptr->async_accept(); } } @@ -143,11 +145,13 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer, } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_rtc_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_rtc_producers.cbegin(); - it != list_secure_rtc_producers.cend(); it++) { - (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { + TLSRTCProducerSocket *rtc_producer = + dynamic_cast<TLSRTCProducerSocket *>(it->get()); + rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); } } @@ -162,12 +166,13 @@ uint32_t P2PSecureProducerSocket::produce( std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (list_producers.empty()) cv_.wait(lck); + + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) segments += (*it)->produce(content_name, buffer->clone(), is_last, start_offset); + return segments; } @@ -183,12 +188,12 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name, std::unique_lock<std::mutex> lck(mtx_); uint32_t segments = 0; - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) segments += (*it)->produce(content_name, buffer, buffer_size, is_last, start_offset); + return segments; } @@ -203,10 +208,9 @@ void P2PSecureProducerSocket::asyncProduce(const Name &content_name, } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset); } } @@ -221,22 +225,19 @@ void P2PSecureProducerSocket::asyncProduce( } std::unique_lock<std::mutex> lck(mtx_); - if (list_secure_producers.empty()) cv_.wait(lck); + if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, last_segment); } } -// Socket Option Redefinition to avoid name hiding - +/* Redefinition of socket options to avoid name hiding */ int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerInterestCallback socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } @@ -269,9 +270,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, const std::shared_ptr<utils::Signer> &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -288,9 +288,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption(int socket_option_key, uint32_t socket_option_value) { - if (!list_secure_producers.empty()) { - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) { + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); } switch (socket_option_key) { @@ -305,9 +304,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, bool socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -316,9 +314,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption(int socket_option_key, Name *socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -327,9 +324,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, int P2PSecureProducerSocket::setSocketOption( int socket_option_key, std::list<Prefix> socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -338,9 +334,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentObjectCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -349,9 +344,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentCallback socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); switch (socket_option_key) { @@ -368,9 +362,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, utils::CryptoHashType socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -379,9 +372,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, utils::CryptoSuite socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -390,9 +382,8 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, const std::string &socket_option_value) { - if (!list_secure_producers.empty()) - for (auto it = list_secure_producers.cbegin(); - it != list_secure_producers.cend(); it++) + if (!list_producers.empty()) + for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); return ProducerSocket::setSocketOption(socket_option_key, @@ -400,5 +391,4 @@ int P2PSecureProducerSocket::setSocketOption( } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h index 33339deba..bfc9fc2c1 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.h +++ b/libtransport/src/implementation/p2psecure_socket_producer.h @@ -37,9 +37,11 @@ class P2PSecureProducerSocket : public ProducerSocket { public: explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); + explicit P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, const std::shared_ptr<utils::Identity> &identity); + ~P2PSecureProducerSocket(); void produce(const uint8_t *buffer, size_t buffer_size) override; @@ -96,7 +98,6 @@ class P2PSecureProducerSocket : public ProducerSocket { using ProducerSocket::onInterest; protected: - bool rtc_; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; @@ -104,27 +105,23 @@ class P2PSecureProducerSocket : public ProducerSocket { ProducerContentCallback on_content_produced_application_; private: + bool rtc_; std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - PARCBuffer *der_cert_; PARCBuffer *der_prk_; X509 *cert_509_; EVP_PKEY *pkey_rsa_; std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>, core::hash<core::Name>, core::compare2<core::Name>> - map_secure_producers; - std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>, - core::hash<core::Name>, core::compare2<core::Name>> - map_secure_rtc_producers; - std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers; - std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers; + map_producers; + std::list<std::unique_ptr<TLSProducerSocket>> list_producers; void onInterestCallback(interface::ProducerSocket &p, Interest &interest); + + void initSessionSocket(std::unique_ptr<TLSProducerSocket> &producer); }; } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 9ccc59d9e..8c5c453fc 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -131,7 +131,7 @@ class ProducerSocket : public Socket<BasePortal>, // TODO Manifest may still be used for indexing if (making_manifest && !signer) { - TRANSPORT_LOGD("Making manifests without setting producer identity."); + TRANSPORT_LOGE("Making manifests without setting producer identity."); } core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; @@ -192,7 +192,6 @@ class ProducerSocket : public Socket<BasePortal>, } } - TRANSPORT_LOGD("--------- START PRODUCE ----------"); for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { if (making_manifest) { @@ -207,13 +206,12 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } @@ -270,8 +268,7 @@ class ProducerSocket : public Socket<BasePortal>, signer->sign(*content_object); } passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %u", - content_object->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str()); } } @@ -286,11 +283,11 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %u", - content_queue_.front()->getName().getSuffix()); + TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } } @@ -949,18 +946,19 @@ class ProducerSocket : public Socket<BasePortal>, std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; - std::unique_lock<std::mutex> lck(mtx); + bool done = false; io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); - - if (!done) { - cv.wait(lck); - } + cv.notify_all(); }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } } else { result = func(socket_option_key, socket_option_value); } diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc index 3b3152993..9ef79ca23 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.cc +++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc @@ -53,7 +53,7 @@ int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) { (socket->cv_).wait(lck); } - utils::MemBuf *membuf = socket->packet_->next(); + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; if ((int)membuf->length() > size) { @@ -91,10 +91,10 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSRTCProducerSocket *socket; socket = (TLSRTCProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { - socket->tls_chunks_--; + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { bool making_manifest = socket->parent_->making_manifest_; + + socket->tls_chunks_--; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); socket->parent_->ProducerSocket::produce( @@ -107,13 +107,17 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { socket->to_call_oncontentproduced_--; auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->RTCProducerSocket::produce(std::move(mbuf)); + ProducerContentCallback on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { on_content_produced_application( @@ -144,24 +148,28 @@ TLSRTCProducerSocket::TLSRTCProducerSocket( } void TLSRTCProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_rtc_producers.push_front( - std::move(parent_->map_secure_rtc_producers[handshake_name_])); - parent_->map_secure_rtc_producers.erase(handshake_name_); + + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); ProducerInterestCallback on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, on_interest_process_decrypted); if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); + Interest inter(std::move(handshake_packet_)); on_interest_process_decrypted( (transport::interface::ProducerSocket &)(*getInterface()), inter); } @@ -181,7 +189,9 @@ int TLSRTCProducerSocket::async_accept() { } void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } @@ -197,5 +207,4 @@ void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 95b287aa6..7cf653848 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -46,11 +46,13 @@ int readOldTLS(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -101,6 +103,7 @@ int writeOldTLS(BIO *b, const char *buf, int num) { socket = (TLSConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -176,12 +179,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -191,10 +188,8 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, this); }; -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; } int TLSConsumerSocket::consume(const Name &name, @@ -228,22 +223,15 @@ int TLSConsumerSocket::download_content(const Name &name) { something_to_read_ = false; content_downloaded_ = false; - decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH); - uint8_t *buf = decrypted_content_->writableData(); - size_t size = 0; + std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize(); + std::size_t buffer_size = read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH; + decrypted_content_ = utils::MemBuf::createCombined(buffer_size); int result = -1; + std::size_t size = 0; while (!content_downloaded_ || something_to_read_) { - if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) { - decrypted_content_->appendChain( - utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH)); - // decrypted_content_->computeChainDataLength(); - buf = decrypted_content_->prev()->writableData(); - } else { - buf = decrypted_content_->writableTail(); - } - - result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH); + result = SSL_read( + this->ssl_, decrypted_content_->writableTail(), SSL3_RT_MAX_PLAIN_LENGTH); /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of * the data has been fully downloaded */ @@ -253,20 +241,20 @@ int TLSConsumerSocket::download_content(const Name &name) { if (result >= 0) { size += result; - decrypted_content_->prepend(result); - } else + decrypted_content_->append(result); + } else { throw errors::RuntimeException("Unable to download content"); + } - if (size >= read_callback_decrypted_->maxBufferSize()) { + if (decrypted_content_->length() >= max_buffer_size) { if (read_callback_decrypted_->isBufferMovable()) { - // No need to perform an additional copy. The whole buffer will be - // tranferred to the application. - + /* No need to perform an additional copy. The whole buffer will be + * tranferred to the application. */ read_callback_decrypted_->readBufferAvailable( std::move(decrypted_content_)); - decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH); + decrypted_content_ = utils::MemBuf::create(buffer_size); } else { - // The buffer will be copied into the application-provided buffer + /* The buffer will be copied into the application-provided buffer */ uint8_t *buffer; std::size_t length; std::size_t total_length = decrypted_content_->length(); @@ -358,6 +346,7 @@ size_t TLSConsumerSocket::maxBufferSize() const { void TLSConsumerSocket::readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept { std::unique_lock<std::mutex> lck(this->mtx_); + if (head_) { head_->prependChain(std::move(buffer)); } else { @@ -380,5 +369,4 @@ void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool TLSConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h index 2e88dc47e..6931a7a8a 100644 --- a/libtransport/src/implementation/tls_socket_consumer.h +++ b/libtransport/src/implementation/tls_socket_consumer.h @@ -69,42 +69,27 @@ class TLSConsumerSocket : public ConsumerSocket, private: Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; - - /* Chain of MemBuf holding the payload to be written into interest or data - */ + /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - utils::EventThread async_downloader_tls_; void setInterestPayload(interface::ConsumerSocket &c, @@ -123,11 +108,11 @@ class TLSConsumerSocket : public ConsumerSocket, virtual void readError(const std::error_code ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; int download_content(const Name &name); }; } // namespace implementation - -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc index 9a5b94a1c..339a1ad58 100644 --- a/libtransport/src/implementation/tls_socket_producer.cc +++ b/libtransport/src/implementation/tls_socket_producer.cc @@ -48,18 +48,17 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - /* take a lock on the mutex. It will be unlocked by */ std::unique_lock<std::mutex> lck(socket->mtx_); + if (!socket->something_to_read_) { (socket->cv_).wait(lck); } - /* Either there already is something to read, or the thread has been waken up - */ - /* must return the payload in the interest */ - - utils::MemBuf *membuf = socket->packet_->next(); + /* Either there already is something to read, or the thread has been waken up. + * We must return the payload in the interest anyway */ + utils::MemBuf *membuf = socket->handshake_packet_->next(); int size_to_read; + if ((int)membuf->length() > size) { size_to_read = size; } else { @@ -97,11 +96,11 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSProducerSocket *socket; socket = (TLSProducerSocket *)BIO_get_data(b); - if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && - socket->first_) { + if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { + bool making_manifest = socket->parent_->making_manifest_; + //! socket->tls_chunks_ corresponds to is_last socket->tls_chunks_--; - bool making_manifest = socket->parent_->making_manifest_; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); socket->parent_->ProducerSocket::produce( @@ -116,16 +115,21 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { std::unique_ptr<utils::MemBuf> mbuf = utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->tls_chunks_--; socket->to_call_oncontentproduced_--; - auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->last_segment_ += socket->ProducerSocket::produce( socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, socket->last_segment_); + ProducerContentCallback on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { on_content_produced_application(*socket->getInterface(), @@ -144,8 +148,10 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, on_content_produced_application_(), mtx_(), cv_(), - something_to_read_(), + something_to_read_(false), + handshake_state_(UNINITIATED), name_(), + handshake_packet_(), last_segment_(0), parent_(parent), first_(true), @@ -157,9 +163,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, const SSL_METHOD *meth = TLS_server_method(); ctx_ = SSL_CTX_new(meth); - /* - * Setup SSL context (identity and parameter to use TLS 1.3) - */ + /* Setup SSL context (identity and parameter to use TLS 1.3) */ SSL_CTX_use_certificate(ctx_, parent->cert_509_); SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_); @@ -167,6 +171,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, SSL_CTX_set_ciphersuites(ctx_, "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { throw errors::RuntimeException( "Unable to set cipher list on TLS subsystem. Aborting."); @@ -184,10 +189,9 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, this, TLSProducerSocket::parseHicnKeyIdCb, NULL); ssl_ = SSL_new(ctx_); - /* - * Setup this producer socker as the bio that TLS will use to write and read - * data (in stream mode) - */ + + /* Setup this producer socker as the bio that TLS will use to write and read + * data (in stream mode) */ BIO_METHOD *bio_meth = BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket"); BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld); @@ -197,15 +201,15 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, BIO_set_init(bio, 1); BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - /* - * Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. - */ + + /* Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. */ this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CACHE_MISS, (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this, std::placeholders::_1, std::placeholders::_2)); + this->ProducerSocket::setSocketOption( ProducerCallbacksOptions::CONTENT_PRODUCED, (ProducerContentCallback)bind( @@ -213,35 +217,39 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, std::placeholders::_2, std::placeholders::_3)); } -/* - * The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory - */ +/* The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory */ TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; } void TLSProducerSocket::accept() { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { tls_chunks_ = 1; int result = SSL_accept(ssl_); + if (result != 1) throw errors::RuntimeException("Unable to perform client handshake"); } - TRANSPORT_LOGD("Handshake performed!"); - parent_->list_secure_producers.push_front( - std::move(parent_->map_secure_producers[handshake_name_])); - parent_->map_secure_producers.erase(handshake_name_); + + parent_->list_producers.push_front( + std::move(parent_->map_producers[handshake_name_])); + parent_->map_producers.erase(handshake_name_); ProducerInterestCallback on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, on_interest_process_decrypted); if (on_interest_process_decrypted) { - Interest inter(std::move(packet_)); + Interest inter(std::move(handshake_packet_)); on_interest_process_decrypted(*getInterface(), inter); } else { throw errors::RuntimeException( - "On interest process unset. Unable to perform handshake"); + "On interest process unset: unable to perform handshake"); } + + handshake_state_ = SERVER_FINISHED; + TRANSPORT_LOGD("Handshake performed!"); } int TLSProducerSocket::async_accept() { @@ -249,71 +257,88 @@ int TLSProducerSocket::async_accept() { async_thread_.add([this]() { this->accept(); }); } else { throw errors::RuntimeException( - "Async thread not running, impossible to perform handshake"); + "Async thread not running: unable to perform handshake"); } return 1; } void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { - /* Based on the state machine of (D)TLS, we know what action to do */ - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); + name_ = interest.getName(); + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - if (head_) { - payload_->prependChain(interest.getPayload()); - } else { - payload_ = interest.getPayload(); // std::move(interest.getPayload()); - } + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + return; + } else if (handshake_state == SERVER_FINISHED) { + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), interest.getPayload()->length()); - } + } + + ProducerInterestCallback on_interest_input_decrypted; + getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + on_interest_input_decrypted); - ProducerInterestCallback on_interest_input_decrypted; - getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, - on_interest_input_decrypted); - if (on_interest_input_decrypted) - (on_interest_input_decrypted)(*getInterface(), interest); + if (on_interest_input_decrypted) + (on_interest_input_decrypted)(*getInterface(), interest); + } } void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p, Interest &interest) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + HandshakeState handshake_state = getHandshakeState(); + + if (handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); - name_ = interest.getName(); + + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + handshake_state_ = CLIENT_FINISHED; + cv_.notify_one(); - } else { - name_ = interest.getName(); - packet_ = interest.acquireMemBufReference(); - payload_ = interest.getPayload(); + } else if (handshake_state == SERVER_FINISHED) { + interest.separateHeaderPayload(); + handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; - if (interest.getPayload()->length() > 0) + if (interest.getPayload()->length() > 0) { SSL_read( ssl_, const_cast<unsigned char *>(interest.getPayload()->writableData()), interest.getPayload()->length()); + } if (on_interest_process_decrypted_ != VOID_HANDLER) on_interest_process_decrypted_(*getInterface(), interest); } } +TLSProducerSocket::HandshakeState TLSProducerSocket::getHandshakeState() { + if (SSL_in_before(ssl_)) { + handshake_state_ = UNINITIATED; + } + + if (SSL_in_init(ssl_) && handshake_state_ == UNINITIATED) { + handshake_state_ = CLIENT_HELLO; + } + + return handshake_state_; +} + void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written) {} @@ -321,13 +346,13 @@ void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, uint32_t TLSProducerSocket::produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, uint32_t start_offset) { - if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + if (getHandshakeState() != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } + size_t buf_size = buffer->length(); name_ = served_namespaces_.front().mapName(content_name); - tls_chunks_ = to_call_oncontentproduced_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); @@ -388,6 +413,7 @@ void TLSProducerSocket::produce(ContentObject &content_object) { long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { if (cmd == BIO_CTRL_FLUSH) { } + return 1; } @@ -397,6 +423,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, X509 *x, size_t chainidx, int *al, void *add_arg) { TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg); + if (ext_type == 100) { ip_prefix_t ip_prefix = socket->parent_->served_namespaces_.front().toIpPrefixStruct(); @@ -425,6 +452,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, ip_address_t keyId_component = {}; u32 *mask_buf; u32 *keyId_component_buf; + switch (inet_family) { case AF_INET: mask_buf = &(mask.v4.as_u32); @@ -483,6 +511,7 @@ int TLSProducerSocket::setSocketOption( [this](int socket_option_key, ProducerInterestCallback socket_option_value) -> int { int result = SOCKET_OPTION_SET; + switch (socket_option_key) { case ProducerCallbacksOptions::INTEREST_INPUT: on_interest_input_decrypted_ = socket_option_value; @@ -508,6 +537,7 @@ int TLSProducerSocket::setSocketOption( result = SOCKET_OPTION_NOT_SET; break; } + return result; }); } @@ -607,5 +637,4 @@ int TLSProducerSocket::getSocketOption( } } // namespace implementation - } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h index e910c8259..2382e8695 100644 --- a/libtransport/src/implementation/tls_socket_producer.h +++ b/libtransport/src/implementation/tls_socket_producer.h @@ -84,47 +84,44 @@ class TLSProducerSocket : virtual public ProducerSocket { using ProducerSocket::setSocketOption; protected: + enum HandshakeState { + UNINITIATED, + CLIENT_HELLO, // when CLIENT_HELLO interest has been received + CLIENT_FINISHED, // when CLIENT_FINISHED interest has been received + SERVER_FINISHED, // when handshake is done + }; /* Callback invoked once an interest has been received and its payload * decrypted */ ProducerInterestCallback on_interest_input_decrypted_; ProducerInterestCallback on_interest_process_decrypted_; ProducerContentCallback on_content_produced_application_; - std::mutex mtx_; - /* Condition variable for the wait */ std::condition_variable cv_; - /* Bool variable, true if there is something to read (an interest arrived) */ bool something_to_read_; - + /* Bool variable, true if CLIENT_FINISHED interest has been received */ + HandshakeState handshake_state_; /* First interest that open a secure connection */ transport::core::Name name_; - /* SSL handle */ SSL *ssl_; SSL_CTX *ctx_; - - Packet::MemBufPtr packet_; - + Packet::MemBufPtr handshake_packet_; std::unique_ptr<utils::MemBuf> head_; std::uint32_t last_segment_; - std::shared_ptr<utils::MemBuf> payload_; std::uint32_t key_id_; - std::thread *handshake; P2PSecureProducerSocket *parent_; - bool first_; Name handshake_name_; int tls_chunks_; int to_call_oncontentproduced_; - bool still_writing_; - utils::EventThread encryption_thread_; void onInterest(ProducerSocket &p, Interest &interest); + void cacheMiss(interface::ProducerSocket &p, Interest &interest); /* Return the number of read bytes in readbytes */ @@ -156,8 +153,9 @@ class TLSProducerSocket : virtual public ProducerSocket { void onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written); + + HandshakeState getHandshakeState(); }; } // namespace implementation - } // end namespace transport diff --git a/libtransport/src/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/interfaces/p2psecure_socket_consumer.cc index 2fa8bb6e3..038441dfc 100644 --- a/libtransport/src/interfaces/p2psecure_socket_consumer.cc +++ b/libtransport/src/interfaces/p2psecure_socket_consumer.cc @@ -21,11 +21,17 @@ namespace transport { namespace interface { P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol, - int protocol) + int transport_protocol) : ConsumerSocket() { socket_ = std::unique_ptr<implementation::ConsumerSocket>( new implementation::P2PSecureConsumerSocket(this, handshake_protocol, - protocol)); + transport_protocol)); +} + +void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + implementation::P2PSecureConsumerSocket &secure_consumer_socket = + *(static_cast<implementation::P2PSecureConsumerSocket *>(socket_.get())); + secure_consumer_socket.registerPrefix(producer_namespace); } } // namespace interface diff --git a/libtransport/src/interfaces/tls_rtc_socket_producer.h b/libtransport/src/interfaces/tls_rtc_socket_producer.h index 434edb522..3ea84095b 100644 --- a/libtransport/src/interfaces/tls_rtc_socket_producer.h +++ b/libtransport/src/interfaces/tls_rtc_socket_producer.h @@ -28,9 +28,9 @@ namespace interface { class TLSRTCProducerSocket : public ProducerSocket { public: TLSRTCProducerSocket(implementation::TLSRTCProducerSocket *implementation); + ~TLSRTCProducerSocket(); }; } // namespace interface - -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/protocols/consumer.conf b/libtransport/src/protocols/consumer.conf index 1a366f32f..d0eab75ac 100644 --- a/libtransport/src/protocols/consumer.conf +++ b/libtransport/src/protocols/consumer.conf @@ -1,4 +1,4 @@ -; this file contais the parameters for RAAQM +; This file contains the parameters for RAAQM autotune = no lifetime = 500 retransmissions = 128 diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc index e590b4fee..0872c4554 100644 --- a/libtransport/src/protocols/incremental_indexer.cc +++ b/libtransport/src/protocols/incremental_indexer.cc @@ -25,6 +25,8 @@ void IncrementalIndexer::onContentObject( core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { using namespace interface; + TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { final_suffix_ = content_object->getName().getSuffix(); } @@ -50,4 +52,4 @@ void IncrementalIndexer::onContentObject( } } // namespace protocol -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc index 1a2f9dec3..da835b577 100644 --- a/libtransport/src/protocols/manifest_incremental_indexer.cc +++ b/libtransport/src/protocols/manifest_incremental_indexer.cc @@ -37,10 +37,12 @@ ManifestIncrementalIndexer::ManifestIncrementalIndexer( void ManifestIncrementalIndexer::onContentObject( core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - // Check if mainfiest or not + // Check if manifest or not if (content_object->getPayloadType() == PayloadType::MANIFEST) { + TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); onUntrustedManifest(std::move(interest), std::move(content_object)); } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + TRANSPORT_LOGD("Receive manifest %s", content_object->getName().toString().c_str()); onUntrustedContentObject(std::move(interest), std::move(content_object)); } } diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 0a93dec44..f8da69ceb 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -460,7 +460,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { // send at least one interest if there are retransmissions to perform and // there is no space left in the window sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Window full, retransmit one content interest"); interest_to_retransmit_.pop(); } @@ -470,7 +469,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { while (interests_in_flight_ < current_window_size_) { if (interest_to_retransmit_.size() > 0) { sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Retransmit content interest"); interest_to_retransmit_.pop(); } else { index = index_manager_->getNextSuffix(); @@ -479,7 +477,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { } sendInterest(index); - TRANSPORT_LOGD("Send content interest %u", index); } } } @@ -508,6 +505,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); + sendInterest(std::move(interest)); return true; @@ -517,6 +515,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { interests_in_flight_++; interest_retransmissions_[interest->getName().getSuffix() & mask]++; + TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str()); portal_->sendInterest(std::move(interest)); } diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 151e4df3d..0b1578b6f 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -64,25 +64,15 @@ namespace interface { */ struct ClientConfiguration { ClientConfiguration() - : name("b001::abcd", 0), - verify(false), - beta(-1.f), - drop_factor(-1.f), - window(-1), - producer_certificate(""), - passphrase(""), - receive_buffer(nullptr), - receive_buffer_size_(128 * 1024), - download_size(0), - report_interval_milliseconds_(1000), - transport_protocol_(CBR), - rtc_(false), - test_mode_(false), + : name("b001::abcd", 0), verify(false), beta(-1.f), drop_factor(-1.f), + window(-1), producer_certificate(""), passphrase(""), + receive_buffer(nullptr), receive_buffer_size_(128 * 1024), + download_size(0), report_interval_milliseconds_(1000), + transport_protocol_(CBR), rtc_(false), test_mode_(false), #ifdef SECURE_HICNTRANSPORT secure_(false), #endif - producer_prefix_(), - interest_lifetime_(500) { + producer_prefix_(), interest_lifetime_(500) { } Name name; @@ -110,7 +100,7 @@ struct ClientConfiguration { * Class for handling the production rate for the RTC producer. */ class Rate { - public: +public: Rate() : rate_kbps_(0) {} Rate(const std::string &rate) { @@ -140,7 +130,7 @@ class Rate { (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_)); } - private: +private: float rate_kbps_; }; @@ -149,23 +139,13 @@ class Rate { */ struct ServerConfiguration { ServerConfiguration() - : name("b001::abcd/64"), - virtual_producer(true), - manifest(false), - live_production(false), - sign(false), - content_lifetime(600000000_U32), - content_object_size(1440), - download_size(20 * 1024 * 1024), - hash_algorithm(utils::CryptoHashType::SHA_256), - keystore_name(""), - passphrase(""), - keystore_password("cisco"), - multiphase_produce_(false), - rtc_(false), - interactive_(false), - production_rate_(std::string("2048kbps")), - payload_size_(1440) + : name("b001::abcd/64"), virtual_producer(true), manifest(false), + live_production(false), sign(false), content_lifetime(600000000_U32), + content_object_size(1440), download_size(20 * 1024 * 1024), + hash_algorithm(utils::CryptoHashType::SHA_256), keystore_name(""), + passphrase(""), keystore_password("cisco"), multiphase_produce_(false), + rtc_(false), interactive_(false), + production_rate_(std::string("2048kbps")), payload_size_(1440) #ifdef SECURE_HICNTRANSPORT , secure_(false) @@ -214,13 +194,10 @@ class HIperfClient { friend class KeyCallback; friend class RTCCallback; - public: +public: HIperfClient(const ClientConfiguration &conf) - : configuration_(conf), - total_duration_milliseconds_(0), - old_bytes_value_(0), - signals_(io_service_, SIGINT), - expected_seg_(0), + : configuration_(conf), total_duration_milliseconds_(0), + old_bytes_value_(0), signals_(io_service_, SIGINT), expected_seg_(0), lost_packets_(std::unordered_set<uint32_t>()), rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr), callback_(configuration_.rtc_ ? nullptr : new Callback(*this)), @@ -234,13 +211,14 @@ class HIperfClient { void checkReceivedRtcContent(ConsumerSocket &c, const ContentObject &contentObject) { - if (!configuration_.test_mode_) return; + if (!configuration_.test_mode_) + return; uint32_t receivedSeg = contentObject.getName().getSuffix(); auto payload = contentObject.getPayload(); - if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK - // payload + if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK + // payload uint32_t *payloadPtr = (uint32_t *)payload->data(); uint32_t productionSeg = *(payloadPtr); uint32_t productionRate = *(++payloadPtr); @@ -299,7 +277,8 @@ class HIperfClient { void handleTimerExpiration(ConsumerSocket &c, const TransportStatistics &stats) { - if (configuration_.rtc_) return; + if (configuration_.rtc_) + return; const char separator = ' '; const int width = 20; @@ -361,7 +340,7 @@ class HIperfClient { configuration_.transport_protocol_ = CBR; } -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT if (configuration_.secure_) { consumer_socket_ = std::make_shared<P2PSecureConsumerSocket>( RAAQM, configuration_.transport_protocol_); @@ -378,7 +357,7 @@ class HIperfClient { #endif consumer_socket_ = std::make_shared<ConsumerSocket>(configuration_.transport_protocol_); -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT } #endif @@ -431,13 +410,15 @@ class HIperfClient { if (!configuration_.producer_certificate.empty()) { key_id_ = verifier->addKeyFromCertificate( configuration_.producer_certificate); - if (key_id_ == nullptr) return ERROR_SETUP; + if (key_id_ == nullptr) + return ERROR_SETUP; } if (!configuration_.passphrase.empty()) { key_id_ = verifier->addKeyFromPassphrase( configuration_.passphrase, utils::CryptoSuite::HMAC_SHA256); - if (key_id_ == nullptr) return ERROR_SETUP; + if (key_id_ == nullptr) + return ERROR_SETUP; } if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, @@ -528,11 +509,11 @@ class HIperfClient { return ERROR_SUCCESS; } - private: +private: class RTCCallback : public ConsumerSocket::ReadCallback { static constexpr std::size_t mtu = 1500; - public: + public: RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) { client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); } @@ -559,12 +540,12 @@ class HIperfClient { std::cout << "Data successfully read" << std::endl; } - private: + private: HIperfClient &client_; }; class Callback : public ConsumerSocket::ReadCallback { - public: + public: Callback(HIperfClient &hiperf_client) : client_(hiperf_client) { client_.configuration_.receive_buffer = utils::MemBuf::create(client_.configuration_.receive_buffer_size_); @@ -610,14 +591,14 @@ class HIperfClient { client_.io_service_.stop(); } - private: + private: HIperfClient &client_; }; class KeyCallback : public ConsumerSocket::ReadCallback { static constexpr std::size_t read_size = 16 * 1024; - public: + public: KeyCallback(HIperfClient &hiperf_client) : client_(hiperf_client), key_(nullptr) {} @@ -643,14 +624,13 @@ class HIperfClient { client_.io_service_.stop(); } - bool verifyKey() { return !key_->empty(); } + bool validateKey() { return !key_->empty(); } void readSuccess(std::size_t total_size) noexcept override { std::cout << "Key size: " << total_size << " bytes" << std::endl; - afterRead(); } - void afterRead() { + void readKey() { std::shared_ptr<utils::Verifier> verifier = std::make_shared<utils::Verifier>(); verifier->addKeyFromPassphrase(*key_, utils::CryptoSuite::HMAC_SHA256); @@ -661,26 +641,30 @@ class HIperfClient { consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, verifier); } else { - std::cout << "Could not set verifier" << std::endl; + std::cout << "Consumer socket not set" << std::endl; return; } - if (consumer_socket_->verifyKeyPackets()) { - std::cout << "Verification of packet signatures successful" - << std::endl; + if (validateKey()) { + std::cout << "Key has been authenticated" << std::endl; } else { - std::cout << "Could not verify packet signatures" << std::endl; + std::cout << "Key could not be authenticated" << std::endl; return; } - std::cout << "Key retrieval done" << std::endl; + if (consumer_socket_->verifyKeyPackets()) { + std::cout << "Signatures of key packets are valid" << std::endl; + } else { + std::cout << "Signatures of key packets are not valid" << std::endl; + return; + } } void setConsumer(std::shared_ptr<ConsumerSocket> consumer_socket) { consumer_socket_ = consumer_socket; } - private: + private: HIperfClient &client_; std::unique_ptr<std::string> key_; std::shared_ptr<ConsumerSocket> consumer_socket_; @@ -699,7 +683,7 @@ class HIperfClient { RTCCallback *rtc_callback_; Callback *callback_; KeyCallback *key_callback_; -}; // namespace interface +}; // namespace interface /** * Hiperf server class: configure and setup an hicn producer following the @@ -708,19 +692,16 @@ class HIperfClient { class HIperfServer { const std::size_t log2_content_object_buffer_size = 8; - public: +public: HIperfServer(ServerConfiguration &conf) - : configuration_(conf), - signals_(io_service_, SIGINT), - rtc_timer_(io_service_), - unsatisfied_interests_(), + : configuration_(conf), signals_(io_service_, SIGINT), + rtc_timer_(io_service_), unsatisfied_interests_(), content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), last_segment_(0), #ifndef _WIN32 - ptr_last_segment_(&last_segment_), - input_(io_service_), + ptr_last_segment_(&last_segment_), input_(io_service_), rtc_running_(false) #else ptr_last_segment_(&last_segment_) @@ -843,9 +824,10 @@ class HIperfServer { std::placeholders::_1, std::placeholders::_2)); } - std::shared_ptr<utils::Identity> getProducerIdentity( - std::string &keystore_name, std::string &keystore_password, - utils::CryptoHashType &hash_algorithm) { + std::shared_ptr<utils::Identity> + getProducerIdentity(std::string &keystore_name, + std::string &keystore_password, + utils::CryptoHashType &hash_algorithm) { if (access(keystore_name.c_str(), F_OK) != -1) { return std::make_shared<utils::Identity>(keystore_name, keystore_password, hash_algorithm); @@ -859,7 +841,7 @@ class HIperfServer { int setup() { int ret; -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT if (configuration_.secure_) { auto identity = getProducerIdentity(configuration_.keystore_name, configuration_.keystore_password, @@ -873,7 +855,7 @@ class HIperfServer { } else { producer_socket_ = std::make_unique<ProducerSocket>(); } -#ifdef SECURE_HICNSOCKET +#ifdef SECURE_HICNTRANSPORT } #endif @@ -974,7 +956,8 @@ class HIperfServer { } void sendRTCContentObjectCallback(std::error_code ec) { - if (ec) return; + if (ec) + return; rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( configuration_.payload_size_)); @@ -1007,11 +990,11 @@ class HIperfServer { std::placeholders::_1)); } - input_buffer_.consume(length); // Remove newline from input. - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); + input_buffer_.consume(length); // Remove newline from input. + asio::async_read_until(input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, + std::placeholders::_1, + std::placeholders::_2)); } #endif @@ -1027,10 +1010,10 @@ class HIperfServer { if (configuration_.rtc_) { #ifndef _WIN32 if (configuration_.interactive_) { - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); + asio::async_read_until(input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, + std::placeholders::_1, + std::placeholders::_2)); } else { rtc_running_ = true; rtc_timer_.expires_from_now( @@ -1055,7 +1038,7 @@ class HIperfServer { return ERROR_SUCCESS; } - private: +private: ServerConfiguration configuration_; asio::io_service io_service_; asio::signal_set signals_; @@ -1072,7 +1055,7 @@ class HIperfServer { asio::streambuf input_buffer_; bool rtc_running_; #endif -}; // namespace interface +}; // namespace interface void usage() { std::cerr << "HIPERF - A tool for performing network throughput " @@ -1211,174 +1194,174 @@ int main(int argc, char *argv[]) { "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) != -1) { switch (opt) { - // Common - case 'D': { - daemon = true; - break; - } - case 'I': { - server_configuration.interactive_ = true; - break; - } + // Common + case 'D': { + daemon = true; + break; + } + case 'I': { + server_configuration.interactive_ = true; + break; + } #else while ((opt = getopt(argc, argv, "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) { switch (opt) { #endif - case 'f': { - log_file = optarg; - break; - } - case 'R': { - client_configuration.rtc_ = true; - server_configuration.rtc_ = true; - break; - } + case 'f': { + log_file = optarg; + break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } - // Server or Client - case 'S': { - role -= 1; - break; - } - case 'C': { - role += 1; - break; - } - case 'k': { - server_configuration.passphrase = std::string(optarg); - client_configuration.passphrase = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } + // Server or Client + case 'S': { + role -= 1; + break; + } + case 'C': { + role += 1; + break; + } + case 'k': { + server_configuration.passphrase = std::string(optarg); + client_configuration.passphrase = std::string(optarg); + server_configuration.sign = true; + options = -1; + break; + } - // Client specifc - case 'b': { - client_configuration.beta = std::stod(optarg); - options = 1; - break; - } - case 'd': { - client_configuration.drop_factor = std::stod(optarg); - options = 1; - break; - } - case 'W': { - client_configuration.window = std::stod(optarg); - options = 1; - break; - } - case 'M': { - client_configuration.receive_buffer_size_ = std::stoull(optarg); - options = 1; - break; - } + // Client specifc + case 'b': { + client_configuration.beta = std::stod(optarg); + options = 1; + break; + } + case 'd': { + client_configuration.drop_factor = std::stod(optarg); + options = 1; + break; + } + case 'W': { + client_configuration.window = std::stod(optarg); + options = 1; + break; + } + case 'M': { + client_configuration.receive_buffer_size_ = std::stoull(optarg); + options = 1; + break; + } #ifdef SECURE_HICNTRANSPORT - case 'P': { - client_configuration.producer_prefix_ = Prefix(optarg); - client_configuration.secure_ = true; - break; - } + case 'P': { + client_configuration.producer_prefix_ = Prefix(optarg); + client_configuration.secure_ = true; + break; + } #endif - case 'c': { - client_configuration.producer_certificate = std::string(optarg); - options = 1; - break; - } - case 'v': { - client_configuration.verify = true; - options = 1; - break; - } - case 'i': { - client_configuration.report_interval_milliseconds_ = std::stoul(optarg); - options = 1; - break; - } - case 't': { - client_configuration.test_mode_ = true; - options = 1; - break; - } - case 'L': { - client_configuration.interest_lifetime_ = std::stoul(optarg); - options = 1; - break; - } - // Server specific - case 'A': { - server_configuration.download_size = std::stoul(optarg); - options = -1; - break; - } - case 's': { - server_configuration.payload_size_ = std::stoul(optarg); - options = -1; - break; - } - case 'r': { - server_configuration.virtual_producer = false; - options = -1; - break; - } - case 'm': { - server_configuration.manifest = true; - options = -1; - break; - } - case 'l': { - server_configuration.live_production = true; - options = -1; - break; - } - case 'K': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } - case 'y': { - if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256; - } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512; - } else if (strncasecmp(optarg, "crc32", 5) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C; - } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; - } - options = -1; - break; - } - case 'p': { - server_configuration.keystore_password = std::string(optarg); - options = -1; - break; - } - case 'x': { - server_configuration.multiphase_produce_ = true; - options = -1; - break; - } - case 'B': { - auto str = std::string(optarg); - std::transform(str.begin(), str.end(), str.begin(), ::tolower); - server_configuration.production_rate_ = str; - options = -1; - break; + case 'c': { + client_configuration.producer_certificate = std::string(optarg); + options = 1; + break; + } + case 'v': { + client_configuration.verify = true; + options = 1; + break; + } + case 'i': { + client_configuration.report_interval_milliseconds_ = std::stoul(optarg); + options = 1; + break; + } + case 't': { + client_configuration.test_mode_ = true; + options = 1; + break; + } + case 'L': { + client_configuration.interest_lifetime_ = std::stoul(optarg); + options = 1; + break; + } + // Server specific + case 'A': { + server_configuration.download_size = std::stoul(optarg); + options = -1; + break; + } + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { + server_configuration.virtual_producer = false; + options = -1; + break; + } + case 'm': { + server_configuration.manifest = true; + options = -1; + break; + } + case 'l': { + server_configuration.live_production = true; + options = -1; + break; + } + case 'K': { + server_configuration.keystore_name = std::string(optarg); + server_configuration.sign = true; + options = -1; + break; + } + case 'y': { + if (strncasecmp(optarg, "sha256", 6) == 0) { + server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256; + } else if (strncasecmp(optarg, "sha512", 6) == 0) { + server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512; + } else if (strncasecmp(optarg, "crc32", 5) == 0) { + server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C; + } else { + std::cerr << "Ignored unknown hash algorithm. Using SHA 256." + << std::endl; } + options = -1; + break; + } + case 'p': { + server_configuration.keystore_password = std::string(optarg); + options = -1; + break; + } + case 'x': { + server_configuration.multiphase_produce_ = true; + options = -1; + break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + server_configuration.production_rate_ = str; + options = -1; + break; + } #ifdef SECURE_HICNTRANSPORT - case 'E': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.secure_ = true; - break; - } + case 'E': { + server_configuration.keystore_name = std::string(optarg); + server_configuration.secure_ = true; + break; + } #endif - case 'h': - default: - usage(); - return EXIT_FAILURE; + case 'h': + default: + usage(); + return EXIT_FAILURE; } } @@ -1457,9 +1440,9 @@ int main(int argc, char *argv[]) { return 0; } -} // end namespace interface +} // end namespace interface -} // end namespace transport +} // end namespace transport int main(int argc, char *argv[]) { return transport::interface::main(argc, argv); |