diff options
Diffstat (limited to 'libtransport/src/implementation')
16 files changed, 5207 insertions, 0 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt new file mode 100644 index 000000000..5423a7697 --- /dev/null +++ b/libtransport/src/implementation/CMakeLists.txt @@ -0,0 +1,46 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc +) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/socket.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h +) + +if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a") + list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc + ) + + list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h + ) +endif() + +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc new file mode 100644 index 000000000..40ab58161 --- /dev/null +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -0,0 +1,403 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/p2psecure_socket_consumer.h> +#include <interfaces/tls_socket_consumer.h> + +#include <openssl/bio.h> +#include <openssl/ssl.h> +#include <openssl/tls1.h> + +#include <random> + +namespace transport { +namespace implementation { + +void P2PSecureConsumerSocket::setInterestPayload( + interface::ConsumerSocket &c, const core::Interest &interest) { + Interest &int2 = const_cast<Interest &>(interest); + random_suffix_ = int2.getName().getSuffix(); + + if (payload_ != NULL) int2.appendPayload(std::move(payload_)); +} + +// implement void readBufferAvailable(), size_t maxBufferSize() const override, +// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() +// must be implemented even if empty. + +/* Return the number of read bytes in the return param */ +int readOld(BIO *b, char *buf, int size) { + if (size < 0) return size; + + P2PSecureConsumerSocket *socket; + socket = (P2PSecureConsumerSocket *)BIO_get_data(b); + + std::unique_lock<std::mutex> lck(socket->mtx_); + + if (!socket->something_to_read_) { + if (!socket->transport_protocol_->isRunning()) { + socket->network_name_.setSuffix(socket->random_suffix_); + socket->ConsumerSocket::asyncConsume(socket->network_name_); + } + if (!socket->something_to_read_) socket->cv_.wait(lck); + } + + size_t size_to_read, read; + size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) + chain_size = socket->head_->computeChainDataLength(); + + if (chain_size > (size_t)size) { + read = size_to_read = (size_t)size; + } else { + read = size_to_read = chain_size; + socket->something_to_read_ = false; + } + + while (size_to_read) { + if (socket->head_->length() < size_to_read) { + std::memcpy(buf, socket->head_->data(), socket->head_->length()); + size_to_read -= socket->head_->length(); + buf += socket->head_->length(); + socket->head_ = socket->head_->pop(); + } else { + std::memcpy(buf, socket->head_->data(), size_to_read); + socket->head_->trimStart(size_to_read); + size_to_read = 0; + } + } + + return read; +} + +/* Return the number of read bytes in readbytes */ +int read(BIO *b, char *buf, size_t size, size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = readOld(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +/* Return the number of written bytes in the return param */ +int writeOld(BIO *b, const char *buf, int num) { + P2PSecureConsumerSocket *socket; + socket = (P2PSecureConsumerSocket *)BIO_get_data(b); + + socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &P2PSecureConsumerSocket::setInterestPayload, socket, + std::placeholders::_1, std::placeholders::_2)); + + return num; +} + +/* Return the number of written bytes in written */ +int write(BIO *b, const char *buf, size_t size, size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = writeOld(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +long ctrl(BIO *b, int cmd, long num, void *ptr) { return 1; } + +int P2PSecureConsumerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char **out, + size_t *outlen, X509 *x, + size_t chainidx, int *al, + void *add_arg) { + if (ext_type == 100) { + *out = (unsigned char *)malloc(4); + *(uint32_t *)*out = 10; + *outlen = 4; + } + return 1; +} + +void P2PSecureConsumerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *out, + void *add_arg) { + free(const_cast<unsigned char *>(out)); +} + +int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *in, + size_t inlen, X509 *x, + size_t chainidx, int *al, + void *add_arg) { + P2PSecureConsumerSocket *socket = + reinterpret_cast<P2PSecureConsumerSocket *>(add_arg); + if (ext_type == 100) { + memcpy(&socket->secure_prefix_, in, sizeof(ip_prefix_t)); + } + return 1; +} + +P2PSecureConsumerSocket::P2PSecureConsumerSocket( + interface::ConsumerSocket *consumer, int handshake_protocol, + int transport_protocol) + : ConsumerSocket(consumer, transport_protocol), + name_(), + tls_consumer_(), + buf_pool_(), + decrypted_content_(), + payload_(), + head_(), + something_to_read_(false), + content_downloaded_(false), + random_suffix_(), + secure_prefix_(), + producer_namespace_(), + read_callback_decrypted_(), + mtx_(), + cv_(), + protocol_(transport_protocol) { + /* Create the (d)TLS state */ + const SSL_METHOD *meth = TLS_client_method(); + ctx_ = SSL_CTX_new(meth); + + int result = + SSL_CTX_set_ciphersuites(ctx_, + "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" + "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { + throw errors::RuntimeException( + "Unable to set cipher list on TLS subsystem. Aborting."); + } + + SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); + SSL_CTX_set_ssl_version(ctx_, meth); + + result = SSL_CTX_add_custom_ext( + ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS, + P2PSecureConsumerSocket::addHicnKeyIdCb, + P2PSecureConsumerSocket::freeHicnKeyIdCb, NULL, + P2PSecureConsumerSocket::parseHicnKeyIdCb, this); + + ssl_ = SSL_new(ctx_); + + bio_meth_ = BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket"); + BIO_meth_set_read(bio_meth_, readOld); + BIO_meth_set_write(bio_meth_, writeOld); + BIO_meth_set_ctrl(bio_meth_, ctrl); + BIO *bio = BIO_new(bio_meth_); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); + + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + + std::default_random_engine generator; + std::uniform_int_distribution<int> distribution( + 1, std::numeric_limits<uint32_t>::max()); + random_suffix_ = 0; + + this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + this); +}; + +P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { + BIO_meth_free(bio_meth_); + SSL_shutdown(ssl_); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); + int result = SSL_connect(this->ssl_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + } + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_); + tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer)); + + ConsumerTimerCallback *stats_summary_callback = nullptr; + this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_callback); + + uint32_t lifetime; + this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + read_callback_decrypted_); + tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + *stats_summary_callback); + tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, + this->timer_interval_milliseconds_); + tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + tls_consumer.connect(); + + if (payload_ != NULL) + return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer.consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; + this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); + int result = SSL_connect(this->ssl_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + TRANSPORT_LOGD("Handshake performed!"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + + tls_consumer_ = + std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); + tls_consumer_->setInterface( + new interface::TLSConsumerSocket(tls_consumer_.get())); + + ConsumerTimerCallback *stats_summary_callback = nullptr; + this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_callback); + + uint32_t lifetime; + this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + read_callback_decrypted_); + tls_consumer_->setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + *stats_summary_callback); + tls_consumer_->setSocketOption(GeneralTransportOptions::STATS_INTERVAL, + this->timer_interval_milliseconds_); + tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + tls_consumer_->connect(); + + if (payload_ != NULL) + return tls_consumer_->asyncConsume((prefix->mapName(name)), + std::move(payload_)); + else + return tls_consumer_->asyncConsume((prefix->mapName(name))); +} + +void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + producer_namespace_ = producer_namespace; +} + +int P2PSecureConsumerSocket::setSocketOption( + int socket_option_key, ReadCallback *socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_decrypted_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +void P2PSecureConsumerSocket::getReadBuffer(uint8_t **application_buffer, + size_t *max_length){}; + +void P2PSecureConsumerSocket::readDataAvailable(size_t length) noexcept {}; + +size_t P2PSecureConsumerSocket::maxBufferSize() const { + return SSL3_RT_MAX_PLAIN_LENGTH; +} + +void P2PSecureConsumerSocket::readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept { + std::unique_lock<std::mutex> lck(this->mtx_); + if (head_) { + head_->prependChain(std::move(buffer)); + } else { + head_ = std::move(buffer); + } + + something_to_read_ = true; + cv_.notify_one(); +} + +void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {}; + +void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { + std::unique_lock<std::mutex> lck(this->mtx_); + content_downloaded_ = true; + something_to_read_ = true; + cv_.notify_one(); +} + +bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h new file mode 100644 index 000000000..e2ebaf94e --- /dev/null +++ b/libtransport/src/implementation/p2psecure_socket_consumer.h @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/socket_consumer.h> + +#include <implementation/tls_socket_consumer.h> +#include <openssl/bio.h> +#include <openssl/ssl.h> + +namespace transport { +namespace implementation { + +class P2PSecureConsumerSocket : public ConsumerSocket, + public interface::ConsumerSocket::ReadCallback { + /* Return the number of read bytes in readbytes */ + friend int read(BIO *b, char *buf, size_t size, size_t *readbytes); + + /* Return the number of read bytes in the return param */ + friend int readOld(BIO *h, char *buf, int size); + + /* Return the number of written bytes in written */ + friend int write(BIO *b, const char *buf, size_t size, size_t *written); + + /* Return the number of written bytes in the return param */ + friend int writeOld(BIO *h, const char *buf, int num); + + friend long ctrl(BIO *b, int cmd, long num, void *ptr); + + public: + explicit P2PSecureConsumerSocket(interface::ConsumerSocket *consumer, + int handshake_protocol, + int transport_protocol); + + ~P2PSecureConsumerSocket(); + + int consume(const Name &name) override; + + int asyncConsume(const Name &name) override; + + void registerPrefix(const Prefix &producer_namespace); + + int setSocketOption( + int socket_option_key, + interface::ConsumerSocket::ReadCallback *socket_option_value) override; + + using ConsumerSocket::getSocketOption; + using ConsumerSocket::setSocketOption; + + protected: + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ConsumerInterestCallback on_interest_input_decrypted_; + ConsumerInterestCallback on_interest_process_decrypted_; + + private: + Name name_; + std::shared_ptr<TLSConsumerSocket> tls_consumer_; + + /* SSL handle */ + SSL *ssl_; + SSL_CTX *ctx_; + BIO_METHOD *bio_meth_; + + /* Chain of MemBuf to be used as a temporary buffer to pass descypted data + * from the underlying layer to the application */ + utils::ObjectPool<utils::MemBuf> buf_pool_; + std::unique_ptr<utils::MemBuf> decrypted_content_; + + /* Chain of MemBuf holding the payload to be written into interest or data */ + std::unique_ptr<utils::MemBuf> payload_; + + /* Chain of MemBuf holding the data retrieved from the underlying layer */ + std::unique_ptr<utils::MemBuf> head_; + + bool something_to_read_; + + bool content_downloaded_; + + double old_max_win_; + + double old_current_win_; + + uint32_t random_suffix_; + + ip_prefix_t secure_prefix_; + + Prefix producer_namespace_; + + interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; + + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + int protocol_; + + void setInterestPayload(interface::ConsumerSocket &c, + const core::Interest &interest); + + static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context, + const unsigned char **out, size_t *outlen, X509 *x, + size_t chainidx, int *al, void *add_arg); + + static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *out, + void *add_arg); + + static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *in, + size_t inlen, X509 *x, size_t chainidx, int *al, + void *add_arg); + + virtual void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override; + + virtual void readDataAvailable(size_t length) noexcept override; + + virtual size_t maxBufferSize() const override; + + virtual void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override; + + virtual void readError(const std::error_code ec) noexcept override; + + virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; + + int download_content(const Name &name); +}; + +} // namespace implementation + +} // end namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc new file mode 100644 index 000000000..d7161986b --- /dev/null +++ b/libtransport/src/implementation/p2psecure_socket_producer.cc @@ -0,0 +1,404 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/interest.h> + +#include <implementation/p2psecure_socket_producer.h> +#include <implementation/tls_rtc_socket_producer.h> +#include <implementation/tls_socket_producer.h> +#include <interfaces/tls_rtc_socket_producer.h> +#include <interfaces/tls_socket_producer.h> + +#include <openssl/bio.h> +#include <openssl/rand.h> +#include <openssl/ssl.h> + +namespace transport { +namespace implementation { + +/* Workaround to prevent content with expiry time equal to 0 to be lost when + * pushed in the forwarder */ +#define HICN_HANDSHAKE_CONTENT_EXPIRY_TIME 100; + +P2PSecureProducerSocket::P2PSecureProducerSocket( + interface::ProducerSocket *producer_socket) + : ProducerSocket(producer_socket), + mtx_(), + cv_(), + map_secure_producers(), + map_secure_rtc_producers(), + list_secure_producers() {} + +P2PSecureProducerSocket::P2PSecureProducerSocket( + interface::ProducerSocket *producer_socket, bool rtc, + const std::shared_ptr<utils::Identity> &identity) + : ProducerSocket(producer_socket), + rtc_(rtc), + mtx_(), + cv_(), + map_secure_producers(), + map_secure_rtc_producers(), + list_secure_producers() { + /* + * Setup SSL context (identity and parameter to use TLS 1.3) + */ + der_cert_ = parcKeyStore_GetDEREncodedCertificate( + (identity->getSigner()->getKeyStore())); + der_prk_ = parcKeyStore_GetDEREncodedPrivateKey( + (identity->getSigner()->getKeyStore())); + + int cert_size = parcBuffer_Limit(der_cert_); + int prk_size = parcBuffer_Limit(der_prk_); + const uint8_t *cert = + reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_cert_, cert_size)); + const uint8_t *prk = + reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_prk_, prk_size)); + cert_509_ = d2i_X509(NULL, &cert, cert_size); + pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size); + + /* + * Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. + */ + ProducerSocket::setSocketOption( + ProducerCallbacksOptions::INTEREST_INPUT, + (ProducerInterestCallback)std::bind( + &P2PSecureProducerSocket::onInterestCallback, this, + std::placeholders::_1, std::placeholders::_2)); +} + +P2PSecureProducerSocket::~P2PSecureProducerSocket() { + if (der_cert_) parcBuffer_Release(&der_cert_); + if (der_prk_) parcBuffer_Release(&der_prk_); +} + +void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, + Interest &interest) { + std::unique_lock<std::mutex> lck(mtx_); + + TRANSPORT_LOGD("Start handshake at %s", + interest.getName().toString().c_str()); + if (!rtc_) { + auto it = map_secure_producers.find(interest.getName()); + if (it != map_secure_producers.end()) return; + TLSProducerSocket *tls_producer = + new TLSProducerSocket(nullptr, this, interest.getName()); + tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer)); + + tls_producer->on_content_produced_application_ = + this->on_content_produced_application_; + tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + tls_producer->setSocketOption(SIGNER, this->signer_); + tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + tls_producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + map_secure_producers.insert( + {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)}); + tls_producer->onInterest(*tls_producer, interest); + tls_producer->async_accept(); + } else { + auto it = map_secure_rtc_producers.find(interest.getName()); + if (it != map_secure_rtc_producers.end()) return; + TLSRTCProducerSocket *tls_producer = + new TLSRTCProducerSocket(nullptr, this, interest.getName()); + tls_producer->setInterface( + new interface::TLSRTCProducerSocket(tls_producer)); + tls_producer->on_content_produced_application_ = + this->on_content_produced_application_; + tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + tls_producer->setSocketOption(SIGNER, this->signer_); + tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + tls_producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + map_secure_rtc_producers.insert( + {interest.getName(), + std::unique_ptr<TLSRTCProducerSocket>(tls_producer)}); + tls_producer->onInterest(*tls_producer, interest); + tls_producer->async_accept(); + } +} + +void P2PSecureProducerSocket::produce(const uint8_t *buffer, + size_t buffer_size) { + if (!rtc_) { + throw errors::RuntimeException( + "RTC must be the transport protocol to start the production of current " + "data. Aborting."); + } + + std::unique_lock<std::mutex> lck(mtx_); + if (list_secure_rtc_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_rtc_producers.cbegin(); + it != list_secure_rtc_producers.cend(); it++) { + (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + } +} + +uint32_t P2PSecureProducerSocket::produce( + Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, + uint32_t start_offset) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock<std::mutex> lck(mtx_); + uint32_t segments = 0; + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + segments += + (*it)->produce(content_name, buffer->clone(), is_last, start_offset); + return segments; +} + +uint32_t P2PSecureProducerSocket::produce(Name content_name, + const uint8_t *buffer, + size_t buffer_size, bool is_last, + uint32_t start_offset) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock<std::mutex> lck(mtx_); + uint32_t segments = 0; + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + segments += (*it)->produce(content_name, buffer, buffer_size, is_last, + start_offset); + return segments; +} + +void P2PSecureProducerSocket::asyncProduce(const Name &content_name, + const uint8_t *buf, + size_t buffer_size, bool is_last, + uint32_t *start_offset) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock<std::mutex> lck(mtx_); + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) { + (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset); + } +} + +void P2PSecureProducerSocket::asyncProduce( + Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, + uint32_t offset, uint32_t **last_segment) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock<std::mutex> lck(mtx_); + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) { + (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, + last_segment); + } +} + +// Socket Option Redefinition to avoid name hiding + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + if (!list_secure_producers.empty()) { + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + } + + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_decrypted_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_decrypted_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Signer> &socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + switch (socket_option_key) { + case GeneralTransportOptions::SIGNER: { + signer_.reset(); + signer_ = socket_option_value; + + return SOCKET_OPTION_SET; + } + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int P2PSecureProducerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + if (!list_secure_producers.empty()) { + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + } + switch (socket_option_key) { + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = + socket_option_value; // HICN_HANDSHAKE_CONTENT_EXPIRY_TIME; + return SOCKET_OPTION_SET; + } + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption(int socket_option_key, + Name *socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, std::list<Prefix> socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_application_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, HashAlgorithm socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, utils::CryptoSuite socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, const std::string &socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h new file mode 100644 index 000000000..c2cbf31ac --- /dev/null +++ b/libtransport/src/implementation/p2psecure_socket_producer.h @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/security/identity.h> +#include <hicn/transport/security/signer.h> + +#include <implementation/socket_producer.h> +#include <implementation/tls_rtc_socket_producer.h> +#include <implementation/tls_socket_producer.h> +#include <utils/content_store.h> + +#include <openssl/ssl.h> +#include <condition_variable> +#include <forward_list> +#include <mutex> + +namespace transport { +namespace implementation { + +class P2PSecureProducerSocket : public ProducerSocket { + friend class TLSProducerSocket; + friend class TLSRTCProducerSocket; + + public: + explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); + explicit P2PSecureProducerSocket( + interface::ProducerSocket *producer_socket, bool rtc, + const std::shared_ptr<utils::Identity> &identity); + ~P2PSecureProducerSocket(); + + void produce(const uint8_t *buffer, size_t buffer_size) override; + + uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, + bool is_last = true, uint32_t start_offset = 0) override; + + uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, uint32_t start_offset = 0) override; + + void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment = nullptr) override; + + void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, + bool is_last = true, + uint32_t *start_offset = nullptr) override; + + int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value) override; + + int setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Signer> &socket_option_value) override; + + int setSocketOption(int socket_option_key, + uint32_t socket_option_value) override; + + int setSocketOption(int socket_option_key, bool socket_option_value) override; + + int setSocketOption(int socket_option_key, + Name *socket_option_value) override; + + int setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ProducerContentObjectCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) override; + + int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) override; + + int setSocketOption(int socket_option_key, + const std::string &socket_option_value) override; + + using ProducerSocket::getSocketOption; + using ProducerSocket::onInterest; + + protected: + bool rtc_; + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ProducerInterestCallback on_interest_input_decrypted_; + ProducerInterestCallback on_interest_process_decrypted_; + ProducerContentCallback on_content_produced_application_; + + private: + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + PARCBuffer *der_cert_; + PARCBuffer *der_prk_; + X509 *cert_509_; + EVP_PKEY *pkey_rsa_; + std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>, + core::hash<core::Name>, core::compare2<core::Name>> + map_secure_producers; + std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>, + core::hash<core::Name>, core::compare2<core::Name>> + map_secure_rtc_producers; + std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers; + std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers; + + void onInterestCallback(interface::ProducerSocket &p, Interest &interest); +}; + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/rtc_socket_producer.cc b/libtransport/src/implementation/rtc_socket_producer.cc new file mode 100644 index 000000000..a5b2b4a0e --- /dev/null +++ b/libtransport/src/implementation/rtc_socket_producer.cc @@ -0,0 +1,352 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/callbacks.h> + +#include <implementation/rtc_socket_producer.h> +#include <stdlib.h> +#include <time.h> + +#define NACK_HEADER_SIZE 8 // bytes +#define TIMESTAMP_LEN 8 // bytes +#define TCP_HEADER_SIZE 20 +#define IP6_HEADER_SIZE 40 +#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps) +#define STATS_INTERVAL_DURATION 500 // ms +#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 +#define INACTIVE_TIME \ + 500 // ms without producing before the socket + // is considered inactive +#define MILLI_IN_A_SEC 1000 // ms in a second + +#define HICN_MAX_DATA_SEQ 0xefffffff + +// slow production rate param +#define MIN_PRODUCTION_RATE \ + 10 // in pacekts per sec. this value is computed + // through experiments +#define LIFETIME_FRACTION 0.5 + +// NACK HEADER +// +-----------------------------------------+ +// | 4 bytes: current segment in production | +// +-----------------------------------------+ +// | 4 bytes: production rate (bytes x sec) | +// +-----------------------------------------+ +// + +// PACKET HEADER +// +-----------------------------------------+ +// | 8 bytes: TIMESTAMP | +// +-----------------------------------------+ +// | packet | +// +-----------------------------------------+ + +namespace transport { +namespace implementation { + +RTCProducerSocket::RTCProducerSocket(interface::ProducerSocket *producer_socket) + : ProducerSocket(producer_socket), + currentSeg_(1), + producedBytes_(0), + producedPackets_(0), + bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), + packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), + perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + timer_on_(false) { + srand((unsigned int)time(NULL)); + prodLabel_ = ((rand() % 255) << 24UL); + interests_cache_timer_ = + std::make_unique<asio::steady_timer>(this->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); + scheduleRoundTimer(); +} + +RTCProducerSocket::~RTCProducerSocket() {} + +void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { + ProducerSocket::registerPrefix(producer_namespace); + + flowName_ = producer_namespace.getName(); + auto family = flowName_.getAddressFamily(); + + switch (family) { + case AF_INET6: + headerSize_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP); + break; + case AF_INET: + headerSize_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP); + break; + default: + throw errors::RuntimeException("Unknown name format."); + } +} + +void RTCProducerSocket::scheduleRoundTimer() { + round_timer_->expires_from_now( + std::chrono::milliseconds(STATS_INTERVAL_DURATION)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); +} + +void RTCProducerSocket::updateStats() { + bytesProductionRate_ = producedBytes_.load() * perSecondFactor_; + packetsProductionRate_ = producedPackets_.load() * perSecondFactor_; + if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1; + producedBytes_ = 0; + producedPackets_ = 0; + scheduleRoundTimer(); +} + +void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { + auto buffer_size = buffer->length(); + + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) { + return; + } + + if (TRANSPORT_EXPECT_FALSE((buffer_size + headerSize_ + TIMESTAMP_LEN) > + data_packet_size_)) { + return; + } + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); + producedPackets_++; + + Name n(flowName_); + auto content_object = + std::make_shared<ContentObject>(n.setSuffix(currentSeg_.load())); + auto payload = utils::MemBuf::create(TIMESTAMP_LEN); + + memcpy(payload->writableData(), &now, TIMESTAMP_LEN); + payload->append(TIMESTAMP_LEN); + payload->prependChain(std::move(buffer)); + content_object->appendPayload(std::move(payload)); + + content_object->setLifetime(500); // XXX this should be set by the APP + + content_object->setPathLabel(prodLabel_); + + output_buffer_.insert(std::static_pointer_cast<ContentObject>( + content_object->shared_from_this())); + + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*getInterface(), *content_object); + } + + TRANSPORT_LOGD("Send content %u (produce)", + content_object->getName().getSuffix()); + portal_->sendContentObject(*content_object); + + if (on_content_object_output_) { + on_content_object_output_(*getInterface(), *content_object); + } + + uint32_t old_curr = currentSeg_.load(); + currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ; + + // remove interests from the interest cache if it exists + // this generates nacks that will tell to the consumer + // that a new data packet was produced + utils::SpinLock::Acquire locked(interests_cache_lock_); + if (!seqs_map_.empty()) { + for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { + if (it->first != old_curr) sendNack(it->first); + } + seqs_map_.clear(); + timers_map_.clear(); + } +} + +void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { + uint32_t interestSeg = interest->getName().getSuffix(); + uint32_t lifetime = interest->getLifetime(); + + if (on_interest_input_) { + on_interest_input_(*getInterface(), *interest); + } + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (interestSeg > HICN_MAX_DATA_SEQ) { + sendNack(interestSeg); + return; + } + + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(*interest); + + if (content_object) { + if (on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_(*getInterface(), *interest); + } + + if (on_content_object_output_) { + on_content_object_output_(*getInterface(), *content_object); + } + + TRANSPORT_LOGD("Send content %u (onInterest)", + content_object->getName().getSuffix()); + portal_->sendContentObject(*content_object); + return; + } else { + if (on_interest_process_) { + on_interest_process_(*getInterface(), *interest); + } + } + + // if the production rate is less than MIN_PRODUCTION_RATE we put the + // interest in a queue, otherwise we handle it in the usual way + if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE && + interestSeg >= currentSeg_.load()) { + utils::SpinLock::Acquire locked(interests_cache_lock_); + + uint64_t next_timer = ~0; + if (!timers_map_.empty()) { + next_timer = timers_map_.begin()->first; + } + + uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); + // check if the seq number exists already + auto it_seqs = seqs_map_.find(interestSeg); + if (it_seqs != seqs_map_.end()) { + // the seq already exists + if (expiration < it_seqs->second) { + // we need to update the timer becasue we got a smaller one + // 1) remove the entry from the multimap + // 2) update this entry + auto range = timers_map_.equal_range(it_seqs->second); + for (auto it_timers = range.first; it_timers != range.second; + it_timers++) { + if (it_timers->second == it_seqs->first) { + timers_map_.erase(it_timers); + break; + } + } + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); + it_seqs->second = expiration; + } else { + // nothing to do here + return; + } + } else { + // add the new seq + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); + seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration)); + } + + // here we have at least one interest in the queue, we need to start or + // update the timer + if (!timer_on_) { + // set timeout + timer_on_ = true; + scheduleCacheTimer(timers_map_.begin()->first - now); + } else { + // re-schedule the timer because a new interest will expires sooner + if (next_timer > timers_map_.begin()->first) { + interests_cache_timer_->cancel(); + scheduleCacheTimer(timers_map_.begin()->first - now); + } + } + return; + } + + uint32_t max_gap = (uint32_t)floor( + (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR / + 1000.0) * + (double)packetsProductionRate_.load())); + + if (interestSeg < currentSeg_.load() || + interestSeg > (max_gap + currentSeg_.load())) { + sendNack(interestSeg); + } + // else drop packet +} + +void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) { + interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait)); + interests_cache_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + interestCacheTimer(); + }); +} + +void RTCProducerSocket::interestCacheTimer() { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { + uint64_t expire = it_timers->first; + if (expire <= now) { + uint32_t seq = it_timers->second; + sendNack(seq); + // remove the interest from the other map + seqs_map_.erase(seq); + it_timers = timers_map_.erase(it_timers); + } else { + // stop, we are done! + break; + } + } + if (timers_map_.empty()) { + timer_on_ = false; + } else { + timer_on_ = true; + scheduleCacheTimer(timers_map_.begin()->first - now); + } +} + +void RTCProducerSocket::sendNack(uint32_t sequence) { + auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); + nack_payload->append(NACK_HEADER_SIZE); + ContentObject nack; + + Name n(flowName_); + nack.appendPayload(std::move(nack_payload)); + nack.setName(n.setSuffix(sequence)); + + uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data(); + *payload_ptr = currentSeg_.load(); + + *(++payload_ptr) = bytesProductionRate_.load(); + + nack.setLifetime(0); + nack.setPathLabel(prodLabel_); + + if (on_content_object_output_) { + on_content_object_output_(*getInterface(), nack); + } + + TRANSPORT_LOGD("Send nack %u", sequence); + portal_->sendContentObject(nack); +} + +} // namespace implementation + +} // end namespace transport diff --git a/libtransport/src/implementation/rtc_socket_producer.h b/libtransport/src/implementation/rtc_socket_producer.h new file mode 100644 index 000000000..87db2121d --- /dev/null +++ b/libtransport/src/implementation/rtc_socket_producer.h @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <implementation/socket_producer.h> +#include <utils/content_store.h> + +#include <atomic> +#include <map> +#include <mutex> + +namespace transport { +namespace implementation { + +class RTCProducerSocket : virtual public ProducerSocket { + public: + RTCProducerSocket(interface::ProducerSocket *producer_socket); + + ~RTCProducerSocket(); + + void registerPrefix(const Prefix &producer_namespace) override; + void produce(std::unique_ptr<utils::MemBuf> &&buffer) override; + + private: + void onInterest(Interest::Ptr &&interest) override; + void sendNack(uint32_t sequence); + void updateStats(); + void scheduleCacheTimer(uint64_t wait); + void scheduleRoundTimer(); + void interestCacheTimer(); + + std::atomic<uint32_t> currentSeg_; + uint32_t prodLabel_; + uint16_t headerSize_; + Name flowName_; + std::atomic<uint32_t> producedBytes_; + std::atomic<uint32_t> producedPackets_; + std::atomic<uint32_t> bytesProductionRate_; + std::atomic<uint32_t> packetsProductionRate_; + uint32_t perSecondFactor_; + + std::unique_ptr<asio::steady_timer> round_timer_; + + // cache for the received interests + // this map maps the expiration time of an interest to + // its sequence number. the map is sorted by timeouts + // the same timeout may be used for multiple sequence numbers + // but for each sequence number we store only the smallest + // expiry time. In this way the mapping from seqs_map_ to + // timers_map_ is unique + std::multimap<uint64_t, uint32_t> timers_map_; + // this map does the opposite, this map is not ordered + std::unordered_map<uint32_t, uint64_t> seqs_map_; + bool timer_on_; + std::unique_ptr<asio::steady_timer> interests_cache_timer_; + utils::SpinLock interests_cache_lock_; +}; + +} // namespace implementation + +} // end namespace transport diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h new file mode 100644 index 000000000..2e51f3027 --- /dev/null +++ b/libtransport/src/implementation/socket.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/socket_options_default_values.h> +#include <hicn/transport/interfaces/socket_options_keys.h> + +#include <core/facade.h> + +#define SOCKET_OPTION_GET 0 +#define SOCKET_OPTION_NOT_GET 1 +#define SOCKET_OPTION_SET 2 +#define SOCKET_OPTION_NOT_SET 3 +#define SOCKET_OPTION_DEFAULT 12345 + +namespace transport { +namespace implementation { + +// Forward Declarations +template <typename PortalType> +class Socket; + +// Define the portal and its connector, depending on the compilation options +// passed by the build tool. +using HicnForwarderPortal = core::HicnForwarderPortal; + +#ifdef __linux__ +#ifndef __ANDROID__ +using RawSocketPortal = core::RawSocketPortal; +#endif +#endif + +#ifdef __vpp__ +using VPPForwarderPortal = core::VPPForwarderPortal; +using BaseSocket = Socket<VPPForwarderPortal>; +using BasePortal = VPPForwarderPortal; +#else +using BaseSocket = Socket<HicnForwarderPortal>; +using BasePortal = HicnForwarderPortal; +#endif + +template <typename PortalType> +class Socket { + static_assert(std::is_same<PortalType, HicnForwarderPortal>::value +#ifdef __linux__ +#ifndef __ANDROID__ + || std::is_same<PortalType, RawSocketPortal>::value +#ifdef __vpp__ + || std::is_same<PortalType, VPPForwarderPortal>::value +#endif +#endif + , +#else + , + +#endif + "This class is not allowed as Portal"); + + public: + using Portal = PortalType; + + virtual asio::io_service &getIoService() = 0; + + virtual void connect() = 0; + + virtual bool isRunning() = 0; + + protected: + virtual ~Socket(){}; +}; + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h new file mode 100644 index 000000000..2fc8d2b48 --- /dev/null +++ b/libtransport/src/implementation/socket_consumer.h @@ -0,0 +1,950 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/interfaces/socket_options_default_values.h> +#include <hicn/transport/interfaces/statistics.h> +#include <hicn/transport/security/verifier.h> + +#include <protocols/cbr.h> +#include <protocols/protocol.h> +#include <protocols/raaqm.h> +#include <protocols/rtc.h> +#include <utils/event_thread.h> + +namespace transport { +namespace implementation { + +using namespace core; +using namespace interface; +using ReadCallback = interface::ConsumerSocket::ReadCallback; + +class ConsumerSocket : public Socket<BasePortal> { + public: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + : consumer_interface_(consumer), + portal_(std::make_shared<Portal>(io_service_)), + async_downloader_(), + interest_lifetime_(default_values::interest_lifetime), + min_window_size_(default_values::min_window_size), + max_window_size_(default_values::max_window_size), + current_window_size_(-1), + max_retransmissions_( + default_values::transport_protocol_max_retransmissions), + /****** RAAQM Parameters ******/ + minimum_drop_probability_(default_values::minimum_drop_probability), + sample_number_(default_values::sample_number), + gamma_(default_values::gamma_value), + beta_(default_values::beta_value), + drop_factor_(default_values::drop_factor), + /****** END RAAQM Parameters ******/ + rate_estimation_alpha_(default_values::rate_alpha), + rate_estimation_observer_(nullptr), + rate_estimation_batching_parameter_(default_values::batch), + rate_estimation_choice_(0), + verifier_(std::make_shared<utils::Verifier>()), + verify_signature_(false), + key_content_(false), + on_interest_output_(VOID_HANDLER), + on_interest_timeout_(VOID_HANDLER), + on_interest_satisfied_(VOID_HANDLER), + on_content_object_input_(VOID_HANDLER), + on_content_object_verification_(VOID_HANDLER), + on_content_object_(VOID_HANDLER), + stats_summary_(VOID_HANDLER), + read_callback_(nullptr), + virtual_download_(false), + timer_interval_milliseconds_(0), + guard_raaqm_params_() { + switch (protocol) { + case TransportProtocolAlgorithms::CBR: + transport_protocol_ = + std::make_unique<protocol::CbrTransportProtocol>(this); + break; + case TransportProtocolAlgorithms::RTC: + transport_protocol_ = + std::make_unique<protocol::RTCTransportProtocol>(this); + break; + case TransportProtocolAlgorithms::RAAQM: + default: + transport_protocol_ = + std::make_unique<protocol::RaaqmTransportProtocol>(this); + break; + } + } + + ~ConsumerSocket() { + stop(); + async_downloader_.stop(); + } + + interface::ConsumerSocket *getInterface() { + return consumer_interface_; + } + + void setInterface(interface::ConsumerSocket *consumer_socket) { + consumer_interface_ = consumer_socket; + } + + void connect() { portal_->connect(); } + + bool isRunning() { return transport_protocol_->isRunning(); } + + virtual int consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + network_name_ = name; + network_name_.setSuffix(0); + + transport_protocol_->start(); + + return CONSUMER_FINISHED; + } + + virtual int asyncConsume(const Name &name) { + if (!async_downloader_.stopped()) { + async_downloader_.add([this, name]() { + network_name_ = std::move(name); + network_name_.setSuffix(0); + transport_protocol_->start(); + }); + } + + return CONSUMER_RUNNING; + } + + bool verifyKeyPackets() { return transport_protocol_->verifyKeyPackets(); } + + void stop() { + if (transport_protocol_->isRunning()) { + transport_protocol_->stop(); + } + } + + void resume() { + if (!transport_protocol_->isRunning()) { + transport_protocol_->resume(); + } + } + + asio::io_service &getIoService() { return portal_->getIoService(); } + + virtual int setSocketOption(int socket_option_key, + ReadCallback *socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + int getSocketOption(int socket_option_key, + ReadCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ReadCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + *socket_option_value = read_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + int setSocketOption(int socket_option_key, double socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case MIN_WINDOW_SIZE: + min_window_size_ = socket_option_value; + break; + + case MAX_WINDOW_SIZE: + max_window_size_ = socket_option_value; + break; + + case CURRENT_WINDOW_SIZE: + current_window_size_ = socket_option_value; + break; + + case GAMMA_VALUE: + gamma_ = socket_option_value; + break; + + case BETA_VALUE: + beta_ = socket_option_value; + break; + + case DROP_FACTOR: + drop_factor_ = socket_option_value; + break; + + case MINIMUM_DROP_PROBABILITY: + minimum_drop_probability_ = socket_option_value; + break; + + case RATE_ESTIMATION_ALPHA: + if (socket_option_value >= 0 && socket_option_value < 1) { + rate_estimation_alpha_ = socket_option_value; + } else { + rate_estimation_alpha_ = default_values::alpha; + } + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + int setSocketOption(int socket_option_key, uint32_t socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + max_retransmissions_ = socket_option_value; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + interest_lifetime_ = socket_option_value; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + if (socket_option_value > 0) { + rate_estimation_batching_parameter_ = socket_option_value; + } else { + rate_estimation_batching_parameter_ = default_values::batch; + } + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + if (socket_option_value > 0) { + rate_estimation_choice_ = socket_option_value; + } else { + rate_estimation_choice_ = default_values::rate_choice; + } + break; + + case GeneralTransportOptions::STATS_INTERVAL: + timer_interval_milliseconds_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + std::nullptr_t socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + if (socket_option_value == VOID_HANDLER) { + on_interest_retransmission_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + if (socket_option_value == VOID_HANDLER) { + on_interest_timeout_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_output_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_input_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_verification_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + int setSocketOption(int socket_option_key, bool socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case OtherOptions::VIRTUAL_DOWNLOAD: + virtual_download_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + verify_signature_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case GeneralTransportOptions::KEY_CONTENT: + key_content_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; + } + + int setSocketOption(int socket_option_key, + ConsumerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + on_content_object_input_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + int setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + on_content_object_verification_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + on_interest_retransmission_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + on_interest_output_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + on_interest_timeout_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + on_interest_satisfied_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + int setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this]( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + verification_failed_callback_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + // int setSocketOption( + // int socket_option_key, + // ConsumerContentObjectVerificationFailedCallback socket_option_value) { + // return rescheduleOnIOService( + // socket_option_key, socket_option_value, + // [this]( + // int socket_option_key, + // ConsumerContentObjectVerificationFailedCallback + // socket_option_value) + // -> int { + // switch (socket_option_key) { + // case ConsumerCallbacksOptions::VERIFICATION_FAILED: + // verification_failed_callback_ = socket_option_value; + // break; + + // default: + // return SOCKET_OPTION_NOT_SET; + // } + + // return SOCKET_OPTION_SET; + // }); + // } + + int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + rate_estimation_observer_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + int setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Verifier> &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + verifier_.reset(); + verifier_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: + return result; + } + } + + return result; + } + + int setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::CERTIFICATE: + key_id_ = verifier_->addKeyFromCertificate(socket_option_value); + + if (key_id_ != nullptr) { + result = SOCKET_OPTION_SET; + } + break; + + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; + } + + int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + stats_summary_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + int getSocketOption(int socket_option_key, double &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MIN_WINDOW_SIZE: + socket_option_value = min_window_size_; + break; + + case GeneralTransportOptions::MAX_WINDOW_SIZE: + socket_option_value = max_window_size_; + break; + + case GeneralTransportOptions::CURRENT_WINDOW_SIZE: + socket_option_value = current_window_size_; + break; + + // RAAQM parameters + + case RaaqmTransportOptions::GAMMA_VALUE: + socket_option_value = gamma_; + break; + + case RaaqmTransportOptions::BETA_VALUE: + socket_option_value = beta_; + break; + + case RaaqmTransportOptions::DROP_FACTOR: + socket_option_value = drop_factor_; + break; + + case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: + socket_option_value = minimum_drop_probability_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_ALPHA: + socket_option_value = rate_estimation_alpha_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, uint32_t &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + socket_option_value = max_retransmissions_; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + socket_option_value = interest_lifetime_; + break; + + case RaaqmTransportOptions::SAMPLE_NUMBER: + socket_option_value = sample_number_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + socket_option_value = rate_estimation_batching_parameter_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + socket_option_value = rate_estimation_choice_; + break; + + case GeneralTransportOptions::STATS_INTERVAL: + socket_option_value = timer_interval_milliseconds_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::RUNNING: + socket_option_value = transport_protocol_->isRunning(); + break; + + case OtherOptions::VIRTUAL_DOWNLOAD: + socket_option_value = virtual_download_; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + socket_option_value = verify_signature_; + break; + + case GeneralTransportOptions::KEY_CONTENT: + socket_option_value = key_content_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, Name **socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + *socket_option_value = &network_name_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + *socket_option_value = &on_content_object_input_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + int getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + *socket_option_value = &on_content_object_verification_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + int getSocketOption(int socket_option_key, + ConsumerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + *socket_option_value = &on_interest_retransmission_; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + *socket_option_value = &on_interest_output_; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + *socket_option_value = &on_interest_timeout_; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + *socket_option_value = &on_interest_satisfied_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + int getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationFailedCallback * + *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + *socket_option_value = &verification_failed_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + *socket_option_value = (rate_estimation_observer_); + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, + std::shared_ptr<utils::Verifier> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + socket_option_value = verifier_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, + interface::TransportStatistics **socket_option_value) { + switch (socket_option_key) { + case OtherOptions::STATISTICS: + *socket_option_value = &stats_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + int getSocketOption(int socket_option_key, + ConsumerTimerCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + *socket_option_value = &stats_summary_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + protected: + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (transport_protocol_->isRunning()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, + &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } + + protected: + interface::ConsumerSocket *consumer_interface_; + asio::io_service io_service_; + + std::shared_ptr<Portal> portal_; + utils::EventThread async_downloader_; + + // No need to protect from multiple accesses in the async consumer + // The parameter is accessible only with a getSocketOption and + // set from the consume + Name network_name_; + + int interest_lifetime_; + + double min_window_size_; + double max_window_size_; + double current_window_size_; + uint32_t max_retransmissions_; + + // RAAQM Parameters + double minimum_drop_probability_; + unsigned int sample_number_; + double gamma_; + double beta_; + double drop_factor_; + + // Rate estimation parameters + double rate_estimation_alpha_; + IcnObserver *rate_estimation_observer_; + int rate_estimation_batching_parameter_; + int rate_estimation_choice_; + + bool is_async_; + + // Verification parameters + std::shared_ptr<utils::Verifier> verifier_; + PARCKeyId *key_id_; + std::atomic_bool verify_signature_; + bool key_content_; + + ConsumerInterestCallback on_interest_retransmission_; + ConsumerInterestCallback on_interest_output_; + ConsumerInterestCallback on_interest_timeout_; + ConsumerInterestCallback on_interest_satisfied_; + ConsumerContentObjectCallback on_content_object_input_; + ConsumerContentObjectVerificationCallback on_content_object_verification_; + ConsumerContentObjectCallback on_content_object_; + ConsumerTimerCallback stats_summary_; + ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; + + ReadCallback *read_callback_; + + // Virtual download for traffic generator + bool virtual_download_; + + uint32_t timer_interval_milliseconds_; + + // Transport protocol + std::unique_ptr<protocol::TransportProtocol> transport_protocol_; + + // Statistic + TransportStatistics stats_; + + utils::SpinLock guard_raaqm_params_; + std::string output_interface_; +}; + +} // namespace implementation +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h new file mode 100644 index 000000000..1f03fe53d --- /dev/null +++ b/libtransport/src/implementation/socket_producer.h @@ -0,0 +1,1061 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/security/signer.h> + +#include <implementation/socket.h> + +#include <utils/content_store.h> +#include <utils/event_thread.h> +#include <utils/suffix_strategy.h> + +#include <atomic> +#include <cmath> +#include <condition_variable> +#include <mutex> +#include <queue> +#include <thread> + +#define REGISTRATION_NOT_ATTEMPTED 0 +#define REGISTRATION_SUCCESS 1 +#define REGISTRATION_FAILURE 2 +#define REGISTRATION_IN_PROGRESS 3 + +namespace transport { +namespace implementation { + +using namespace core; +using namespace interface; + +class ProducerSocket : public Socket<BasePortal>, + public BasePortal::ProducerCallback { + public: + explicit ProducerSocket(interface::ProducerSocket *producer_socket) + : producer_interface_(producer_socket), + portal_(std::make_shared<Portal>(io_service_)), + data_packet_size_(default_values::content_object_packet_size), + content_object_expiry_time_(default_values::content_object_expiry_time), + output_buffer_(default_values::producer_socket_output_buffer_size), + async_thread_(), + registration_status_(REGISTRATION_NOT_ATTEMPTED), + making_manifest_(false), + hash_algorithm_(HashAlgorithm::SHA_256), + suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), + on_interest_input_(VOID_HANDLER), + on_interest_dropped_input_buffer_(VOID_HANDLER), + on_interest_inserted_input_buffer_(VOID_HANDLER), + on_interest_satisfied_output_buffer_(VOID_HANDLER), + on_interest_process_(VOID_HANDLER), + on_new_segment_(VOID_HANDLER), + on_content_object_to_sign_(VOID_HANDLER), + on_content_object_in_output_buffer_(VOID_HANDLER), + on_content_object_output_(VOID_HANDLER), + on_content_object_evicted_from_output_buffer_(VOID_HANDLER), + on_content_produced_(VOID_HANDLER) {} + + virtual ~ProducerSocket() { + stop(); + if (listening_thread_.joinable()) { + listening_thread_.join(); + } + } + + interface::ProducerSocket *getInterface() { + return producer_interface_; + } + + void setInterface(interface::ProducerSocket *producer_socket) { + producer_interface_ = producer_socket; + } + + void connect() override { + portal_->connect(false); + listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this)); + } + + bool isRunning() override { return !io_service_.stopped(); }; + + virtual uint32_t produce(Name content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) { + return ProducerSocket::produce( + content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, + start_offset); + } + + virtual uint32_t produce(Name content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, uint32_t start_offset = 0) { + if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) { + return 0; + } + + // Copy the atomic variables to ensure they keep the same value + // during the production + std::size_t data_packet_size = data_packet_size_; + uint32_t content_object_expiry_time = content_object_expiry_time_; + HashAlgorithm hash_algo = hash_algorithm_; + bool making_manifest = making_manifest_; + auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( + suffix_strategy_, start_offset); + std::shared_ptr<utils::Signer> signer; + getSocketOption(GeneralTransportOptions::SIGNER, signer); + + auto buffer_size = buffer->length(); + int bytes_segmented = 0; + std::size_t header_size; + std::size_t manifest_header_size = 0; + std::size_t signature_length = 0; + std::uint32_t final_block_number = start_offset; + uint64_t free_space_for_content = 0; + + core::Packet::Format format; + std::shared_ptr<ContentObjectManifest> manifest; + bool is_last_manifest = false; + + // TODO Manifest may still be used for indexing + if (making_manifest && !signer) { + TRANSPORT_LOGD("Making manifests without setting producer identity."); + } + + core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; + core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; + if (content_name.getType() == HNT_CONTIGUOUS_V4 || + content_name.getType() == HNT_IOV_V4) { + hf_format = core::Packet::Format::HF_INET_TCP; + hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; + } else if (content_name.getType() == HNT_CONTIGUOUS_V6 || + content_name.getType() == HNT_IOV_V6) { + hf_format = core::Packet::Format::HF_INET6_TCP; + hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; + } else { + throw errors::RuntimeException("Unknown name format."); + } + + format = hf_format; + if (making_manifest) { + manifest_header_size = core::Packet::getHeaderSizeFromFormat( + signer ? hf_format_ah : hf_format, + signer ? signer->getSignatureLength() : 0); + } else if (signer) { + format = hf_format_ah; + signature_length = signer->getSignatureLength(); + } + + header_size = + core::Packet::getHeaderSizeFromFormat(format, signature_length); + free_space_for_content = data_packet_size - header_size; + uint32_t number_of_segments = uint32_t( + std::ceil(double(buffer_size) / double(free_space_for_content))); + if (free_space_for_content * number_of_segments < buffer_size) { + number_of_segments++; + } + + // TODO allocate space for all the headers + if (making_manifest) { + uint32_t segment_in_manifest = static_cast<uint32_t>( + std::floor(double(data_packet_size - manifest_header_size - + ContentObjectManifest::getManifestHeaderSize()) / + ContentObjectManifest::getManifestEntrySize()) - + 1.0); + uint32_t number_of_manifests = static_cast<uint32_t>( + std::ceil(float(number_of_segments) / segment_in_manifest)); + final_block_number += number_of_segments + number_of_manifests - 1; + + manifest.reset(ContentObjectManifest::createManifest( + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), + core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, + hash_algo, is_last_manifest, content_name, suffix_strategy_, + signer ? signer->getSignatureLength() : 0)); + manifest->setLifetime(content_object_expiry_time); + + if (is_last) { + manifest->setFinalBlockNumber(final_block_number); + } else { + manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); + } + } + + TRANSPORT_LOGD("--------- START PRODUCE ----------"); + for (unsigned int packaged_segments = 0; + packaged_segments < number_of_segments; packaged_segments++) { + if (making_manifest) { + if (manifest->estimateManifestSize(2) > + data_packet_size - manifest_header_size) { + // Send the current manifest + manifest->encode(); + + // If identity set, sign manifest + if (signer) { + signer->sign(*manifest); + } + + passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + + // Send content objects stored in the queue + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %u", + content_queue_.front()->getName().getSuffix()); + content_queue_.pop(); + } + + // Create new manifest. The reference to the last manifest has been + // acquired in the passContentObjectToCallbacks function, so we can + // safely release this reference + manifest.reset(ContentObjectManifest::createManifest( + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), + core::ManifestVersion::VERSION_1, + core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, + content_name, suffix_strategy_, + signer ? signer->getSignatureLength() : 0)); + + manifest->setLifetime(content_object_expiry_time); + manifest->setFinalBlockNumber( + is_last ? final_block_number + : utils::SuffixStrategy::INVALID_SUFFIX); + } + } + + auto content_suffix = suffix_strategy->getNextContentSuffix(); + auto content_object = std::make_shared<ContentObject>( + content_name.setSuffix(content_suffix), format); + content_object->setLifetime(content_object_expiry_time); + + auto b = buffer->cloneOne(); + b->trimStart(free_space_for_content * packaged_segments); + b->trimEnd(b->length()); + + if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { + b->append(buffer_size - bytes_segmented); + bytes_segmented += (int)(buffer_size - bytes_segmented); + + if (is_last && making_manifest) { + is_last_manifest = true; + } else if (is_last) { + content_object->setRst(); + } + + } else { + b->append(free_space_for_content); + bytes_segmented += (int)(free_space_for_content); + } + + content_object->appendPayload(std::move(b)); + + if (making_manifest) { + using namespace std::chrono_literals; + utils::CryptoHash hash = content_object->computeDigest(hash_algo); + manifest->addSuffixHash(content_suffix, hash); + content_queue_.push(content_object); + } else { + if (signer) { + signer->sign(*content_object); + } + passContentObjectToCallbacks(content_object); + TRANSPORT_LOGD("Send content %u", + content_object->getName().getSuffix()); + } + } + + if (making_manifest) { + if (is_last_manifest) { + manifest->setFinalManifest(is_last_manifest); + } + + manifest->encode(); + if (signer) { + signer->sign(*manifest); + } + + passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %u", + content_queue_.front()->getName().getSuffix()); + content_queue_.pop(); + } + } + + io_service_.dispatch([this, buffer_size]() { + if (on_content_produced_) { + on_content_produced_(*producer_interface_, + std::make_error_code(std::errc(0)), buffer_size); + } + }); + + TRANSPORT_LOGD("--------- END PRODUCE ------------"); + return suffix_strategy->getTotalCount(); + } + + virtual void produce(ContentObject &content_object) { + io_service_.dispatch([this, &content_object]() { + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, + content_object); + } + }); + + output_buffer_.insert(std::static_pointer_cast<ContentObject>( + content_object.shared_from_this())); + + io_service_.dispatch([this, &content_object]() { + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, content_object); + } + }); + + portal_->sendContentObject(content_object); + } + + virtual void produce(const uint8_t *buffer, size_t buffer_size) { + produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + } + + virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) { + // This API is meant to be used just with the RTC producer. + // Here it cannot be used since no name for the content is specified. + throw errors::NotImplementedException(); + } + + virtual void asyncProduce(const Name &suffix, const uint8_t *buf, + size_t buffer_size, bool is_last = true, + uint32_t *start_offset = nullptr) { + if (!async_thread_.stopped()) { + async_thread_.add([this, suffix, buffer = buf, size = buffer_size, + is_last, start_offset]() { + if (start_offset != nullptr) { + *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last, + *start_offset); + } else { + ProducerSocket::produce(suffix, buffer, size, is_last, 0); + } + }); + } + } + + void asyncProduce(const Name &suffix); + + virtual void asyncProduce(Name content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment = nullptr) { + if (!async_thread_.stopped()) { + auto a = buffer.release(); + async_thread_.add( + [this, content_name, a, is_last, offset, last_segment]() { + auto buf = std::unique_ptr<utils::MemBuf>(a); + if (last_segment != NULL) { + **last_segment = + offset + ProducerSocket::produce(content_name, std::move(buf), + is_last, offset); + } else { + ProducerSocket::produce(content_name, std::move(buf), is_last, + offset); + } + }); + } + } + + virtual void asyncProduce(ContentObject &content_object) { + if (!async_thread_.stopped()) { + auto co_ptr = std::static_pointer_cast<ContentObject>( + content_object.shared_from_this()); + async_thread_.add([this, content_object = std::move(co_ptr)]() { + ProducerSocket::produce(*content_object); + }); + } + } + + virtual void registerPrefix(const Prefix &producer_namespace) { + served_namespaces_.push_back(producer_namespace); + } + + void serveForever() { + if (listening_thread_.joinable()) { + listening_thread_.join(); + } + } + + void stop() { portal_->stopEventsLoop(); } + + asio::io_service &getIoService() override { return portal_->getIoService(); }; + + virtual void onInterest(Interest &interest) { + if (on_interest_input_) { + on_interest_input_(*producer_interface_, interest); + } + + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(interest); + + if (content_object) { + if (on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_(*producer_interface_, interest); + } + + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *content_object); + } + + portal_->sendContentObject(*content_object); + } else { + if (on_interest_process_) { + on_interest_process_(*producer_interface_, interest); + } + } + } + + virtual void onInterest(Interest::Ptr &&interest) override { + onInterest(*interest); + }; + + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::DATA_PACKET_SIZE: + if (socket_option_value < default_values::max_content_object_size && + socket_option_value > 0) { + data_packet_size_ = socket_option_value; + } + break; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_.setLimit(socket_option_value); + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_input_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_DROP: + if (socket_option_value == VOID_HANDLER) { + on_interest_dropped_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_PASS: + if (socket_option_value == VOID_HANDLER) { + on_interest_inserted_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_HIT: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_MISS: + if (socket_option_value == VOID_HANDLER) { + on_interest_process_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + if (socket_option_value == VOID_HANDLER) { + on_new_segment_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_in_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_output_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + virtual int setSocketOption(int socket_option_key, bool socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + virtual int setSocketOption(int socket_option_key, + Name *socket_option_value) { + return SOCKET_OPTION_NOT_SET; + } + + virtual int setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + served_namespaces_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + virtual int setSocketOption( + int socket_option_key, + interface::ProducerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + on_new_segment_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + on_content_object_in_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + on_content_object_output_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + virtual int setSocketOption( + int socket_option_key, + interface::ProducerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + virtual int setSocketOption( + int socket_option_key, + interface::ProducerContentCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + + virtual int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + hash_algorithm_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + virtual int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::CRYPTO_SUITE: + crypto_suite_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + virtual int setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Signer> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::SIGNER: { + utils::SpinLock::Acquire locked(signer_lock_); + signer_.reset(); + signer_ = socket_option_value; + } break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = (uint32_t)output_buffer_.getLimit(); + break; + + case GeneralTransportOptions::DATA_PACKET_SIZE: + socket_option_value = (uint32_t)data_packet_size_; + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + socket_option_value = content_object_expiry_time_; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_GET; + } + + virtual int getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + virtual int getSocketOption(int socket_option_key, + std::list<Prefix> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + socket_option_value = served_namespaces_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + virtual int getSocketOption( + int socket_option_key, + interface::ProducerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + *socket_option_value = &on_new_segment_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + *socket_option_value = &on_content_object_in_output_buffer_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + *socket_option_value = &on_content_object_output_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + virtual int getSocketOption( + int socket_option_key, + interface::ProducerContentCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + *socket_option_value = &on_content_produced_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + virtual int getSocketOption( + int socket_option_key, + interface::ProducerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + *socket_option_value = &on_interest_input_; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + *socket_option_value = &on_interest_dropped_input_buffer_; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + *socket_option_value = &on_interest_inserted_input_buffer_; + break; + + case CACHE_HIT: + *socket_option_value = &on_interest_satisfied_output_buffer_; + break; + + case CACHE_MISS: + *socket_option_value = &on_interest_process_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + default: + return SOCKET_OPTION_NOT_GET; + ; + } + + return SOCKET_OPTION_GET; + } + + virtual int getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = hash_algorithm_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + virtual int getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = crypto_suite_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + virtual int getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Signer> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::SIGNER: { + utils::SpinLock::Acquire locked(signer_lock_); + socket_option_value = signer_; + } break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + return SOCKET_OPTION_NOT_SET; + } + + // If the thread calling lambda_func is not the same of io_service, this + // function reschedule the function on it + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, + &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } + + // If the thread calling lambda_func is not the same of io_service, this + // function reschedule the function on it + template <typename Lambda, typename arg2> + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2 &)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != this->listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + std::unique_lock<std::mutex> lck(mtx); + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, + &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + + if (!done) { + cv.wait(lck); + } + }); + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } + + // Threads + protected: + interface::ProducerSocket *producer_interface_; + std::thread listening_thread_; + asio::io_service io_service_; + std::shared_ptr<Portal> portal_; + std::atomic<size_t> data_packet_size_; + std::list<Prefix> + served_namespaces_; // No need to be threadsafe, this is always modified + // by the application thread + std::atomic<uint32_t> content_object_expiry_time_; + + // buffers + // ContentStore is thread-safe + utils::ContentStore output_buffer_; + + utils::EventThread async_thread_; + int registration_status_; + + std::atomic<bool> making_manifest_; + + // map for storing sequence numbers for several calls of the publish + // function + std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_; + + std::atomic<HashAlgorithm> hash_algorithm_; + std::atomic<utils::CryptoSuite> crypto_suite_; + utils::SpinLock signer_lock_; + std::shared_ptr<utils::Signer> signer_; + core::NextSegmentCalculationStrategy suffix_strategy_; + + // While manifests are being built, contents are stored in a queue + std::queue<std::shared_ptr<ContentObject>> content_queue_; + + // callbacks + ProducerInterestCallback on_interest_input_; + ProducerInterestCallback on_interest_dropped_input_buffer_; + ProducerInterestCallback on_interest_inserted_input_buffer_; + ProducerInterestCallback on_interest_satisfied_output_buffer_; + ProducerInterestCallback on_interest_process_; + + ProducerContentObjectCallback on_new_segment_; + ProducerContentObjectCallback on_content_object_to_sign_; + ProducerContentObjectCallback on_content_object_in_output_buffer_; + ProducerContentObjectCallback on_content_object_output_; + ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; + + ProducerContentCallback on_content_produced_; + + private: + void listen() { + bool first = true; + + for (core::Prefix &producer_namespace : served_namespaces_) { + if (first) { + core::BindConfig bind_config(producer_namespace, 1000); + portal_->bind(bind_config); + portal_->setProducerCallback(this); + first = !first; + } else { + portal_->registerRoute(producer_namespace); + } + } + + portal_->runEventsLoop(); + } + + void passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object) { + if (content_object) { + io_service_.dispatch([this, content_object]() { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *content_object); + } + + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *content_object); + } + + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, + *content_object); + } + }); + + output_buffer_.insert(content_object); + + io_service_.dispatch([this, content_object]() { + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *content_object); + } + }); + + portal_->sendContentObject(*content_object); + } + } +}; // namespace implementation + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc new file mode 100644 index 000000000..3b3152993 --- /dev/null +++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/interest.h> +#include <hicn/transport/interfaces/p2psecure_socket_producer.h> + +#include <implementation/p2psecure_socket_producer.h> +#include <implementation/tls_rtc_socket_producer.h> + +#include <openssl/bio.h> +#include <openssl/rand.h> +#include <openssl/ssl.h> + +namespace transport { +namespace implementation { + +int TLSRTCProducerSocket::read(BIO *b, char *buf, size_t size, + size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSRTCProducerSocket::readOld(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) { + TLSRTCProducerSocket *socket; + socket = (TLSRTCProducerSocket *)BIO_get_data(b); + + std::unique_lock<std::mutex> lck(socket->mtx_); + if (!socket->something_to_read_) { + (socket->cv_).wait(lck); + } + + utils::MemBuf *membuf = socket->packet_->next(); + int size_to_read; + + if ((int)membuf->length() > size) { + size_to_read = size; + } else { + size_to_read = membuf->length(); + socket->something_to_read_ = false; + } + + std::memcpy(buf, membuf->data(), size_to_read); + membuf->trimStart(size_to_read); + + return size_to_read; +} + +int TLSRTCProducerSocket::write(BIO *b, const char *buf, size_t size, + size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSRTCProducerSocket::writeOld(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { + TLSRTCProducerSocket *socket; + socket = (TLSRTCProducerSocket *)BIO_get_data(b); + + if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && + socket->first_) { + socket->tls_chunks_--; + bool making_manifest = socket->parent_->making_manifest_; + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + false); + socket->parent_->ProducerSocket::produce( + socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0); + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + making_manifest); + socket->first_ = false; + + } else { + std::unique_ptr<utils::MemBuf> mbuf = + utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); + auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + socket->to_call_oncontentproduced_--; + auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->RTCProducerSocket::produce(std::move(mbuf)); + ProducerContentCallback on_content_produced_application; + socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, + on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && + on_content_produced_application) { + on_content_produced_application( + (transport::interface::ProducerSocket &)(*socket->getInterface()), + std::error_code(), 0); + } + }); + } + + return num; +} + +TLSRTCProducerSocket::TLSRTCProducerSocket( + interface::ProducerSocket *producer_socket, P2PSecureProducerSocket *parent, + const Name &handshake_name) + : ProducerSocket(producer_socket), + RTCProducerSocket(producer_socket), + TLSProducerSocket(producer_socket, parent, handshake_name) { + BIO_METHOD *bio_meth = + BIO_meth_new(BIO_TYPE_ACCEPT, "secure rtc producer socket"); + BIO_meth_set_read(bio_meth, TLSRTCProducerSocket::readOld); + BIO_meth_set_write(bio_meth, TLSRTCProducerSocket::writeOld); + BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl); + BIO *bio = BIO_new(bio_meth); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); +} + +void TLSRTCProducerSocket::accept() { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + tls_chunks_ = 1; + int result = SSL_accept(ssl_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + } + + TRANSPORT_LOGD("Handshake performed!"); + parent_->list_secure_rtc_producers.push_front( + std::move(parent_->map_secure_rtc_producers[handshake_name_])); + parent_->map_secure_rtc_producers.erase(handshake_name_); + + ProducerInterestCallback on_interest_process_decrypted; + getSocketOption(ProducerCallbacksOptions::CACHE_MISS, + on_interest_process_decrypted); + + if (on_interest_process_decrypted) { + Interest inter(std::move(packet_)); + on_interest_process_decrypted( + (transport::interface::ProducerSocket &)(*getInterface()), inter); + } + + parent_->cv_.notify_one(); +} + +int TLSRTCProducerSocket::async_accept() { + if (!async_thread_.stopped()) { + async_thread_.add([this]() { this->TLSRTCProducerSocket::accept(); }); + } else { + throw errors::RuntimeException( + "Async thread not running, impossible to perform handshake"); + } + + return 1; +} + +void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + throw errors::RuntimeException( + "New handshake on the same P2P secure producer socket not supported"); + } + + size_t buf_size = buffer->length(); + tls_chunks_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); + to_call_oncontentproduced_ = tls_chunks_; + + SSL_write(ssl_, buffer->data(), buf_size); + BIO *wbio = SSL_get_wbio(ssl_); + int i = BIO_flush(wbio); + (void)i; // To shut up gcc 5 +} + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h new file mode 100644 index 000000000..685c91244 --- /dev/null +++ b/libtransport/src/implementation/tls_rtc_socket_producer.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <implementation/rtc_socket_producer.h> +#include <implementation/tls_socket_producer.h> + +namespace transport { +namespace implementation { + +class P2PSecureProducerSocket; + +class TLSRTCProducerSocket : public RTCProducerSocket, + public TLSProducerSocket { + friend class P2PSecureProducerSocket; + + public: + explicit TLSRTCProducerSocket(interface::ProducerSocket *producer_socket, + P2PSecureProducerSocket *parent, + const Name &handshake_name); + + ~TLSRTCProducerSocket() = default; + + void produce(std::unique_ptr<utils::MemBuf> &&buffer) override; + + void accept() override; + + int async_accept() override; + + using TLSProducerSocket::onInterest; + using TLSProducerSocket::produce; + + protected: + static int read(BIO *b, char *buf, size_t size, size_t *readbytes); + + static int readOld(BIO *h, char *buf, int size); + + static int write(BIO *b, const char *buf, size_t size, size_t *written); + + static int writeOld(BIO *h, const char *buf, int num); +}; + +} // namespace implementation + +} // end namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc new file mode 100644 index 000000000..95b287aa6 --- /dev/null +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -0,0 +1,384 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/tls_socket_consumer.h> + +#include <openssl/bio.h> +#include <openssl/ssl.h> +#include <openssl/tls1.h> + +#include <random> + +namespace transport { +namespace implementation { + +void TLSConsumerSocket::setInterestPayload(interface::ConsumerSocket &c, + const core::Interest &interest) { + Interest &int2 = const_cast<Interest &>(interest); + random_suffix_ = int2.getName().getSuffix(); + + if (payload_ != NULL) int2.appendPayload(std::move(payload_)); +} + +/* Return the number of read bytes in the return param */ +int readOldTLS(BIO *b, char *buf, int size) { + if (size < 0) return size; + + TLSConsumerSocket *socket; + socket = (TLSConsumerSocket *)BIO_get_data(b); + + std::unique_lock<std::mutex> lck(socket->mtx_); + + if (!socket->something_to_read_) { + if (!socket->transport_protocol_->isRunning()) { + socket->network_name_.setSuffix(socket->random_suffix_); + socket->ConsumerSocket::asyncConsume(socket->network_name_); + } + if (!socket->something_to_read_) socket->cv_.wait(lck); + } + + size_t size_to_read, read; + size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) + chain_size = socket->head_->computeChainDataLength(); + + if (chain_size > (size_t)size) { + read = size_to_read = (size_t)size; + } else { + read = size_to_read = chain_size; + socket->something_to_read_ = false; + } + + while (size_to_read) { + if (socket->head_->length() < size_to_read) { + std::memcpy(buf, socket->head_->data(), socket->head_->length()); + size_to_read -= socket->head_->length(); + buf += socket->head_->length(); + socket->head_ = socket->head_->pop(); + } else { + std::memcpy(buf, socket->head_->data(), size_to_read); + socket->head_->trimStart(size_to_read); + size_to_read = 0; + } + } + + return read; +} + +/* Return the number of read bytes in readbytes */ +int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = readOldTLS(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +/* Return the number of written bytes in the return param */ +int writeOldTLS(BIO *b, const char *buf, int num) { + TLSConsumerSocket *socket; + socket = (TLSConsumerSocket *)BIO_get_data(b); + + socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &TLSConsumerSocket::setInterestPayload, socket, std::placeholders::_1, + std::placeholders::_2)); + + return num; +} + +/* Return the number of written bytes in written */ +int writeTLS(BIO *b, const char *buf, size_t size, size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = writeOldTLS(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +long ctrlTLS(BIO *b, int cmd, long num, void *ptr) { return 1; } + +TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, + int protocol, SSL *ssl) + : ConsumerSocket(consumer_socket, protocol), + name_(), + buf_pool_(), + decrypted_content_(), + payload_(), + head_(), + something_to_read_(false), + content_downloaded_(false), + random_suffix_(), + producer_namespace_(), + read_callback_decrypted_(), + mtx_(), + cv_(), + async_downloader_tls_() { + /* Create the (d)TLS state */ + const SSL_METHOD *meth = TLS_client_method(); + ctx_ = SSL_CTX_new(meth); + + int result = + SSL_CTX_set_ciphersuites(ctx_, + "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" + "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { + throw errors::RuntimeException( + "Unable to set cipher list on TLS subsystem. Aborting."); + } + + SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); + SSL_CTX_set_ssl_version(ctx_, meth); + + ssl_ = ssl; + + BIO_METHOD *bio_meth = + BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket"); + BIO_meth_set_read(bio_meth, readOldTLS); + BIO_meth_set_write(bio_meth, writeOldTLS); + BIO_meth_set_ctrl(bio_meth, ctrlTLS); + BIO *bio = BIO_new(bio_meth); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); + + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + + std::default_random_engine generator; + std::uniform_int_distribution<int> distribution( + 1, std::numeric_limits<uint32_t>::max()); + random_suffix_ = 0; + + this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + this); +}; + +/* + * The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory + */ +TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; } + +int TLSConsumerSocket::consume(const Name &name, + std::unique_ptr<utils::MemBuf> &&buffer) { + this->payload_ = std::move(buffer); + + this->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1, + std::placeholders::_2)); + + return consume(name); +} + +int TLSConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + throw errors::RuntimeException("Handshake not performed"); + } + + return download_content(name); +} + +int TLSConsumerSocket::download_content(const Name &name) { + network_name_ = name; + network_name_.setSuffix(0); + something_to_read_ = false; + content_downloaded_ = false; + + decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH); + uint8_t *buf = decrypted_content_->writableData(); + size_t size = 0; + int result = -1; + + while (!content_downloaded_ || something_to_read_) { + if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) { + decrypted_content_->appendChain( + utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH)); + // decrypted_content_->computeChainDataLength(); + buf = decrypted_content_->prev()->writableData(); + } else { + buf = decrypted_content_->writableTail(); + } + + result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH); + + /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of + * the data has been fully downloaded */ + + /* ASSERT((result < SSL3_RT_MAX_PLAIN_LENGTH && content_downloaded_) || */ + /* result == SSL3_RT_MAX_PLAIN_LENGTH); */ + + if (result >= 0) { + size += result; + decrypted_content_->prepend(result); + } else + throw errors::RuntimeException("Unable to download content"); + + if (size >= read_callback_decrypted_->maxBufferSize()) { + if (read_callback_decrypted_->isBufferMovable()) { + // No need to perform an additional copy. The whole buffer will be + // tranferred to the application. + + read_callback_decrypted_->readBufferAvailable( + std::move(decrypted_content_)); + decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = decrypted_content_->length(); + + while (decrypted_content_->length()) { + buffer = nullptr; + length = 0; + read_callback_decrypted_->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(decrypted_content_->length(), length); + std::memcpy(buffer, decrypted_content_->data(), to_copy); + decrypted_content_->trimStart(to_copy); + } + + read_callback_decrypted_->readDataAvailable(total_length); + decrypted_content_->clear(); + } + } + } + + read_callback_decrypted_->readSuccess(size); + + return CONSUMER_FINISHED; +} + +int TLSConsumerSocket::asyncConsume(const Name &name, + std::unique_ptr<utils::MemBuf> &&buffer) { + this->payload_ = std::move(buffer); + + this->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1, + std::placeholders::_2)); + + return asyncConsume(name); +} + +int TLSConsumerSocket::asyncConsume(const Name &name) { + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + throw errors::RuntimeException("Handshake not performed"); + } + + if (!async_downloader_tls_.stopped()) { + async_downloader_tls_.add([this, name]() { + is_async_ = true; + download_content(name); + }); + } + + return CONSUMER_RUNNING; +} + +void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + producer_namespace_ = producer_namespace; +} + +int TLSConsumerSocket::setSocketOption(int socket_option_key, + ReadCallback *socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_decrypted_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +void TLSConsumerSocket::getReadBuffer(uint8_t **application_buffer, + size_t *max_length) {} + +void TLSConsumerSocket::readDataAvailable(size_t length) noexcept {} + +size_t TLSConsumerSocket::maxBufferSize() const { + return SSL3_RT_MAX_PLAIN_LENGTH; +} + +void TLSConsumerSocket::readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept { + std::unique_lock<std::mutex> lck(this->mtx_); + if (head_) { + head_->prependChain(std::move(buffer)); + } else { + head_ = std::move(buffer); + } + + something_to_read_ = true; + cv_.notify_one(); +} + +void TLSConsumerSocket::readError(const std::error_code ec) noexcept {} + +void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { + std::unique_lock<std::mutex> lck(this->mtx_); + content_downloaded_ = true; + something_to_read_ = true; + cv_.notify_one(); +} + +bool TLSConsumerSocket::isBufferMovable() noexcept { return true; } + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h new file mode 100644 index 000000000..2e88dc47e --- /dev/null +++ b/libtransport/src/implementation/tls_socket_consumer.h @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/socket_consumer.h> + +#include <implementation/socket_consumer.h> + +#include <openssl/ssl.h> + +namespace transport { +namespace implementation { + +class TLSConsumerSocket : public ConsumerSocket, + public interface::ConsumerSocket::ReadCallback { + /* Return the number of read bytes in readbytes */ + friend int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes); + + /* Return the number of read bytes in the return param */ + friend int readOldTLS(BIO *h, char *buf, int size); + + /* Return the number of written bytes in written */ + friend int writeTLS(BIO *b, const char *buf, size_t size, size_t *written); + + /* Return the number of written bytes in the return param */ + friend int writeOldTLS(BIO *h, const char *buf, int num); + + friend long ctrlTLS(BIO *b, int cmd, long num, void *ptr); + + public: + explicit TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, + int protocol, SSL *ssl_); + + ~TLSConsumerSocket(); + + int consume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer); + int consume(const Name &name) override; + + int asyncConsume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer); + int asyncConsume(const Name &name) override; + + void registerPrefix(const Prefix &producer_namespace); + + int setSocketOption( + int socket_option_key, + interface::ConsumerSocket::ReadCallback *socket_option_value) override; + + using ConsumerSocket::getSocketOption; + using ConsumerSocket::setSocketOption; + + protected: + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ConsumerInterestCallback on_interest_input_decrypted_; + ConsumerInterestCallback on_interest_process_decrypted_; + + private: + Name name_; + + /* SSL handle */ + SSL *ssl_; + SSL_CTX *ctx_; + + /* Chain of MemBuf to be used as a temporary buffer to pass descypted data + * from the underlying layer to the application */ + utils::ObjectPool<utils::MemBuf> buf_pool_; + std::unique_ptr<utils::MemBuf> decrypted_content_; + + /* Chain of MemBuf holding the payload to be written into interest or data + */ + std::unique_ptr<utils::MemBuf> payload_; + + /* Chain of MemBuf holding the data retrieved from the underlying layer */ + std::unique_ptr<utils::MemBuf> head_; + + bool something_to_read_; + + bool content_downloaded_; + + double old_max_win_; + + double old_current_win_; + + uint32_t random_suffix_; + + Prefix producer_namespace_; + + interface::ConsumerSocket::ReadCallback *read_callback_decrypted_; + + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + utils::EventThread async_downloader_tls_; + + void setInterestPayload(interface::ConsumerSocket &c, + const core::Interest &interest); + + virtual void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override; + + virtual void readDataAvailable(size_t length) noexcept override; + + virtual size_t maxBufferSize() const override; + + virtual void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override; + + virtual void readError(const std::error_code ec) noexcept override; + + virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; + + int download_content(const Name &name); +}; + +} // namespace implementation + +} // end namespace transport
\ No newline at end of file diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc new file mode 100644 index 000000000..9a5b94a1c --- /dev/null +++ b/libtransport/src/implementation/tls_socket_producer.cc @@ -0,0 +1,611 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_producer.h> + +#include <implementation/p2psecure_socket_producer.h> +#include <implementation/tls_socket_producer.h> + +#include <openssl/bio.h> +#include <openssl/rand.h> +#include <openssl/ssl.h> + +namespace transport { +namespace implementation { + +/* Return the number of read bytes in readbytes */ +int TLSProducerSocket::read(BIO *b, char *buf, size_t size, size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSProducerSocket::readOld(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +/* Return the number of read bytes in the return param */ +int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { + TLSProducerSocket *socket; + socket = (TLSProducerSocket *)BIO_get_data(b); + + /* take a lock on the mutex. It will be unlocked by */ + std::unique_lock<std::mutex> lck(socket->mtx_); + if (!socket->something_to_read_) { + (socket->cv_).wait(lck); + } + + /* Either there already is something to read, or the thread has been waken up + */ + /* must return the payload in the interest */ + + utils::MemBuf *membuf = socket->packet_->next(); + int size_to_read; + if ((int)membuf->length() > size) { + size_to_read = size; + } else { + size_to_read = membuf->length(); + socket->something_to_read_ = false; + } + + std::memcpy(buf, membuf->data(), size_to_read); + membuf->trimStart(size_to_read); + + return size_to_read; +} + +/* Return the number of written bytes in written */ +int TLSProducerSocket::write(BIO *b, const char *buf, size_t size, + size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSProducerSocket::writeOld(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +/* Return the number of written bytes in the return param */ +int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { + TLSProducerSocket *socket; + socket = (TLSProducerSocket *)BIO_get_data(b); + + if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && + socket->first_) { + //! socket->tls_chunks_ corresponds to is_last + socket->tls_chunks_--; + bool making_manifest = socket->parent_->making_manifest_; + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + false); + socket->parent_->ProducerSocket::produce( + socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, + socket->last_segment_); + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + making_manifest); + socket->first_ = false; + } else { + socket->still_writing_ = true; + + std::unique_ptr<utils::MemBuf> mbuf = + utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); + auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + socket->tls_chunks_--; + socket->to_call_oncontentproduced_--; + auto mbuf = std::unique_ptr<utils::MemBuf>(a); + socket->last_segment_ += socket->ProducerSocket::produce( + socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, + socket->last_segment_); + ProducerContentCallback on_content_produced_application; + socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, + on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && + on_content_produced_application) { + on_content_produced_application(*socket->getInterface(), + std::error_code(), 0); + } + }); + } + + return num; +} + +TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, + P2PSecureProducerSocket *parent, + const Name &handshake_name) + : ProducerSocket(producer_socket), + on_content_produced_application_(), + mtx_(), + cv_(), + something_to_read_(), + name_(), + last_segment_(0), + parent_(parent), + first_(true), + handshake_name_(handshake_name), + tls_chunks_(0), + to_call_oncontentproduced_(0), + still_writing_(false), + encryption_thread_() { + const SSL_METHOD *meth = TLS_server_method(); + ctx_ = SSL_CTX_new(meth); + + /* + * Setup SSL context (identity and parameter to use TLS 1.3) + */ + SSL_CTX_use_certificate(ctx_, parent->cert_509_); + SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_); + + int result = + SSL_CTX_set_ciphersuites(ctx_, + "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" + "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { + throw errors::RuntimeException( + "Unable to set cipher list on TLS subsystem. Aborting."); + } + + // We force it to be TLS 1.3 + SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); + SSL_CTX_set_num_tickets(ctx_, 0); + + result = SSL_CTX_add_custom_ext( + ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS, + TLSProducerSocket::addHicnKeyIdCb, TLSProducerSocket::freeHicnKeyIdCb, + this, TLSProducerSocket::parseHicnKeyIdCb, NULL); + + ssl_ = SSL_new(ctx_); + /* + * Setup this producer socker as the bio that TLS will use to write and read + * data (in stream mode) + */ + BIO_METHOD *bio_meth = + BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket"); + BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld); + BIO_meth_set_write(bio_meth, TLSProducerSocket::writeOld); + BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl); + BIO *bio = BIO_new(bio_meth); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); + /* + * Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. + */ + this->ProducerSocket::setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this, + std::placeholders::_1, + std::placeholders::_2)); + this->ProducerSocket::setSocketOption( + ProducerCallbacksOptions::CONTENT_PRODUCED, + (ProducerContentCallback)bind( + &TLSProducerSocket::onContentProduced, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); +} + +/* + * The producer interface is not owned by the application, so is TLSSocket task + * to deallocate the memory + */ +TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; } + +void TLSProducerSocket::accept() { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + tls_chunks_ = 1; + int result = SSL_accept(ssl_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + } + TRANSPORT_LOGD("Handshake performed!"); + parent_->list_secure_producers.push_front( + std::move(parent_->map_secure_producers[handshake_name_])); + parent_->map_secure_producers.erase(handshake_name_); + + ProducerInterestCallback on_interest_process_decrypted; + getSocketOption(ProducerCallbacksOptions::CACHE_MISS, + on_interest_process_decrypted); + + if (on_interest_process_decrypted) { + Interest inter(std::move(packet_)); + on_interest_process_decrypted(*getInterface(), inter); + } else { + throw errors::RuntimeException( + "On interest process unset. Unable to perform handshake"); + } +} + +int TLSProducerSocket::async_accept() { + if (!async_thread_.stopped()) { + async_thread_.add([this]() { this->accept(); }); + } else { + throw errors::RuntimeException( + "Async thread not running, impossible to perform handshake"); + } + + return 1; +} + +void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { + /* Based on the state machine of (D)TLS, we know what action to do */ + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + std::unique_lock<std::mutex> lck(mtx_); + name_ = interest.getName(); + something_to_read_ = true; + packet_ = interest.acquireMemBufReference(); + if (head_) { + payload_->prependChain(interest.getPayload()); + } else { + payload_ = interest.getPayload(); // std::move(interest.getPayload()); + } + cv_.notify_one(); + } else { + name_ = interest.getName(); + packet_ = interest.acquireMemBufReference(); + payload_ = interest.getPayload(); + something_to_read_ = true; + + if (interest.getPayload()->length() > 0) + SSL_read( + ssl_, + const_cast<unsigned char *>(interest.getPayload()->writableData()), + interest.getPayload()->length()); + } + + ProducerInterestCallback on_interest_input_decrypted; + getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + on_interest_input_decrypted); + if (on_interest_input_decrypted) + (on_interest_input_decrypted)(*getInterface(), interest); +} + +void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p, + Interest &interest) { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + std::unique_lock<std::mutex> lck(mtx_); + name_ = interest.getName(); + something_to_read_ = true; + packet_ = interest.acquireMemBufReference(); + payload_ = interest.getPayload(); + cv_.notify_one(); + } else { + name_ = interest.getName(); + packet_ = interest.acquireMemBufReference(); + payload_ = interest.getPayload(); + something_to_read_ = true; + + if (interest.getPayload()->length() > 0) + SSL_read( + ssl_, + const_cast<unsigned char *>(interest.getPayload()->writableData()), + interest.getPayload()->length()); + + if (on_interest_process_decrypted_ != VOID_HANDLER) + on_interest_process_decrypted_(*getInterface(), interest); + } +} + +void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, + const std::error_code &err, + uint64_t bytes_written) {} + +uint32_t TLSProducerSocket::produce(Name content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + throw errors::RuntimeException( + "New handshake on the same P2P secure producer socket not supported"); + } + size_t buf_size = buffer->length(); + name_ = served_namespaces_.front().mapName(content_name); + + tls_chunks_ = to_call_oncontentproduced_ = + ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); + + if (!is_last) { + tls_chunks_++; + } + + last_segment_ = start_offset; + + SSL_write(ssl_, buffer->data(), buf_size); + BIO *wbio = SSL_get_wbio(ssl_); + int i = BIO_flush(wbio); + (void)i; // To shut up gcc 5 + + return 0; +} + +void TLSProducerSocket::asyncProduce(const Name &content_name, + const uint8_t *buf, size_t buffer_size, + bool is_last, uint32_t *start_offset) { + if (!encryption_thread_.stopped()) { + encryption_thread_.add([this, content_name, buffer = buf, + size = buffer_size, is_last, start_offset]() { + if (start_offset != NULL) { + produce(content_name, buffer, size, is_last, *start_offset); + } else { + produce(content_name, buffer, size, is_last, 0); + } + }); + } +} + +void TLSProducerSocket::asyncProduce(Name content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment) { + if (!encryption_thread_.stopped()) { + auto a = buffer.release(); + encryption_thread_.add( + [this, content_name, a, is_last, offset, last_segment]() { + auto buf = std::unique_ptr<utils::MemBuf>(a); + if (last_segment != NULL) { + *last_segment = &last_segment_; + } + produce(content_name, std::move(buf), is_last, offset); + }); + } +} + +void TLSProducerSocket::asyncProduce(ContentObject &content_object) { + throw errors::RuntimeException("API not supported"); +} + +void TLSProducerSocket::produce(ContentObject &content_object) { + throw errors::RuntimeException("API not supported"); +} + +long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { + if (cmd == BIO_CTRL_FLUSH) { + } + return 1; +} + +int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char **out, size_t *outlen, + X509 *x, size_t chainidx, int *al, + void *add_arg) { + TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg); + if (ext_type == 100) { + ip_prefix_t ip_prefix = + socket->parent_->served_namespaces_.front().toIpPrefixStruct(); + int inet_family = + socket->parent_->served_namespaces_.front().getAddressFamily(); + uint16_t prefix_len_bits = + socket->parent_->served_namespaces_.front().getPrefixLength(); + uint8_t prefix_len_bytes = prefix_len_bits / 8; + uint8_t prefix_len_u32 = prefix_len_bits / 32; + + ip_prefix_t *out_ip = (ip_prefix_t *)malloc(sizeof(ip_prefix_t)); + out_ip->family = inet_family; + out_ip->len = prefix_len_bits + 32; + u8 *out_ip_buf = const_cast<u8 *>( + ip_address_get_buffer(&(out_ip->address), inet_family)); + *out = reinterpret_cast<unsigned char *>(out_ip); + + RAND_bytes((unsigned char *)&socket->key_id_, 4); + + memcpy(out_ip_buf, ip_address_get_buffer(&(ip_prefix.address), inet_family), + prefix_len_bytes); + memcpy((out_ip_buf + prefix_len_bytes), &socket->key_id_, 4); + *outlen = sizeof(ip_prefix_t); + + ip_address_t mask = {}; + ip_address_t keyId_component = {}; + u32 *mask_buf; + u32 *keyId_component_buf; + switch (inet_family) { + case AF_INET: + mask_buf = &(mask.v4.as_u32); + keyId_component_buf = &(keyId_component.v4.as_u32); + break; + case AF_INET6: + mask_buf = mask.v6.as_u32; + keyId_component_buf = keyId_component.v6.as_u32; + break; + default: + throw errors::RuntimeException("Unknown protocol"); + } + + if (prefix_len_bits > (inet_family == AF_INET6 ? IPV6_ADDR_LEN_BITS - 32 + : IPV4_ADDR_LEN_BITS - 32)) + throw errors::RuntimeException( + "Not enough space in the content name to add key_id"); + + mask_buf[prefix_len_u32] = 0xffffffff; + keyId_component_buf[prefix_len_u32] = socket->key_id_; + socket->last_segment_ = 0; + + socket->on_interest_process_decrypted_ = + socket->parent_->on_interest_process_decrypted_; + + socket->registerPrefix( + Prefix(socket->parent_->served_namespaces_.front().getName( + Name(inet_family, (uint8_t *)&mask), + Name(inet_family, (uint8_t *)&keyId_component), + socket->parent_->served_namespaces_.front().getName()), + out_ip->len)); + socket->connect(); + } + return 1; +} + +void TLSProducerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *out, + void *add_arg) { + free(const_cast<unsigned char *>(out)); +} + +int TLSProducerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *in, size_t inlen, + X509 *x, size_t chainidx, int *al, + void *add_arg) { + return 1; +} + +int TLSProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback socket_option_value) -> int { + int result = SOCKET_OPTION_SET; + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_decrypted_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_decrypted_ = socket_option_value; + break; + + default: + result = SOCKET_OPTION_NOT_SET; + break; + } + return result; + }); +} + +int TLSProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_application_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int TLSProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback **socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + *socket_option_value = &on_content_produced_application_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int TLSProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback &socket_option_value) { + return rescheduleOnIOServiceWithReference( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback &socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + socket_option_value = on_content_produced_application_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int TLSProducerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback &socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOServiceWithReference( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback &socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + socket_option_value = on_interest_input_decrypted_; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + socket_option_value = on_interest_dropped_input_buffer_; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + socket_option_value = on_interest_inserted_input_buffer_; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + socket_option_value = on_interest_satisfied_output_buffer_; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + socket_option_value = on_interest_process_decrypted_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +} // namespace implementation + +} // namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h new file mode 100644 index 000000000..e910c8259 --- /dev/null +++ b/libtransport/src/implementation/tls_socket_producer.h @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <implementation/socket_producer.h> + +#include <openssl/ssl.h> +#include <condition_variable> +#include <mutex> + +namespace transport { +namespace implementation { + +class P2PSecureProducerSocket; + +class TLSProducerSocket : virtual public ProducerSocket { + friend class P2PSecureProducerSocket; + + public: + explicit TLSProducerSocket(interface::ProducerSocket *producer_socket, + P2PSecureProducerSocket *parent, + const Name &handshake_name); + + ~TLSProducerSocket(); + + uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, + bool is_last = true, uint32_t start_offset = 0) override { + return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), + is_last, start_offset); + } + + uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, uint32_t start_offset = 0) override; + + void produce(ContentObject &content_object) override; + + void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, + bool is_last = true, + uint32_t *start_offset = nullptr) override; + + void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment = nullptr) override; + + void asyncProduce(ContentObject &content_object) override; + + virtual void accept(); + + virtual int async_accept(); + + virtual int setSocketOption( + int socket_option_key, + ProducerInterestCallback socket_option_value) override; + + virtual int setSocketOption( + int socket_option_key, + ProducerContentCallback socket_option_value) override; + + virtual int getSocketOption( + int socket_option_key, + ProducerContentCallback **socket_option_value) override; + + int getSocketOption(int socket_option_key, + ProducerContentCallback &socket_option_value); + + int getSocketOption(int socket_option_key, + ProducerInterestCallback &socket_option_value); + + using ProducerSocket::getSocketOption; + using ProducerSocket::onInterest; + using ProducerSocket::setSocketOption; + + protected: + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ProducerInterestCallback on_interest_input_decrypted_; + ProducerInterestCallback on_interest_process_decrypted_; + ProducerContentCallback on_content_produced_application_; + + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + /* Bool variable, true if there is something to read (an interest arrived) */ + bool something_to_read_; + + /* First interest that open a secure connection */ + transport::core::Name name_; + + /* SSL handle */ + SSL *ssl_; + SSL_CTX *ctx_; + + Packet::MemBufPtr packet_; + + std::unique_ptr<utils::MemBuf> head_; + std::uint32_t last_segment_; + std::shared_ptr<utils::MemBuf> payload_; + std::uint32_t key_id_; + + std::thread *handshake; + P2PSecureProducerSocket *parent_; + + bool first_; + Name handshake_name_; + int tls_chunks_; + int to_call_oncontentproduced_; + + bool still_writing_; + + utils::EventThread encryption_thread_; + + void onInterest(ProducerSocket &p, Interest &interest); + void cacheMiss(interface::ProducerSocket &p, Interest &interest); + + /* Return the number of read bytes in readbytes */ + static int read(BIO *b, char *buf, size_t size, size_t *readbytes); + + /* Return the number of read bytes in the return param */ + static int readOld(BIO *h, char *buf, int size); + + /* Return the number of written bytes in written */ + static int write(BIO *b, const char *buf, size_t size, size_t *written); + + /* Return the number of written bytes in the return param */ + static int writeOld(BIO *h, const char *buf, int num); + + static long ctrl(BIO *b, int cmd, long num, void *ptr); + + static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context, + const unsigned char **out, size_t *outlen, X509 *x, + size_t chainidx, int *al, void *add_arg); + + static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *out, + void *add_arg); + + static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *in, + size_t inlen, X509 *x, size_t chainidx, int *al, + void *add_arg); + + void onContentProduced(interface::ProducerSocket &p, + const std::error_code &err, uint64_t bytes_written); +}; + +} // namespace implementation + +} // end namespace transport |