diff options
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r-- | libtransport/src/implementation/CMakeLists.txt | 15 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_consumer.cc | 400 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_consumer.h | 136 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_producer.cc | 376 | ||||
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_producer.h | 119 | ||||
-rw-r--r-- | libtransport/src/implementation/socket.cc | 20 | ||||
-rw-r--r-- | libtransport/src/implementation/socket.h | 29 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 232 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 246 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_rtc_socket_producer.cc | 208 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_rtc_socket_producer.h | 57 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_consumer.cc | 368 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_consumer.h | 113 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_producer.cc | 551 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_producer.h | 154 |
15 files changed, 391 insertions, 2633 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt index daf899d06..c759dd964 100644 --- a/libtransport/src/implementation/CMakeLists.txt +++ b/libtransport/src/implementation/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -19,21 +19,8 @@ list(APPEND HEADER_FILES if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a") list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc - # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket.cc ) - - list(APPEND HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h - # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h - ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h - ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h - ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h - ) endif() set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc deleted file mode 100644 index 4b14da5d2..000000000 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <implementation/p2psecure_socket_consumer.h> -#include <interfaces/tls_socket_consumer.h> -#include <openssl/bio.h> -#include <openssl/ssl.h> -#include <openssl/tls1.h> - -#include <random> - -namespace transport { -namespace implementation { - -void P2PSecureConsumerSocket::setInterestPayload( - interface::ConsumerSocket &c, const core::Interest &interest) { - Interest &int2 = const_cast<Interest &>(interest); - random_suffix_ = int2.getName().getSuffix(); - - if (payload_ != NULL) int2.appendPayload(std::move(payload_)); -} - -/* Return the number of read bytes in the return param */ -int readOld(BIO *b, char *buf, int size) { - if (size < 0) return size; - - P2PSecureConsumerSocket *socket; - socket = (P2PSecureConsumerSocket *)BIO_get_data(b); - - std::unique_lock<std::mutex> lck(socket->mtx_); - - if (!socket->something_to_read_) { - if (!socket->transport_protocol_->isRunning()) { - socket->network_name_.setSuffix(socket->random_suffix_); - socket->ConsumerSocket::asyncConsume(socket->network_name_); - } - - if (!socket->something_to_read_) socket->cv_.wait(lck); - } - - size_t size_to_read, read; - size_t chain_size = socket->head_->length(); - - if (socket->head_->isChained()) - chain_size = socket->head_->computeChainDataLength(); - - if (chain_size > (size_t)size) { - read = size_to_read = (size_t)size; - } else { - read = size_to_read = chain_size; - socket->something_to_read_ = false; - } - - while (size_to_read) { - if (socket->head_->length() < size_to_read) { - std::memcpy(buf, socket->head_->data(), socket->head_->length()); - size_to_read -= socket->head_->length(); - buf += socket->head_->length(); - socket->head_ = socket->head_->pop(); - } else { - std::memcpy(buf, socket->head_->data(), size_to_read); - socket->head_->trimStart(size_to_read); - size_to_read = 0; - } - } - - return (int)read; -} - -/* Return the number of read bytes in readbytes */ -int read(BIO *b, char *buf, size_t size, size_t *readbytes) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = readOld(b, buf, (int)size); - - if (ret <= 0) { - *readbytes = 0; - return ret; - } - - *readbytes = (size_t)ret; - - return 1; -} - -/* Return the number of written bytes in the return param */ -int writeOld(BIO *b, const char *buf, int num) { - P2PSecureConsumerSocket *socket; - socket = (P2PSecureConsumerSocket *)BIO_get_data(b); - - socket->payload_ = utils::MemBuf::copyBuffer(buf, num); - - socket->ConsumerSocket::setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind( - &P2PSecureConsumerSocket::setInterestPayload, socket, - std::placeholders::_1, std::placeholders::_2)); - - return num; -} - -/* Return the number of written bytes in written */ -int write(BIO *b, const char *buf, size_t size, size_t *written) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = writeOld(b, buf, (int)size); - - if (ret <= 0) { - *written = 0; - return ret; - } - - *written = (size_t)ret; - - return 1; -} - -long ctrl(BIO *b, int cmd, long num, void *ptr) { return 1; } - -int P2PSecureConsumerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, - const unsigned char **out, - size_t *outlen, X509 *x, - size_t chainidx, int *al, - void *add_arg) { - if (ext_type == 100) { - *out = (unsigned char *)malloc(4); - *(uint32_t *)*out = 10; - *outlen = 4; - } - return 1; -} - -void P2PSecureConsumerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, - const unsigned char *out, - void *add_arg) { - free(const_cast<unsigned char *>(out)); -} - -int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, - const unsigned char *in, - size_t inlen, X509 *x, - size_t chainidx, int *al, - void *add_arg) { - P2PSecureConsumerSocket *socket = - reinterpret_cast<P2PSecureConsumerSocket *>(add_arg); - if (ext_type == 100) { - memcpy(&socket->secure_prefix_, in, sizeof(ip_prefix_t)); - } - return 1; -} - -P2PSecureConsumerSocket::P2PSecureConsumerSocket( - interface::ConsumerSocket *consumer, int handshake_protocol, - int transport_protocol) - : ConsumerSocket(consumer, handshake_protocol), - name_(), - tls_consumer_(nullptr), - decrypted_content_(), - payload_(), - head_(), - something_to_read_(false), - content_downloaded_(false), - random_suffix_(), - secure_prefix_(), - producer_namespace_(), - read_callback_decrypted_(), - mtx_(), - cv_(), - protocol_(transport_protocol) { - /* Create the (d)TLS state */ - const SSL_METHOD *meth = TLS_client_method(); - ctx_ = SSL_CTX_new(meth); - - int result = - SSL_CTX_set_ciphersuites(ctx_, - "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" - "SHA256:TLS_AES_128_GCM_SHA256"); - if (result != 1) { - throw errors::RuntimeException( - "Unable to set cipher list on TLS subsystem. Aborting."); - } - - SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); - SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); - SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); - SSL_CTX_set_ssl_version(ctx_, meth); - - result = SSL_CTX_add_custom_ext( - ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS, - P2PSecureConsumerSocket::addHicnKeyIdCb, - P2PSecureConsumerSocket::freeHicnKeyIdCb, NULL, - P2PSecureConsumerSocket::parseHicnKeyIdCb, this); - - ssl_ = SSL_new(ctx_); - - bio_meth_ = BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket"); - BIO_meth_set_read(bio_meth_, readOld); - BIO_meth_set_write(bio_meth_, writeOld); - BIO_meth_set_ctrl(bio_meth_, ctrl); - BIO *bio = BIO_new(bio_meth_); - BIO_set_init(bio, 1); - BIO_set_data(bio, this); - SSL_set_bio(ssl_, bio, bio); - - std::default_random_engine generator; - std::uniform_int_distribution<int> distribution( - 1, std::numeric_limits<uint32_t>::max()); - random_suffix_ = 0; - - this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - this); -}; - -P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { - BIO_meth_free(bio_meth_); - SSL_shutdown(ssl_); -} - -int P2PSecureConsumerSocket::handshake() { - int result = 1; - - if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - return 1; - } - - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - - DLOG_IF(INFO, VLOG_IS_ON(2)) << "Start handshake at " << network_name_; - result = SSL_connect(this->ssl_); - - return result; -} - -void P2PSecureConsumerSocket::initSessionSocket() { - tls_consumer_ = - std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); - tls_consumer_->setInterface( - new interface::TLSConsumerSocket(tls_consumer_.get())); - - ConsumerTimerCallback *stats_summary_callback = nullptr; - this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_summary_callback); - - uint32_t lifetime; - this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); - - tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - read_callback_decrypted_); - tls_consumer_->setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - *stats_summary_callback); - tls_consumer_->setSocketOption(GeneralTransportOptions::STATS_INTERVAL, - this->timer_interval_milliseconds_); - tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - tls_consumer_->connect(); -} - -int P2PSecureConsumerSocket::consume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } - - if (handshake() != 1) { - throw errors::RuntimeException("Unable to perform client handshake"); - } else { - DLOG_IF(INFO, VLOG_IS_ON(2)) << "Handshake performed!"; - } - - initSessionSocket(); - - if (tls_consumer_ == nullptr) { - throw errors::RuntimeException("TLS socket does not exist"); - } - - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - - if (payload_ != nullptr) - return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_)); - else - return tls_consumer_->consume((prefix->mapName(name))); -} - -int P2PSecureConsumerSocket::asyncConsume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } - - if (handshake() != 1) { - throw errors::RuntimeException("Unable to perform client handshake"); - } else { - DLOG_IF(INFO, VLOG_IS_ON(2)) << "Handshake performed!"; - } - - initSessionSocket(); - - if (tls_consumer_ == nullptr) { - throw errors::RuntimeException("TLS socket does not exist"); - } - - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - - if (payload_ != NULL) - return tls_consumer_->asyncConsume((prefix->mapName(name)), - std::move(payload_)); - else - return tls_consumer_->asyncConsume((prefix->mapName(name))); -} - -void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { - producer_namespace_ = producer_namespace; -} - -int P2PSecureConsumerSocket::setSocketOption( - int socket_option_key, ReadCallback *socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, ReadCallback *socket_option_value) -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - read_callback_decrypted_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); -} - -void P2PSecureConsumerSocket::getReadBuffer(uint8_t **application_buffer, - size_t *max_length){}; - -void P2PSecureConsumerSocket::readDataAvailable(size_t length) noexcept {}; - -size_t P2PSecureConsumerSocket::maxBufferSize() const { - return SSL3_RT_MAX_PLAIN_LENGTH; -} - -void P2PSecureConsumerSocket::readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept { - std::unique_lock<std::mutex> lck(this->mtx_); - if (head_) { - head_->prependChain(std::move(buffer)); - } else { - head_ = std::move(buffer); - } - - something_to_read_ = true; - cv_.notify_one(); -} - -void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {}; - -void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { - std::unique_lock<std::mutex> lck(this->mtx_); - content_downloaded_ = true; - something_to_read_ = true; - cv_.notify_one(); -} - -bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } - -} // namespace implementation -} // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h deleted file mode 100644 index a35a50352..000000000 --- a/libtransport/src/implementation/p2psecure_socket_consumer.h +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/interfaces/socket_consumer.h> -#include <implementation/tls_socket_consumer.h> -#include <openssl/bio.h> -#include <openssl/ssl.h> - -namespace transport { -namespace implementation { - -class P2PSecureConsumerSocket : public ConsumerSocket, - public interface::ConsumerSocket::ReadCallback { - /* Return the number of read bytes in readbytes */ - friend int read(BIO *b, char *buf, size_t size, size_t *readbytes); - - /* Return the number of read bytes in the return param */ - friend int readOld(BIO *h, char *buf, int size); - - /* Return the number of written bytes in written */ - friend int write(BIO *b, const char *buf, size_t size, size_t *written); - - /* Return the number of written bytes in the return param */ - friend int writeOld(BIO *h, const char *buf, int num); - - friend long ctrl(BIO *b, int cmd, long num, void *ptr); - - public: - explicit P2PSecureConsumerSocket(interface::ConsumerSocket *consumer, - int handshake_protocol, - int transport_protocol); - - ~P2PSecureConsumerSocket(); - - int consume(const Name &name) override; - - int asyncConsume(const Name &name) override; - - void registerPrefix(const Prefix &producer_namespace); - - int setSocketOption( - int socket_option_key, - interface::ConsumerSocket::ReadCallback *socket_option_value) override; - - using ConsumerSocket::getSocketOption; - using ConsumerSocket::setSocketOption; - - protected: - /* Callback invoked once an interest has been received and its payload - * decrypted */ - ConsumerInterestCallback on_interest_input_decrypted_; - ConsumerInterestCallback on_interest_process_decrypted_; - - private: - Name name_; - std::shared_ptr<TLSConsumerSocket> tls_consumer_; - /* SSL handle */ - SSL *ssl_; - SSL_CTX *ctx_; - BIO_METHOD *bio_meth_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data - * from the underlying layer to the application */ - std::unique_ptr<utils::MemBuf> decrypted_content_; - /* Chain of MemBuf holding the payload to be written into interest or data */ - std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ - std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - double old_max_win_; - double old_current_win_; - uint32_t random_suffix_; - ip_prefix_t secure_prefix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; - - /* Condition variable for the wait */ - std::condition_variable cv_; - - int protocol_; - - void setInterestPayload(interface::ConsumerSocket &c, - const core::Interest &interest); - - static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context, - const unsigned char **out, size_t *outlen, X509 *x, - size_t chainidx, int *al, void *add_arg); - - static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, const unsigned char *out, - void *add_arg); - - static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, const unsigned char *in, - size_t inlen, X509 *x, size_t chainidx, int *al, - void *add_arg); - - virtual void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override; - - virtual void readDataAvailable(size_t length) noexcept override; - - virtual size_t maxBufferSize() const override; - - virtual void readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept override; - - virtual void readError(const std::error_code ec) noexcept override; - - virtual void readSuccess(std::size_t total_size) noexcept override; - - virtual bool isBufferMovable() noexcept override; - - int handshake(); - - void initSessionSocket(); -}; - -} // namespace implementation - -} // end namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc deleted file mode 100644 index 3748001fc..000000000 --- a/libtransport/src/implementation/p2psecure_socket_producer.cc +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/core/interest.h> -#include <implementation/p2psecure_socket_producer.h> -// #include <implementation/tls_rtc_socket_producer.h> -#include <implementation/tls_socket_producer.h> -#include <interfaces/tls_rtc_socket_producer.h> -#include <interfaces/tls_socket_producer.h> -#include <openssl/bio.h> -#include <openssl/rand.h> -#include <openssl/ssl.h> - -namespace transport { -namespace implementation { - -/* Workaround to prevent content with expiry time equal to 0 to be lost when - * pushed in the forwarder */ -#define HICN_HANDSHAKE_CONTENT_EXPIRY_TIME 100; - -P2PSecureProducerSocket::P2PSecureProducerSocket( - interface::ProducerSocket *producer_socket) - : ProducerSocket(producer_socket, - ProductionProtocolAlgorithms::BYTE_STREAM), - mtx_(), - cv_(), - map_producers(), - list_producers() {} - -P2PSecureProducerSocket::P2PSecureProducerSocket( - interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<auth::Identity> &identity) - : ProducerSocket(producer_socket, - ProductionProtocolAlgorithms::BYTE_STREAM), - rtc_(rtc), - mtx_(), - cv_(), - map_producers(), - list_producers() { - /* Setup SSL context (identity and parameter to use TLS 1.3) */ - cert_509_ = identity->getCertificate().get(); - pkey_rsa_ = identity->getPrivateKey().get(); - - /* Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. */ - ProducerSocket::setSocketOption( - ProducerCallbacksOptions::INTEREST_INPUT, - (ProducerInterestCallback)std::bind( - &P2PSecureProducerSocket::onInterestCallback, this, - std::placeholders::_1, std::placeholders::_2)); -} - -P2PSecureProducerSocket::~P2PSecureProducerSocket() {} - -void P2PSecureProducerSocket::initSessionSocket( - std::unique_ptr<TLSProducerSocket> &producer) { - producer->on_content_produced_application_ = - this->on_content_produced_application_; - producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, - this->content_object_expiry_time_); - producer->setSocketOption(SIGNER, this->signer_); - producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); - producer->setSocketOption(DATA_PACKET_SIZE, - (uint32_t)(this->data_packet_size_)); - uint32_t output_buffer_size = 0; - this->getSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, - output_buffer_size); - producer->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, - output_buffer_size); - - if (!rtc_) { - producer->setInterface(new interface::TLSProducerSocket(producer.get())); - } else { - // TODO - // TLSRTCProducerSocket *rtc_producer = - // dynamic_cast<TLSRTCProducerSocket *>(producer.get()); - // rtc_producer->setInterface( - // new interface::TLSRTCProducerSocket(rtc_producer)); - } -} - -void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, - Interest &interest) { - std::unique_lock<std::mutex> lck(mtx_); - std::unique_ptr<TLSProducerSocket> tls_producer; - auto it = map_producers.find(interest.getName()); - - if (it != map_producers.end()) { - return; - } - - if (!rtc_) { - tls_producer = - std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName()); - } else { - // TODO - // tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this, - // interest.getName()); - } - - initSessionSocket(tls_producer); - TLSProducerSocket *tls_producer_ptr = tls_producer.get(); - map_producers.insert({interest.getName(), move(tls_producer)}); - - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Start handshake at " << interest.getName(); - - if (!rtc_) { - tls_producer_ptr->onInterest(*tls_producer_ptr, interest); - tls_producer_ptr->async_accept(); - } else { - // TODO - // TLSRTCProducerSocket *rtc_producer_ptr = - // dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr); - // rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest); - // rtc_producer_ptr->async_accept(); - } -} - -uint32_t P2PSecureProducerSocket::produceDatagram( - const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { - // TODO - throw errors::NotImplementedException(); - - if (!rtc_) { - throw errors::RuntimeException( - "RTC must be the transport protocol to start the production of current " - "data. Aborting."); - } - - std::unique_lock<std::mutex> lck(mtx_); - - if (list_producers.empty()) cv_.wait(lck); - - // TODO - // for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - // { - // TLSRTCProducerSocket *rtc_producer = - // dynamic_cast<TLSRTCProducerSocket *>(it->get()); - // rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); - // } - - return 0; -} - -uint32_t P2PSecureProducerSocket::produceStream( - const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t start_offset) { - if (rtc_) { - throw errors::RuntimeException( - "RTC transport protocol is not compatible with the production of " - "current data. Aborting."); - } - - std::unique_lock<std::mutex> lck(mtx_); - uint32_t segments = 0; - - if (list_producers.empty()) cv_.wait(lck); - - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - segments += (*it)->produceStream(content_name, buffer->clone(), is_last, - start_offset); - - return segments; -} - -uint32_t P2PSecureProducerSocket::produceStream(const Name &content_name, - const uint8_t *buffer, - size_t buffer_size, - bool is_last, - uint32_t start_offset) { - if (rtc_) { - throw errors::RuntimeException( - "RTC transport protocol is not compatible with the production of " - "current data. Aborting."); - } - - std::unique_lock<std::mutex> lck(mtx_); - uint32_t segments = 0; - if (list_producers.empty()) cv_.wait(lck); - - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - segments += (*it)->produceStream(content_name, buffer, buffer_size, is_last, - start_offset); - - return segments; -} - -// void P2PSecureProducerSocket::asyncProduce(const Name &content_name, -// const uint8_t *buf, -// size_t buffer_size, bool is_last, -// uint32_t *start_offset) { -// if (rtc_) { -// throw errors::RuntimeException( -// "RTC transport protocol is not compatible with the production of " -// "current data. Aborting."); -// } - -// std::unique_lock<std::mutex> lck(mtx_); -// if (list_producers.empty()) cv_.wait(lck); - -// for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) -// { -// (*it)->asyncProduce(content_name, buf, buffer_size, is_last, -// start_offset); -// } -// } - -void P2PSecureProducerSocket::asyncProduce( - Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, - uint32_t offset, uint32_t **last_segment) { - if (rtc_) { - throw errors::RuntimeException( - "RTC transport protocol is not compatible with the production of " - "current data. Aborting."); - } - - std::unique_lock<std::mutex> lck(mtx_); - if (list_producers.empty()) cv_.wait(lck); - - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { - (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, - last_segment); - } -} - -/* Redefinition of socket options to avoid name hiding */ -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, ProducerInterestCallback socket_option_value) { - if (!list_producers.empty()) { - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - } - - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - on_interest_input_decrypted_ = socket_option_value; - return SOCKET_OPTION_SET; - - case ProducerCallbacksOptions::INTEREST_DROP: - on_interest_dropped_input_buffer_ = socket_option_value; - return SOCKET_OPTION_SET; - - case ProducerCallbacksOptions::INTEREST_PASS: - on_interest_inserted_input_buffer_ = socket_option_value; - return SOCKET_OPTION_SET; - - case ProducerCallbacksOptions::CACHE_HIT: - on_interest_satisfied_output_buffer_ = socket_option_value; - return SOCKET_OPTION_SET; - - case ProducerCallbacksOptions::CACHE_MISS: - on_interest_process_decrypted_ = socket_option_value; - return SOCKET_OPTION_SET; - - default: - return SOCKET_OPTION_NOT_SET; - } -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, - const std::shared_ptr<auth::Signer> &socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - switch (socket_option_key) { - case GeneralTransportOptions::SIGNER: { - signer_.reset(); - signer_ = socket_option_value; - - return SOCKET_OPTION_SET; - } - default: - return SOCKET_OPTION_NOT_SET; - } -} - -int P2PSecureProducerSocket::setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - if (!list_producers.empty()) { - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - } - switch (socket_option_key) { - case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: - content_object_expiry_time_ = - socket_option_value; // HICN_HANDSHAKE_CONTENT_EXPIRY_TIME; - return SOCKET_OPTION_SET; - } - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption(int socket_option_key, - bool socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption(int socket_option_key, - Name *socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, ProducerContentObjectCallback socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, ProducerContentCallback socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - on_content_produced_application_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, auth::CryptoHashType socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -} // namespace implementation -} // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h deleted file mode 100644 index f94347258..000000000 --- a/libtransport/src/implementation/p2psecure_socket_producer.h +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/auth/identity.h> -#include <hicn/transport/auth/signer.h> -#include <implementation/socket_producer.h> -// #include <implementation/tls_rtc_socket_producer.h> -#include <implementation/tls_socket_producer.h> -#include <openssl/ssl.h> -#include <utils/content_store.h> - -#include <condition_variable> -#include <forward_list> -#include <mutex> - -namespace transport { -namespace implementation { - -class P2PSecureProducerSocket : public ProducerSocket { - friend class TLSProducerSocket; - // TODO - // friend class TLSRTCProducerSocket; - - public: - explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); - - explicit P2PSecureProducerSocket( - interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<auth::Identity> &identity); - - ~P2PSecureProducerSocket(); - - uint32_t produceDatagram(const Name &content_name, - std::unique_ptr<utils::MemBuf> &&buffer) override; - - uint32_t produceStream(const Name &content_name, const uint8_t *buffer, - size_t buffer_size, bool is_last = true, - uint32_t start_offset = 0) override; - - uint32_t produceStream(const Name &content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, - uint32_t start_offset = 0) override; - - void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment = nullptr) override; - - int setSocketOption(int socket_option_key, - ProducerInterestCallback socket_option_value) override; - - int setSocketOption( - int socket_option_key, - const std::shared_ptr<auth::Signer> &socket_option_value) override; - - int setSocketOption(int socket_option_key, - uint32_t socket_option_value) override; - - int setSocketOption(int socket_option_key, bool socket_option_value) override; - - int setSocketOption(int socket_option_key, - Name *socket_option_value) override; - - int setSocketOption( - int socket_option_key, - ProducerContentObjectCallback socket_option_value) override; - - int setSocketOption(int socket_option_key, - ProducerContentCallback socket_option_value) override; - - int setSocketOption(int socket_option_key, - auth::CryptoHashType socket_option_value) override; - - int setSocketOption(int socket_option_key, - const std::string &socket_option_value) override; - - using ProducerSocket::getSocketOption; - // using ProducerSocket::onInterest; - - protected: - /* Callback invoked once an interest has been received and its payload - * decrypted */ - ProducerInterestCallback on_interest_input_decrypted_; - ProducerInterestCallback on_interest_process_decrypted_; - ProducerContentCallback on_content_produced_application_; - - private: - bool rtc_; - std::mutex mtx_; - /* Condition variable for the wait */ - std::condition_variable cv_; - X509 *cert_509_; - EVP_PKEY *pkey_rsa_; - std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>, - core::hash<core::Name>, core::compare2<core::Name>> - map_producers; - std::list<std::unique_ptr<TLSProducerSocket>> list_producers; - - void onInterestCallback(interface::ProducerSocket &p, Interest &interest); - - void initSessionSocket(std::unique_ptr<TLSProducerSocket> &producer); -}; - -} // namespace implementation -} // namespace transport diff --git a/libtransport/src/implementation/socket.cc b/libtransport/src/implementation/socket.cc index 2e21f2bc3..c554d09d3 100644 --- a/libtransport/src/implementation/socket.cc +++ b/libtransport/src/implementation/socket.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,13 +14,27 @@ */ #include <core/global_configuration.h> +#include <hicn/transport/interfaces/socket_options_default_values.h> #include <implementation/socket.h> namespace transport { namespace implementation { Socket::Socket(std::shared_ptr<core::Portal> &&portal) - : portal_(std::move(portal)), is_async_(false) {} + : portal_(std::move(portal)), + is_async_(false), + signer_(std::make_shared<auth::VoidSigner>()), + verifier_(std::make_shared<auth::VoidVerifier>()) {} + +int Socket::setSocketOption(int socket_option_key, + hicn_packet_format_t packet_format) { + return SOCKET_OPTION_NOT_SET; +} + +int Socket::getSocketOption(int socket_option_key, + hicn_packet_format_t &packet_format) { + return SOCKET_OPTION_NOT_GET; +} } // namespace implementation -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h index cf22c03e1..d0d2ba2ac 100644 --- a/libtransport/src/implementation/socket.h +++ b/libtransport/src/implementation/socket.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -16,6 +16,8 @@ #pragma once #include <core/facade.h> +#include <hicn/transport/auth/signer.h> +#include <hicn/transport/auth/verifier.h> #include <hicn/transport/config.h> #include <hicn/transport/interfaces/callbacks.h> #include <hicn/transport/interfaces/socket_options_default_values.h> @@ -38,7 +40,28 @@ class Socket { virtual void connect() = 0; virtual bool isRunning() = 0; - virtual asio::io_service &getIoService() { return portal_->getIoService(); } + virtual asio::io_service &getIoService() { + return portal_->getThread().getIoService(); + } + + int setSocketOption(int socket_option_key, + hicn_packet_format_t packet_format); + int getSocketOption(int socket_option_key, + hicn_packet_format_t &packet_format); + + int getSocketOption(int socket_option_key, + std::shared_ptr<core::Portal> &socket_option_value) { + switch (socket_option_key) { + case interface::GeneralTransportOptions::PORTAL: + socket_option_value = portal_; + break; + default: + return SOCKET_OPTION_NOT_GET; + ; + } + + return SOCKET_OPTION_GET; + } protected: Socket(std::shared_ptr<core::Portal> &&portal); @@ -48,6 +71,8 @@ class Socket { protected: std::shared_ptr<core::Portal> portal_; bool is_async_; + std::shared_ptr<auth::Signer> signer_; + std::shared_ptr<auth::Verifier> verifier_; }; } // namespace implementation diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index e0981af7f..3117e21e7 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -20,6 +20,7 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/utils/event_thread.h> +#include <implementation/socket.h> #include <protocols/cbr.h> #include <protocols/raaqm.h> #include <protocols/rtc/rtc.h> @@ -38,7 +39,7 @@ class ConsumerSocket : public Socket { std::shared_ptr<core::Portal> &&portal) : Socket(std::move(portal)), consumer_interface_(consumer), - async_downloader_(), + packet_format_(default_values::packet_format), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), max_window_size_(default_values::max_window_size), @@ -56,6 +57,8 @@ class ConsumerSocket : public Socket { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), + manifest_factor_relevant_(default_values::manifest_factor_relevant), + manifest_factor_alert_(default_values::manifest_factor_alert), verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), reset_window_(false), @@ -64,41 +67,43 @@ class ConsumerSocket : public Socket { on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), stats_summary_(VOID_HANDLER), + on_fwd_strategy_(VOID_HANDLER), + on_rec_strategy_(VOID_HANDLER), read_callback_(nullptr), timer_interval_milliseconds_(0), + recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY), + aggregated_data_(false), + content_sharing_mode_(false), + aggregated_interests_(false), guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = - std::make_unique<protocol::CbrTransportProtocol>(this); + std::make_shared<protocol::CbrTransportProtocol>(this); break; case TransportProtocolAlgorithms::RTC: transport_protocol_ = - std::make_unique<protocol::rtc::RTCTransportProtocol>(this); + std::make_shared<protocol::rtc::RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: transport_protocol_ = - std::make_unique<protocol::RaaqmTransportProtocol>(this); + std::make_shared<protocol::RaaqmTransportProtocol>(this); break; } } public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) - : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {} + : ConsumerSocket(consumer, protocol, core::Portal::createShared()) {} ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, - asio::io_service &io_service) - : ConsumerSocket(consumer, protocol, - std::make_shared<core::Portal>(io_service)) { + ::utils::EventThread &worker) + : ConsumerSocket(consumer, protocol, core::Portal::createShared(worker)) { is_async_ = true; } - ~ConsumerSocket() { - stop(); - async_downloader_.stop(); - } + ~ConsumerSocket() { stop(); } interface::ConsumerSocket *getInterface() { return consumer_interface_; @@ -122,18 +127,6 @@ class ConsumerSocket : public Socket { transport_protocol_->start(); - return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; - } - - virtual int asyncConsume(const Name &name) { - if (!async_downloader_.stopped()) { - async_downloader_.add([this, name]() { - network_name_ = std::move(name); - network_name_.setSuffix(0); - transport_protocol_->start(); - }); - } - return CONSUMER_RUNNING; } @@ -149,6 +142,9 @@ class ConsumerSocket : public Socket { } } + using Socket::getSocketOption; + using Socket::setSocketOption; + virtual int setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -265,6 +261,23 @@ class ConsumerSocket : public Socket { timer_interval_milliseconds_ = socket_option_value; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + recovery_strategy_ = + (RtcTransportRecoveryStrategies)socket_option_value; + break; + + case MANIFEST_FACTOR_RELEVANT: + manifest_factor_relevant_ = socket_option_value; + break; + + case MANIFEST_FACTOR_ALERT: + manifest_factor_alert_ = socket_option_value; + break; + + case GeneralTransportOptions::PACKET_FORMAT: + packet_format_ = socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -310,7 +323,6 @@ class ConsumerSocket : public Socket { on_content_object_input_ = VOID_HANDLER; break; } - default: return SOCKET_OPTION_NOT_SET; } @@ -328,6 +340,21 @@ class ConsumerSocket : public Socket { result = SOCKET_OPTION_SET; break; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case RtcTransportOptions::CONTENT_SHARING_MODE: + content_sharing_mode_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case RtcTransportOptions::AGGREGATED_INTERESTS: + aggregated_interests_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: return result; } @@ -405,37 +432,49 @@ class ConsumerSocket : public Socket { int setSocketOption( int socket_option_key, + const std::shared_ptr<auth::Signer> &socket_option_value) { + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::SIGNER: + signer_.reset(); + signer_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + } + return SOCKET_OPTION_SET; + } + + int setSocketOption( + int socket_option_key, const std::shared_ptr<auth::Verifier> &socket_option_value) { - int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: verifier_.reset(); verifier_ = socket_option_value; - result = SOCKET_OPTION_SET; break; default: - return result; + return SOCKET_OPTION_NOT_SET; } } - - return result; + return SOCKET_OPTION_SET; } int setSocketOption(int socket_option_key, const std::string &socket_option_value) { int result = SOCKET_OPTION_NOT_SET; - if (!transport_protocol_->isRunning()) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + if (!transport_protocol_->isRunning()) { output_interface_ = socket_option_value; portal_->setOutputInterface(output_interface_); result = SOCKET_OPTION_SET; - break; - - default: - return result; - } + } + break; + default: + return result; } return result; } @@ -461,6 +500,29 @@ class ConsumerSocket : public Socket { }); } + int setSocketOption(int socket_option_key, + StrategyCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + on_fwd_strategy_ = socket_option_value; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + on_rec_strategy_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + int getSocketOption(int socket_option_key, double &socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -532,6 +594,22 @@ class ConsumerSocket : public Socket { socket_option_value = timer_interval_milliseconds_; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + socket_option_value = recovery_strategy_; + break; + + case GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT: + socket_option_value = manifest_factor_relevant_; + break; + + case GeneralTransportOptions::MANIFEST_FACTOR_ALERT: + socket_option_value = manifest_factor_alert_; + break; + + case GeneralTransportOptions::PACKET_FORMAT: + socket_option_value = packet_format_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -553,6 +631,18 @@ class ConsumerSocket : public Socket { socket_option_value = reset_window_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + + case RtcTransportOptions::CONTENT_SHARING_MODE: + socket_option_value = content_sharing_mode_; + break; + + case RtcTransportOptions::AGGREGATED_INTERESTS: + socket_option_value = aggregated_interests_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -628,10 +718,11 @@ class ConsumerSocket : public Socket { } int getSocketOption(int socket_option_key, - std::shared_ptr<core::Portal> &socket_option_value) { + IcnObserver **socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + *socket_option_value = (rate_estimation_observer_); break; default: @@ -642,18 +733,15 @@ class ConsumerSocket : public Socket { } int getSocketOption(int socket_option_key, - IcnObserver **socket_option_value) { - utils::SpinLock::Acquire locked(guard_raaqm_params_); + std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - *socket_option_value = (rate_estimation_observer_); - break; + case GeneralTransportOptions::SIGNER: + socket_option_value = signer_; + return SOCKET_OPTION_GET; default: return SOCKET_OPTION_NOT_GET; } - - return SOCKET_OPTION_GET; } int getSocketOption(int socket_option_key, @@ -665,7 +753,6 @@ class ConsumerSocket : public Socket { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -677,7 +764,6 @@ class ConsumerSocket : public Socket { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -714,6 +800,29 @@ class ConsumerSocket : public Socket { }); } + int getSocketOption(int socket_option_key, + StrategyCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + *socket_option_value = &on_fwd_strategy_; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + *socket_option_value = &on_rec_strategy_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + protected: template <typename Lambda, typename arg2> int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, @@ -726,9 +835,9 @@ class ConsumerSocket : public Socket { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -748,13 +857,12 @@ class ConsumerSocket : public Socket { protected: interface::ConsumerSocket *consumer_interface_; - utils::EventThread async_downloader_; - // No need to protect from multiple accesses in the async consumer // The parameter is accessible only with a getSocketOption and // set from the consume Name network_name_; + hicn_packet_format_t packet_format_; int interest_lifetime_; double min_window_size_; @@ -776,6 +884,8 @@ class ConsumerSocket : public Socket { int rate_estimation_choice_; // Verification parameters + uint32_t manifest_factor_relevant_; + uint32_t manifest_factor_alert_; std::shared_ptr<auth::Verifier> verifier_; transport::auth::KeyId *key_id_; std::atomic_bool verify_signature_; @@ -787,17 +897,25 @@ class ConsumerSocket : public Socket { ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; ConsumerTimerCallback stats_summary_; + StrategyCallback on_fwd_strategy_; + StrategyCallback on_rec_strategy_; ReadCallback *read_callback_; uint32_t timer_interval_milliseconds_; // Transport protocol - std::unique_ptr<protocol::TransportProtocol> transport_protocol_; + std::shared_ptr<protocol::TransportProtocol> transport_protocol_; // Statistic TransportStatistics stats_; + // RTC protocol + RtcTransportRecoveryStrategies recovery_strategy_; + bool aggregated_data_; + bool content_sharing_mode_; + bool aggregated_interests_; + utils::SpinLock guard_raaqm_params_; std::string output_interface_; }; diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 9daf79b9d..c5e0929b2 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -40,6 +40,7 @@ namespace implementation { using namespace core; using namespace interface; +using ProducerCallback = interface::ProducerSocket::Callback; class ProducerSocket : public Socket { private: @@ -47,12 +48,15 @@ class ProducerSocket : public Socket { std::shared_ptr<core::Portal> &&portal) : Socket(std::move(portal)), producer_interface_(producer_socket), + packet_format_(default_values::packet_format), data_packet_size_(default_values::content_object_packet_size), + max_segment_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), - async_thread_(), - making_manifest_(false), + manifest_max_capacity_(default_values::manifest_max_capacity), hash_algorithm_(auth::CryptoHashType::SHA256), - suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), + suffix_strategy_(std::make_shared<utils::IncrementalSuffixStrategy>(0)), + aggregated_data_(false), + fec_setting_(""), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -63,29 +67,30 @@ class ProducerSocket : public Socket { on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) { + on_content_produced_(VOID_HANDLER), + application_callback_(nullptr) { switch (protocol) { case ProductionProtocolAlgorithms::RTC_PROD: production_protocol_ = - std::make_unique<protocol::RTCProductionProtocol>(this); + std::make_shared<protocol::RTCProductionProtocol>(this); break; case ProductionProtocolAlgorithms::BYTE_STREAM: default: production_protocol_ = - std::make_unique<protocol::ByteStreamProductionProtocol>(this); + std::make_shared<protocol::ByteStreamProductionProtocol>(this); break; } } public: ProducerSocket(interface::ProducerSocket *producer, int protocol) - : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {} + : ProducerSocket(producer, protocol, core::Portal::createShared()) { + is_async_ = true; + } ProducerSocket(interface::ProducerSocket *producer, int protocol, - asio::io_service &io_service) - : ProducerSocket(producer, protocol, - std::make_shared<core::Portal>(io_service)) { - is_async_ = true; + ::utils::EventThread &worker) + : ProducerSocket(producer, protocol, core::Portal::createShared(worker)) { } virtual ~ProducerSocket() {} @@ -98,31 +103,9 @@ class ProducerSocket : public Socket { producer_interface_ = producer_socket; } - void connect() override { - portal_->connect(false); - production_protocol_->start(); - } + void connect() override { portal_->connect(false); } - bool isRunning() override { return !production_protocol_->isRunning(); }; - - virtual void asyncProduce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment = nullptr) { - if (!async_thread_.stopped()) { - auto a = buffer.release(); - async_thread_.add([this, content_name, a, is_last, offset, - last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - **last_segment = offset + produceStream(content_name, std::move(buf), - is_last, offset); - } else { - produceStream(content_name, std::move(buf), is_last, offset); - } - }); - } - } + bool isRunning() override { return production_protocol_->isRunning(); }; virtual uint32_t produceStream(const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, @@ -156,12 +139,38 @@ class ProducerSocket : public Socket { production_protocol_->produce(content_object); } + void sendMapme() { production_protocol_->sendMapme(); } + void registerPrefix(const Prefix &producer_namespace) { - production_protocol_->registerNamespaceWithNetwork(producer_namespace); + portal_->registerRoute(producer_namespace); } + void start() { production_protocol_->start(); } void stop() { production_protocol_->stop(); } + using Socket::getSocketOption; + using Socket::setSocketOption; + + virtual int setSocketOption(int socket_option_key, + ProducerCallback *socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::PRODUCER_CALLBACK: + application_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + virtual int setSocketOption(int socket_option_key, uint32_t socket_option_value) { switch (socket_option_key) { @@ -172,6 +181,17 @@ class ProducerSocket : public Socket { } break; + case GeneralTransportOptions::MANIFEST_MAX_CAPACITY: + manifest_max_capacity_ = socket_option_value; + break; + + case GeneralTransportOptions::MAX_SEGMENT_SIZE: + if (socket_option_value <= default_values::max_content_object_size && + socket_option_value > 0) { + max_segment_size_ = socket_option_value; + } + break; + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: production_protocol_->setOutputBufferSize(socket_option_value); break; @@ -180,6 +200,10 @@ class ProducerSocket : public Socket { content_object_expiry_time_ = socket_option_value; break; + case GeneralTransportOptions::PACKET_FORMAT: + packet_format_ = socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -260,8 +284,8 @@ class ProducerSocket : public Socket { virtual int setSocketOption(int socket_option_key, bool socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - making_manifest_ = socket_option_value; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; break; default: @@ -385,7 +409,7 @@ class ProducerSocket : public Socket { virtual int setSocketOption( int socket_option_key, - core::NextSegmentCalculationStrategy socket_option_value) { + const std::shared_ptr<utils::SuffixStrategy> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SUFFIX_STRATEGY: suffix_strategy_ = socket_option_value; @@ -413,21 +437,68 @@ class ProducerSocket : public Socket { return SOCKET_OPTION_SET; } + virtual int setSocketOption( + int socket_option_key, + const std::shared_ptr<auth::Verifier> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + verifier_.reset(); + verifier_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } + } + + int getSocketOption(int socket_option_key, + ProducerCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::PRODUCER_CALLBACK: + *socket_option_value = application_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + virtual int getSocketOption(int socket_option_key, uint32_t &socket_option_value) { switch (socket_option_key) { + case GeneralTransportOptions::MANIFEST_MAX_CAPACITY: + socket_option_value = (uint32_t)manifest_max_capacity_; + break; + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = production_protocol_->getOutputBufferSize(); + socket_option_value = + (uint32_t)production_protocol_->getOutputBufferSize(); break; case GeneralTransportOptions::DATA_PACKET_SIZE: socket_option_value = (uint32_t)data_packet_size_; break; + case GeneralTransportOptions::MAX_SEGMENT_SIZE: + socket_option_value = (uint32_t)max_segment_size_; + break; + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: socket_option_value = content_object_expiry_time_; break; + case GeneralTransportOptions::PACKET_FORMAT: + socket_option_value = packet_format_; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -438,14 +509,14 @@ class ProducerSocket : public Socket { virtual int getSocketOption(int socket_option_key, bool &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - socket_option_value = making_manifest_; - break; - case GeneralTransportOptions::ASYNC_MODE: socket_option_value = is_async_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -547,21 +618,6 @@ class ProducerSocket : public Socket { }); } - virtual int getSocketOption( - int socket_option_key, - std::shared_ptr<core::Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - default: - return SOCKET_OPTION_NOT_GET; - ; - } - - return SOCKET_OPTION_GET; - } - virtual int getSocketOption(int socket_option_key, auth::CryptoHashType &socket_option_value) { switch (socket_option_key) { @@ -577,7 +633,7 @@ class ProducerSocket : public Socket { virtual int getSocketOption( int socket_option_key, - core::NextSegmentCalculationStrategy &socket_option_value) { + std::shared_ptr<utils::SuffixStrategy> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SUFFIX_STRATEGY: socket_option_value = suffix_strategy_; @@ -603,9 +659,43 @@ class ProducerSocket : public Socket { return SOCKET_OPTION_GET; } + int getSocketOption(int socket_option_key, + std::shared_ptr<auth::Verifier> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + socket_option_value = verifier_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } + } + + int getSocketOption(int socket_option_key, std::string &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::FEC_TYPE: + socket_option_value = fec_setting_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + virtual int setSocketOption(int socket_option_key, const std::string &socket_option_value) { - return SOCKET_OPTION_NOT_SET; + int result = SOCKET_OPTION_NOT_SET; + switch (socket_option_key) { + case GeneralTransportOptions::FEC_TYPE: + fec_setting_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + return result; } // If the thread calling lambda_func is not the same of io_service, this @@ -623,9 +713,9 @@ class ProducerSocket : public Socket { std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -655,9 +745,9 @@ class ProducerSocket : public Socket { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -677,20 +767,24 @@ class ProducerSocket : public Socket { // Threads protected: interface::ProducerSocket *producer_interface_; - asio::io_service io_service_; + std::atomic<hicn_packet_format_t> packet_format_; std::atomic<size_t> data_packet_size_; + std::atomic<size_t> max_segment_size_; std::atomic<uint32_t> content_object_expiry_time_; - utils::EventThread async_thread_; - - std::atomic<bool> making_manifest_; + std::atomic<uint32_t> manifest_max_capacity_; std::atomic<auth::CryptoHashType> hash_algorithm_; std::atomic<auth::CryptoSuite> crypto_suite_; utils::SpinLock signer_lock_; - std::shared_ptr<auth::Signer> signer_; - core::NextSegmentCalculationStrategy suffix_strategy_; + std::shared_ptr<utils::SuffixStrategy> suffix_strategy_; - std::unique_ptr<protocol::ProductionProtocol> production_protocol_; + std::shared_ptr<protocol::ProductionProtocol> production_protocol_; + + // RTC transport + bool aggregated_data_; + + // FEC setting + std::string fec_setting_; // callbacks ProducerInterestCallback on_interest_input_; @@ -706,6 +800,8 @@ class ProducerSocket : public Socket { ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; ProducerContentCallback on_content_produced_; + + ProducerCallback *application_callback_; }; } // namespace implementation diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc deleted file mode 100644 index db62b10c1..000000000 --- a/libtransport/src/implementation/tls_rtc_socket_producer.cc +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/core/interest.h> -#include <hicn/transport/interfaces/p2psecure_socket_producer.h> -#include <implementation/p2psecure_socket_producer.h> -#include <implementation/tls_rtc_socket_producer.h> -#include <openssl/bio.h> -#include <openssl/rand.h> -#include <openssl/ssl.h> - -namespace transport { -namespace implementation { - -int TLSRTCProducerSocket::read(BIO *b, char *buf, size_t size, - size_t *readbytes) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = TLSRTCProducerSocket::readOld(b, buf, (int)size); - - if (ret <= 0) { - *readbytes = 0; - return ret; - } - - *readbytes = (size_t)ret; - - return 1; -} - -int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) { - TLSRTCProducerSocket *socket; - socket = (TLSRTCProducerSocket *)BIO_get_data(b); - - std::unique_lock<std::mutex> lck(socket->mtx_); - if (!socket->something_to_read_) { - (socket->cv_).wait(lck); - } - - utils::MemBuf *membuf = socket->handshake_packet_->next(); - int size_to_read; - - if ((int)membuf->length() > size) { - size_to_read = size; - } else { - size_to_read = membuf->length(); - socket->something_to_read_ = false; - } - - std::memcpy(buf, membuf->data(), size_to_read); - membuf->trimStart(size_to_read); - - return size_to_read; -} - -int TLSRTCProducerSocket::write(BIO *b, const char *buf, size_t size, - size_t *written) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = TLSRTCProducerSocket::writeOld(b, buf, (int)size); - - if (ret <= 0) { - *written = 0; - return ret; - } - - *written = (size_t)ret; - - return 1; -} - -int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { - TLSRTCProducerSocket *socket; - socket = (TLSRTCProducerSocket *)BIO_get_data(b); - - if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { - bool making_manifest = socket->parent_->making_manifest_; - - socket->tls_chunks_--; - socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - false); - socket->parent_->ProducerSocket::produce( - socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0); - socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - making_manifest); - socket->first_ = false; - - } else { - std::unique_ptr<utils::MemBuf> mbuf = - utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); - auto a = mbuf.release(); - - socket->async_thread_.add([socket = socket, a]() { - socket->to_call_oncontentproduced_--; - auto mbuf = std::unique_ptr<utils::MemBuf>(a); - - socket->RTCProducerSocket::produce(std::move(mbuf)); - - ProducerContentCallback on_content_produced_application; - socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - on_content_produced_application); - - if (socket->to_call_oncontentproduced_ == 0 && - on_content_produced_application) { - on_content_produced_application( - (transport::interface::ProducerSocket &)(*socket->getInterface()), - std::error_code(), 0); - } - }); - } - - return num; -} - -TLSRTCProducerSocket::TLSRTCProducerSocket( - interface::ProducerSocket *producer_socket, P2PSecureProducerSocket *parent, - const Name &handshake_name) - : ProducerSocket(producer_socket), - RTCProducerSocket(producer_socket), - TLSProducerSocket(producer_socket, parent, handshake_name) { - BIO_METHOD *bio_meth = - BIO_meth_new(BIO_TYPE_ACCEPT, "secure rtc producer socket"); - BIO_meth_set_read(bio_meth, TLSRTCProducerSocket::readOld); - BIO_meth_set_write(bio_meth, TLSRTCProducerSocket::writeOld); - BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl); - BIO *bio = BIO_new(bio_meth); - BIO_set_init(bio, 1); - BIO_set_data(bio, this); - SSL_set_bio(ssl_, bio, bio); -} - -void TLSRTCProducerSocket::accept() { - HandshakeState handshake_state = getHandshakeState(); - - if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { - tls_chunks_ = 1; - int result = SSL_accept(ssl_); - - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); - } - - DLOG_IF(INFO, VLOG_IS_ON(2)) << "Handshake performed!"; - - parent_->list_producers.push_front( - std::move(parent_->map_producers[handshake_name_])); - parent_->map_producers.erase(handshake_name_); - - ProducerInterestCallback on_interest_process_decrypted; - getSocketOption(ProducerCallbacksOptions::CACHE_MISS, - on_interest_process_decrypted); - - if (on_interest_process_decrypted) { - Interest inter(std::move(handshake_packet_)); - on_interest_process_decrypted( - (transport::interface::ProducerSocket &)(*getInterface()), inter); - } - - parent_->cv_.notify_one(); -} - -int TLSRTCProducerSocket::async_accept() { - if (!async_thread_.stopped()) { - async_thread_.add([this]() { this->TLSRTCProducerSocket::accept(); }); - } else { - throw errors::RuntimeException( - "Async thread not running, impossible to perform handshake"); - } - - return 1; -} - -void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { - HandshakeState handshake_state = getHandshakeState(); - - if (handshake_state != SERVER_FINISHED) { - throw errors::RuntimeException( - "New handshake on the same P2P secure producer socket not supported"); - } - - size_t buf_size = buffer->length(); - tls_chunks_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); - to_call_oncontentproduced_ = tls_chunks_; - - SSL_write(ssl_, buffer->data(), buf_size); - BIO *wbio = SSL_get_wbio(ssl_); - int i = BIO_flush(wbio); - (void)i; // To shut up gcc 5 -} - -} // namespace implementation -} // namespace transport diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h deleted file mode 100644 index 92c657afc..000000000 --- a/libtransport/src/implementation/tls_rtc_socket_producer.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <implementation/tls_socket_producer.h> - -namespace transport { -namespace implementation { - -class P2PSecureProducerSocket; - -class TLSRTCProducerSocket : public TLSProducerSocket { - friend class P2PSecureProducerSocket; - - public: - explicit TLSRTCProducerSocket(interface::ProducerSocket *producer_socket, - P2PSecureProducerSocket *parent, - const Name &handshake_name); - - ~TLSRTCProducerSocket() = default; - - uint32_t produceDatagram(const Name &content_name, - std::unique_ptr<utils::MemBuf> &&buffer) override; - - void accept() override; - - int async_accept() override; - - using TLSProducerSocket::onInterest; - using TLSProducerSocket::produce; - - protected: - static int read(BIO *b, char *buf, size_t size, size_t *readbytes); - - static int readOld(BIO *h, char *buf, int size); - - static int write(BIO *b, const char *buf, size_t size, size_t *written); - - static int writeOld(BIO *h, const char *buf, int num); -}; - -} // namespace implementation - -} // end namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc deleted file mode 100644 index 65472b41d..000000000 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <implementation/tls_socket_consumer.h> -#include <openssl/bio.h> -#include <openssl/ssl.h> -#include <openssl/tls1.h> - -#include <random> - -namespace transport { -namespace implementation { - -void TLSConsumerSocket::setInterestPayload(interface::ConsumerSocket &c, - const core::Interest &interest) { - Interest &int2 = const_cast<Interest &>(interest); - random_suffix_ = int2.getName().getSuffix(); - - if (payload_ != NULL) int2.appendPayload(std::move(payload_)); -} - -/* Return the number of read bytes in the return param */ -int readOldTLS(BIO *b, char *buf, int size) { - if (size < 0) return size; - - TLSConsumerSocket *socket; - socket = (TLSConsumerSocket *)BIO_get_data(b); - - std::unique_lock<std::mutex> lck(socket->mtx_); - - if (!socket->something_to_read_) { - if (!socket->transport_protocol_->isRunning()) { - socket->network_name_.setSuffix(socket->random_suffix_); - socket->ConsumerSocket::asyncConsume(socket->network_name_); - } - - if (!socket->something_to_read_) socket->cv_.wait(lck); - } - - size_t size_to_read, read; - size_t chain_size = socket->head_->length(); - - if (socket->head_->isChained()) - chain_size = socket->head_->computeChainDataLength(); - - if (chain_size > (size_t)size) { - read = size_to_read = (size_t)size; - } else { - read = size_to_read = chain_size; - socket->something_to_read_ = false; - } - - while (size_to_read) { - if (socket->head_->length() < size_to_read) { - std::memcpy(buf, socket->head_->data(), socket->head_->length()); - size_to_read -= socket->head_->length(); - buf += socket->head_->length(); - socket->head_ = socket->head_->pop(); - } else { - std::memcpy(buf, socket->head_->data(), size_to_read); - socket->head_->trimStart(size_to_read); - size_to_read = 0; - } - } - - return (int)read; -} - -/* Return the number of read bytes in readbytes */ -int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = readOldTLS(b, buf, (int)size); - - if (ret <= 0) { - *readbytes = 0; - return ret; - } - - *readbytes = (size_t)ret; - - return 1; -} - -/* Return the number of written bytes in the return param */ -int writeOldTLS(BIO *b, const char *buf, int num) { - TLSConsumerSocket *socket; - socket = (TLSConsumerSocket *)BIO_get_data(b); - - socket->payload_ = utils::MemBuf::copyBuffer(buf, num); - - socket->ConsumerSocket::setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind( - &TLSConsumerSocket::setInterestPayload, socket, std::placeholders::_1, - std::placeholders::_2)); - - return num; -} - -/* Return the number of written bytes in written */ -int writeTLS(BIO *b, const char *buf, size_t size, size_t *written) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = writeOldTLS(b, buf, (int)size); - - if (ret <= 0) { - *written = 0; - return ret; - } - - *written = (size_t)ret; - - return 1; -} - -long ctrlTLS(BIO *b, int cmd, long num, void *ptr) { return 1; } - -TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, - int protocol, SSL *ssl) - : ConsumerSocket(consumer_socket, protocol), - name_(), - decrypted_content_(), - payload_(), - head_(), - something_to_read_(false), - content_downloaded_(false), - random_suffix_(), - producer_namespace_(), - read_callback_decrypted_(), - mtx_(), - cv_(), - async_downloader_tls_() { - /* Create the (d)TLS state */ - const SSL_METHOD *meth = TLS_client_method(); - ctx_ = SSL_CTX_new(meth); - - int result = - SSL_CTX_set_ciphersuites(ctx_, - "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" - "SHA256:TLS_AES_128_GCM_SHA256"); - if (result != 1) { - throw errors::RuntimeException( - "Unable to set cipher list on TLS subsystem. Aborting."); - } - - SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); - SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); - SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); - SSL_CTX_set_ssl_version(ctx_, meth); - - ssl_ = ssl; - - BIO_METHOD *bio_meth = - BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket"); - BIO_meth_set_read(bio_meth, readOldTLS); - BIO_meth_set_write(bio_meth, writeOldTLS); - BIO_meth_set_ctrl(bio_meth, ctrlTLS); - BIO *bio = BIO_new(bio_meth); - BIO_set_init(bio, 1); - BIO_set_data(bio, this); - SSL_set_bio(ssl_, bio, bio); - - std::default_random_engine generator; - std::uniform_int_distribution<int> distribution( - 1, std::numeric_limits<uint32_t>::max()); - random_suffix_ = 0; - - this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - this); -}; - -/* The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory */ -TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; } - -int TLSConsumerSocket::consume(const Name &name, - std::unique_ptr<utils::MemBuf> &&buffer) { - this->payload_ = std::move(buffer); - - this->ConsumerSocket::setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind( - &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1, - std::placeholders::_2)); - - return consume(name); -} - -int TLSConsumerSocket::consume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } - - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - throw errors::RuntimeException("Handshake not performed"); - } - - return download_content(name); -} - -int TLSConsumerSocket::download_content(const Name &name) { - network_name_ = name; - network_name_.setSuffix(0); - something_to_read_ = false; - content_downloaded_ = false; - - std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize(); - std::size_t buffer_size = - read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH; - decrypted_content_ = utils::MemBuf::createCombined(buffer_size); - int result = -1; - std::size_t size = 0; - - while (!content_downloaded_ || something_to_read_) { - result = SSL_read(this->ssl_, decrypted_content_->writableTail(), - SSL3_RT_MAX_PLAIN_LENGTH); - - /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of - * the data has been fully downloaded */ - - /* ASSERT((result < SSL3_RT_MAX_PLAIN_LENGTH && content_downloaded_) || */ - /* result == SSL3_RT_MAX_PLAIN_LENGTH); */ - - if (result >= 0) { - size += result; - decrypted_content_->append(result); - } else { - throw errors::RuntimeException("Unable to download content"); - } - - if (decrypted_content_->length() >= max_buffer_size) { - if (read_callback_decrypted_->isBufferMovable()) { - /* No need to perform an additional copy. The whole buffer will be - * tranferred to the application. */ - read_callback_decrypted_->readBufferAvailable( - std::move(decrypted_content_)); - decrypted_content_ = utils::MemBuf::create(buffer_size); - } else { - /* The buffer will be copied into the application-provided buffer */ - uint8_t *buffer; - std::size_t length; - std::size_t total_length = decrypted_content_->length(); - - while (decrypted_content_->length()) { - buffer = nullptr; - length = 0; - read_callback_decrypted_->getReadBuffer(&buffer, &length); - - if (!buffer || !length) { - throw errors::RuntimeException( - "Invalid buffer provided by the application."); - } - - auto to_copy = std::min(decrypted_content_->length(), length); - std::memcpy(buffer, decrypted_content_->data(), to_copy); - decrypted_content_->trimStart(to_copy); - } - - read_callback_decrypted_->readDataAvailable(total_length); - decrypted_content_->clear(); - } - } - } - - read_callback_decrypted_->readSuccess(size); - - return CONSUMER_FINISHED; -} - -int TLSConsumerSocket::asyncConsume(const Name &name, - std::unique_ptr<utils::MemBuf> &&buffer) { - this->payload_ = std::move(buffer); - - this->ConsumerSocket::setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind( - &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1, - std::placeholders::_2)); - - return asyncConsume(name); -} - -int TLSConsumerSocket::asyncConsume(const Name &name) { - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - throw errors::RuntimeException("Handshake not performed"); - } - - if (!async_downloader_tls_.stopped()) { - async_downloader_tls_.add([this, name]() { download_content(name); }); - } - - return CONSUMER_RUNNING; -} - -void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) { - producer_namespace_ = producer_namespace; -} - -int TLSConsumerSocket::setSocketOption(int socket_option_key, - ReadCallback *socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, ReadCallback *socket_option_value) -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - read_callback_decrypted_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); -} - -void TLSConsumerSocket::getReadBuffer(uint8_t **application_buffer, - size_t *max_length) {} - -void TLSConsumerSocket::readDataAvailable(size_t length) noexcept {} - -size_t TLSConsumerSocket::maxBufferSize() const { - return SSL3_RT_MAX_PLAIN_LENGTH; -} - -void TLSConsumerSocket::readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept { - std::unique_lock<std::mutex> lck(this->mtx_); - - if (head_) { - head_->prependChain(std::move(buffer)); - } else { - head_ = std::move(buffer); - } - - something_to_read_ = true; - cv_.notify_one(); -} - -void TLSConsumerSocket::readError(const std::error_code ec) noexcept {} - -void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { - std::unique_lock<std::mutex> lck(this->mtx_); - content_downloaded_ = true; - something_to_read_ = true; - cv_.notify_one(); -} - -bool TLSConsumerSocket::isBufferMovable() noexcept { return true; } - -} // namespace implementation -} // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h deleted file mode 100644 index be08ec47d..000000000 --- a/libtransport/src/implementation/tls_socket_consumer.h +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/interfaces/socket_consumer.h> -#include <implementation/socket_consumer.h> -#include <openssl/ssl.h> - -namespace transport { -namespace implementation { - -class TLSConsumerSocket : public ConsumerSocket, - public interface::ConsumerSocket::ReadCallback { - /* Return the number of read bytes in readbytes */ - friend int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes); - - /* Return the number of read bytes in the return param */ - friend int readOldTLS(BIO *h, char *buf, int size); - - /* Return the number of written bytes in written */ - friend int writeTLS(BIO *b, const char *buf, size_t size, size_t *written); - - /* Return the number of written bytes in the return param */ - friend int writeOldTLS(BIO *h, const char *buf, int num); - - friend long ctrlTLS(BIO *b, int cmd, long num, void *ptr); - - public: - explicit TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, - int protocol, SSL *ssl_); - - ~TLSConsumerSocket(); - - int consume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer); - int consume(const Name &name) override; - - int asyncConsume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer); - int asyncConsume(const Name &name) override; - - void registerPrefix(const Prefix &producer_namespace); - - int setSocketOption( - int socket_option_key, - interface::ConsumerSocket::ReadCallback *socket_option_value) override; - - using ConsumerSocket::getSocketOption; - using ConsumerSocket::setSocketOption; - - protected: - /* Callback invoked once an interest has been received and its payload - * decrypted */ - ConsumerInterestCallback on_interest_input_decrypted_; - ConsumerInterestCallback on_interest_process_decrypted_; - - private: - Name name_; - /* SSL handle */ - SSL *ssl_; - SSL_CTX *ctx_; - /* Chain of MemBuf to be used as a temporary buffer to pass descypted data - * from the underlying layer to the application */ - std::unique_ptr<utils::MemBuf> decrypted_content_; - /* Chain of MemBuf holding the payload to be written into interest or data */ - std::unique_ptr<utils::MemBuf> payload_; - /* Chain of MemBuf holding the data retrieved from the underlying layer */ - std::unique_ptr<utils::MemBuf> head_; - bool something_to_read_; - bool content_downloaded_; - uint32_t random_suffix_; - Prefix producer_namespace_; - interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; - std::mutex mtx_; - /* Condition variable for the wait */ - std::condition_variable cv_; - utils::EventThread async_downloader_tls_; - - void setInterestPayload(interface::ConsumerSocket &c, - const core::Interest &interest); - - virtual void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override; - - virtual void readDataAvailable(size_t length) noexcept override; - - virtual size_t maxBufferSize() const override; - - virtual void readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept override; - - virtual void readError(const std::error_code ec) noexcept override; - - virtual void readSuccess(std::size_t total_size) noexcept override; - - virtual bool isBufferMovable() noexcept override; - - int download_content(const Name &name); -}; - -} // namespace implementation -} // end namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc deleted file mode 100644 index 3992ca45c..000000000 --- a/libtransport/src/implementation/tls_socket_producer.cc +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/interfaces/socket_producer.h> -#include <implementation/p2psecure_socket_producer.h> -#include <implementation/tls_socket_producer.h> -#include <openssl/bio.h> -#include <openssl/rand.h> -#include <openssl/ssl.h> - -namespace transport { -namespace implementation { - -/* Return the number of read bytes in readbytes */ -int TLSProducerSocket::read(BIO *b, char *buf, size_t size, size_t *readbytes) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = TLSProducerSocket::readOld(b, buf, (int)size); - - if (ret <= 0) { - *readbytes = 0; - return ret; - } - - *readbytes = (size_t)ret; - - return 1; -} - -/* Return the number of read bytes in the return param */ -int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { - TLSProducerSocket *socket; - socket = (TLSProducerSocket *)BIO_get_data(b); - - std::unique_lock<std::mutex> lck(socket->mtx_); - - DLOG_IF(INFO, VLOG_IS_ON(4)) << "Start wait on the CV."; - - if (!socket->something_to_read_) { - (socket->cv_).wait(lck); - } - - DLOG_IF(INFO, VLOG_IS_ON(4)) << "CV unlocked."; - - /* Either there already is something to read, or the thread has been waken up. - * We must return the payload in the interest anyway */ - utils::MemBuf *membuf = socket->handshake_packet_->next(); - int size_to_read; - - if ((int)membuf->length() > size) { - size_to_read = size; - } else { - size_to_read = (int)membuf->length(); - socket->something_to_read_ = false; - } - - std::memcpy(buf, membuf->data(), size_to_read); - membuf->trimStart(size_to_read); - - return size_to_read; -} - -/* Return the number of written bytes in written */ -int TLSProducerSocket::write(BIO *b, const char *buf, size_t size, - size_t *written) { - int ret; - - if (size > INT_MAX) size = INT_MAX; - - ret = TLSProducerSocket::writeOld(b, buf, (int)size); - - if (ret <= 0) { - *written = 0; - return ret; - } - - *written = (size_t)ret; - - return 1; -} - -/* Return the number of written bytes in the return param */ -int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { - TLSProducerSocket *socket; - socket = (TLSProducerSocket *)BIO_get_data(b); - - if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { - bool making_manifest = socket->parent_->making_manifest_; - - //! socket->tls_chunks_ corresponds to is_last - socket->tls_chunks_--; - socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - false); - socket->parent_->ProducerSocket::produceStream( - socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, - socket->last_segment_); - socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - making_manifest); - socket->first_ = false; - } else { - socket->still_writing_ = true; - - std::unique_ptr<utils::MemBuf> mbuf = - utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); - auto a = mbuf.release(); - - socket->async_thread_.add([socket = socket, a]() { - auto mbuf = std::unique_ptr<utils::MemBuf>(a); - - socket->tls_chunks_--; - socket->to_call_oncontentproduced_--; - - socket->last_segment_ += socket->ProducerSocket::produceStream( - socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, - socket->last_segment_); - - ProducerContentCallback *on_content_produced_application; - socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - &on_content_produced_application); - - if (socket->to_call_oncontentproduced_ == 0 && - on_content_produced_application) { - on_content_produced_application->operator()(*socket->getInterface(), - std::error_code(), 0); - } - }); - } - - return num; -} - -TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, - P2PSecureProducerSocket *parent, - const Name &handshake_name) - : ProducerSocket(producer_socket, - ProductionProtocolAlgorithms::BYTE_STREAM), - on_content_produced_application_(), - mtx_(), - cv_(), - something_to_read_(false), - handshake_state_(UNINITIATED), - name_(), - handshake_packet_(), - last_segment_(0), - parent_(parent), - first_(true), - handshake_name_(handshake_name), - tls_chunks_(0), - to_call_oncontentproduced_(0), - still_writing_(false), - encryption_thread_() { - const SSL_METHOD *meth = TLS_server_method(); - ctx_ = SSL_CTX_new(meth); - - /* Setup SSL context (identity and parameter to use TLS 1.3) */ - SSL_CTX_use_certificate(ctx_, parent->cert_509_); - SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_); - - int result = - SSL_CTX_set_ciphersuites(ctx_, - "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" - "SHA256:TLS_AES_128_GCM_SHA256"); - - if (result != 1) { - throw errors::RuntimeException( - "Unable to set cipher list on TLS subsystem. Aborting."); - } - - // We force it to be TLS 1.3 - SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); - SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); - SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); - SSL_CTX_set_num_tickets(ctx_, 0); - - result = SSL_CTX_add_custom_ext( - ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS, - TLSProducerSocket::addHicnKeyIdCb, TLSProducerSocket::freeHicnKeyIdCb, - this, TLSProducerSocket::parseHicnKeyIdCb, NULL); - - ssl_ = SSL_new(ctx_); - - /* Setup this producer socker as the bio that TLS will use to write and read - * data (in stream mode) */ - BIO_METHOD *bio_meth = - BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket"); - BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld); - BIO_meth_set_write(bio_meth, TLSProducerSocket::writeOld); - BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl); - BIO *bio = BIO_new(bio_meth); - BIO_set_init(bio, 1); - BIO_set_data(bio, this); - SSL_set_bio(ssl_, bio, bio); - - /* Set the callback so that when an interest is received we catch it and we - * decrypt the payload before passing it to the application. */ - this->ProducerSocket::setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this, - std::placeholders::_1, - std::placeholders::_2)); - - this->ProducerSocket::setSocketOption( - ProducerCallbacksOptions::CONTENT_PRODUCED, - (ProducerContentCallback)bind( - &TLSProducerSocket::onContentProduced, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); -} - -/* The producer interface is not owned by the application, so is TLSSocket task - * to deallocate the memory */ -TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; } - -void TLSProducerSocket::accept() { - HandshakeState handshake_state = getHandshakeState(); - - if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { - tls_chunks_ = 1; - int result = SSL_accept(ssl_); - - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); - } - - parent_->list_producers.push_front( - std::move(parent_->map_producers[handshake_name_])); - parent_->map_producers.erase(handshake_name_); - - ProducerInterestCallback *on_interest_process_decrypted; - getSocketOption(ProducerCallbacksOptions::CACHE_MISS, - &on_interest_process_decrypted); - - if (*on_interest_process_decrypted) { - Interest inter(std::move(*handshake_packet_)); - handshake_packet_.reset(); - on_interest_process_decrypted->operator()(*getInterface(), inter); - } else { - throw errors::RuntimeException( - "On interest process unset: unable to perform handshake"); - } - - handshake_state_ = SERVER_FINISHED; - DLOG_IF(INFO, VLOG_IS_ON(2)) << "Handshake performed!"; -} - -int TLSProducerSocket::async_accept() { - if (!async_thread_.stopped()) { - async_thread_.add([this]() { this->accept(); }); - } else { - throw errors::RuntimeException( - "Async thread not running: unable to perform handshake"); - } - - return 1; -} - -void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { - HandshakeState handshake_state = getHandshakeState(); - - if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) { - std::unique_lock<std::mutex> lck(mtx_); - - name_ = interest.getName(); - // interest.separateHeaderPayload(); - handshake_packet_ = interest.acquireMemBufReference(); - something_to_read_ = true; - - cv_.notify_one(); - return; - } else if (handshake_state == SERVER_FINISHED) { - // interest.separateHeaderPayload(); - handshake_packet_ = interest.acquireMemBufReference(); - something_to_read_ = true; - - if (interest.getPayload()->length() > 0) { - SSL_read( - ssl_, - const_cast<unsigned char *>(interest.getPayload()->writableData()), - (int)interest.getPayload()->length()); - } - - ProducerInterestCallback *on_interest_input_decrypted; - getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, - &on_interest_input_decrypted); - - if (*on_interest_input_decrypted) - (*on_interest_input_decrypted)(*getInterface(), interest); - } -} - -void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p, - Interest &interest) { - HandshakeState handshake_state = getHandshakeState(); - - DLOG_IF(INFO, VLOG_IS_ON(3)) << "On cache miss in TLS socket producer."; - - if (handshake_state == CLIENT_HELLO) { - std::unique_lock<std::mutex> lck(mtx_); - - // interest.separateHeaderPayload(); - handshake_packet_ = interest.acquireMemBufReference(); - something_to_read_ = true; - handshake_state_ = CLIENT_FINISHED; - - cv_.notify_one(); - } else if (handshake_state == SERVER_FINISHED) { - // interest.separateHeaderPayload(); - handshake_packet_ = interest.acquireMemBufReference(); - something_to_read_ = true; - - if (interest.getPayload()->length() > 0) { - SSL_read( - ssl_, - const_cast<unsigned char *>(interest.getPayload()->writableData()), - (int)interest.getPayload()->length()); - } - - if (on_interest_process_decrypted_ != VOID_HANDLER) - on_interest_process_decrypted_(*getInterface(), interest); - } -} - -TLSProducerSocket::HandshakeState TLSProducerSocket::getHandshakeState() { - if (SSL_in_before(ssl_)) { - handshake_state_ = UNINITIATED; - } - - if (SSL_in_init(ssl_) && handshake_state_ == UNINITIATED) { - handshake_state_ = CLIENT_HELLO; - } - - return handshake_state_; -} - -void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, - const std::error_code &err, - uint64_t bytes_written) {} - -uint32_t TLSProducerSocket::produceStream( - const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t start_offset) { - if (getHandshakeState() != SERVER_FINISHED) { - throw errors::RuntimeException( - "New handshake on the same P2P secure producer socket not supported"); - } - - size_t buf_size = buffer->length(); - name_ = production_protocol_->getNamespaces().front().mapName(content_name); - tls_chunks_ = to_call_oncontentproduced_ = - (int)ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); - - if (!is_last) { - tls_chunks_++; - } - - last_segment_ = start_offset; - - SSL_write(ssl_, buffer->data(), (int)buf_size); - BIO *wbio = SSL_get_wbio(ssl_); - int i = BIO_flush(wbio); - (void)i; // To shut up gcc 5 - - return 0; -} - -long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { - if (cmd == BIO_CTRL_FLUSH) { - } - - return 1; -} - -int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, - const unsigned char **out, size_t *outlen, - X509 *x, size_t chainidx, int *al, - void *add_arg) { - TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg); - - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "On addHicnKeyIdCb, for the prefix registration."; - - if (ext_type == 100) { - auto &prefix = - socket->parent_->production_protocol_->getNamespaces().front(); - const ip_prefix_t &ip_prefix = prefix.toIpPrefixStruct(); - int inet_family = prefix.getAddressFamily(); - uint16_t prefix_len_bits = prefix.getPrefixLength(); - uint8_t prefix_len_bytes = prefix_len_bits / 8; - uint8_t prefix_len_u32 = prefix_len_bits / 32; - - ip_prefix_t *out_ip = (ip_prefix_t *)malloc(sizeof(ip_prefix_t)); - out_ip->family = inet_family; - out_ip->len = prefix_len_bits + 32; - u8 *out_ip_buf = const_cast<u8 *>( - ip_address_get_buffer(&(out_ip->address), inet_family)); - *out = reinterpret_cast<unsigned char *>(out_ip); - - RAND_bytes((unsigned char *)&socket->key_id_, 4); - - memcpy(out_ip_buf, ip_address_get_buffer(&(ip_prefix.address), inet_family), - prefix_len_bytes); - memcpy((out_ip_buf + prefix_len_bytes), &socket->key_id_, 4); - *outlen = sizeof(ip_prefix_t); - - ip_address_t mask = {}; - ip_address_t keyId_component = {}; - u32 *mask_buf; - u32 *keyId_component_buf; - - switch (inet_family) { - case AF_INET: - mask_buf = &(mask.v4.as_u32); - keyId_component_buf = &(keyId_component.v4.as_u32); - break; - case AF_INET6: - mask_buf = mask.v6.as_u32; - keyId_component_buf = keyId_component.v6.as_u32; - break; - default: - throw errors::RuntimeException("Unknown protocol"); - } - - if (prefix_len_bits > (inet_family == AF_INET6 ? IPV6_ADDR_LEN_BITS - 32 - : IPV4_ADDR_LEN_BITS - 32)) - throw errors::RuntimeException( - "Not enough space in the content name to add key_id"); - - mask_buf[prefix_len_u32] = 0xffffffff; - keyId_component_buf[prefix_len_u32] = socket->key_id_; - socket->last_segment_ = 0; - - socket->on_interest_process_decrypted_ = - socket->parent_->on_interest_process_decrypted_; - - socket->registerPrefix( - Prefix(prefix.getName(Name(inet_family, (uint8_t *)&mask), - Name(inet_family, (uint8_t *)&keyId_component), - prefix.getName()), - out_ip->len)); - socket->connect(); - } - return 1; -} - -void TLSProducerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, - const unsigned char *out, - void *add_arg) { - free(const_cast<unsigned char *>(out)); -} - -int TLSProducerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, - const unsigned char *in, size_t inlen, - X509 *x, size_t chainidx, int *al, - void *add_arg) { - return 1; -} - -int TLSProducerSocket::setSocketOption( - int socket_option_key, ProducerInterestCallback socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ProducerInterestCallback socket_option_value) -> int { - int result = SOCKET_OPTION_SET; - - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - on_interest_input_decrypted_ = socket_option_value; - break; - - case ProducerCallbacksOptions::INTEREST_DROP: - on_interest_dropped_input_buffer_ = socket_option_value; - break; - - case ProducerCallbacksOptions::INTEREST_PASS: - on_interest_inserted_input_buffer_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CACHE_HIT: - on_interest_satisfied_output_buffer_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CACHE_MISS: - on_interest_process_decrypted_ = socket_option_value; - break; - - default: - result = SOCKET_OPTION_NOT_SET; - break; - } - - return result; - }); -} - -int TLSProducerSocket::setSocketOption( - int socket_option_key, ProducerContentCallback socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ProducerContentCallback socket_option_value) -> int { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - on_content_produced_application_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); -} - -int TLSProducerSocket::getSocketOption( - int socket_option_key, ProducerContentCallback **socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ProducerContentCallback **socket_option_value) -> int { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - *socket_option_value = &on_content_produced_application_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); -} - -} // namespace implementation -} // namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h deleted file mode 100644 index a542a4d9f..000000000 --- a/libtransport/src/implementation/tls_socket_producer.h +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <implementation/socket_producer.h> -#include <openssl/ssl.h> - -#include <condition_variable> -#include <mutex> - -namespace transport { -namespace implementation { - -class P2PSecureProducerSocket; - -class TLSProducerSocket : virtual public ProducerSocket { - friend class P2PSecureProducerSocket; - - public: - explicit TLSProducerSocket(interface::ProducerSocket *producer_socket, - P2PSecureProducerSocket *parent, - const Name &handshake_name); - - ~TLSProducerSocket(); - - uint32_t produceStream(const Name &content_name, const uint8_t *buffer, - size_t buffer_size, bool is_last = true, - uint32_t start_offset = 0) override { - return produceStream(content_name, - utils::MemBuf::copyBuffer(buffer, buffer_size), - is_last, start_offset); - } - - uint32_t produceStream(const Name &content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, - uint32_t start_offset = 0) override; - - virtual void accept(); - - virtual int async_accept(); - - virtual int setSocketOption( - int socket_option_key, - ProducerInterestCallback socket_option_value) override; - - virtual int setSocketOption( - int socket_option_key, - ProducerContentCallback socket_option_value) override; - - virtual int getSocketOption( - int socket_option_key, - ProducerContentCallback **socket_option_value) override; - - int getSocketOption(int socket_option_key, - ProducerContentCallback &socket_option_value); - - int getSocketOption(int socket_option_key, - ProducerInterestCallback &socket_option_value); - - using ProducerSocket::getSocketOption; - // using ProducerSocket::onInterest; - using ProducerSocket::setSocketOption; - - protected: - enum HandshakeState { - UNINITIATED, - CLIENT_HELLO, // when CLIENT_HELLO interest has been received - CLIENT_FINISHED, // when CLIENT_FINISHED interest has been received - SERVER_FINISHED, // when handshake is done - }; - /* Callback invoked once an interest has been received and its payload - * decrypted */ - ProducerInterestCallback on_interest_input_decrypted_; - ProducerInterestCallback on_interest_process_decrypted_; - ProducerContentCallback on_content_produced_application_; - std::mutex mtx_; - /* Condition variable for the wait */ - std::condition_variable cv_; - /* Bool variable, true if there is something to read (an interest arrived) */ - bool something_to_read_; - /* Bool variable, true if CLIENT_FINISHED interest has been received */ - HandshakeState handshake_state_; - /* First interest that open a secure connection */ - transport::core::Name name_; - /* SSL handle */ - SSL *ssl_; - SSL_CTX *ctx_; - Packet::MemBufPtr handshake_packet_; - std::unique_ptr<utils::MemBuf> head_; - std::uint32_t last_segment_; - std::uint32_t key_id_; - std::thread *handshake; - P2PSecureProducerSocket *parent_; - bool first_; - Name handshake_name_; - int tls_chunks_; - int to_call_oncontentproduced_; - bool still_writing_; - utils::EventThread encryption_thread_; - utils::EventThread async_thread_; - - void onInterest(ProducerSocket &p, Interest &interest); - - void cacheMiss(interface::ProducerSocket &p, Interest &interest); - - /* Return the number of read bytes in readbytes */ - static int read(BIO *b, char *buf, size_t size, size_t *readbytes); - - /* Return the number of read bytes in the return param */ - static int readOld(BIO *h, char *buf, int size); - - /* Return the number of written bytes in written */ - static int write(BIO *b, const char *buf, size_t size, size_t *written); - - /* Return the number of written bytes in the return param */ - static int writeOld(BIO *h, const char *buf, int num); - - static long ctrl(BIO *b, int cmd, long num, void *ptr); - - static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context, - const unsigned char **out, size_t *outlen, X509 *x, - size_t chainidx, int *al, void *add_arg); - - static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, const unsigned char *out, - void *add_arg); - - static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type, - unsigned int context, const unsigned char *in, - size_t inlen, X509 *x, size_t chainidx, int *al, - void *add_arg); - - void onContentProduced(interface::ProducerSocket &p, - const std::error_code &err, uint64_t bytes_written); - - HandshakeState getHandshakeState(); -}; - -} // namespace implementation -} // end namespace transport |