From 35058cdfe0134c88f1aa8d23342d1d7b9d39e296 Mon Sep 17 00:00:00 2001 From: Alberto Compagno Date: Tue, 7 Jan 2020 11:46:02 +0100 Subject: [HICN-2] Added P2P confidential communication on hICN P2P confidential communications exploit the TLS 1.3 protocol to let a consumer to establish a secure communication on an hICN name. Currently we don't support the consumer authentication (mutual authentication in TLS) and the 0-rtt session establishment. Change-Id: I2be073847c08a17f28c837d444081920c5e57a07 Signed-off-by: Alberto Compagno Signed-off-by: Olivier Roques Signed-off-by: Mauro Sardara --- .../src/hicn/transport/interfaces/CMakeLists.txt | 25 +- .../src/hicn/transport/interfaces/callbacks.h | 2 +- .../interfaces/p2psecure_socket_consumer.cc | 382 ++++++++++++++ .../interfaces/p2psecure_socket_consumer.h | 147 ++++++ .../interfaces/p2psecure_socket_producer.cc | 380 +++++++++++++ .../interfaces/p2psecure_socket_producer.h | 129 +++++ .../transport/interfaces/rtc_socket_producer.cc | 3 + .../transport/interfaces/rtc_socket_producer.h | 4 +- .../hicn/transport/interfaces/socket_consumer.cc | 24 +- .../hicn/transport/interfaces/socket_consumer.h | 27 +- .../transport/interfaces/socket_options_keys.h | 10 +- .../hicn/transport/interfaces/socket_producer.cc | 148 ++++-- .../hicn/transport/interfaces/socket_producer.h | 140 +++-- .../interfaces/tls_rtc_socket_producer.cc | 178 +++++++ .../transport/interfaces/tls_rtc_socket_producer.h | 58 ++ .../transport/interfaces/tls_socket_consumer.cc | 364 +++++++++++++ .../transport/interfaces/tls_socket_consumer.h | 132 +++++ .../transport/interfaces/tls_socket_producer.cc | 587 +++++++++++++++++++++ .../transport/interfaces/tls_socket_producer.h | 163 ++++++ 19 files changed, 2772 insertions(+), 131 deletions(-) create mode 100644 libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc create mode 100644 libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h create mode 100644 libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc create mode 100644 libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h create mode 100644 libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc create mode 100644 libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h create mode 100644 libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc create mode 100644 libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h create mode 100644 libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc create mode 100644 libtransport/src/hicn/transport/interfaces/tls_socket_producer.h (limited to 'libtransport/src/hicn/transport/interfaces') diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index 1f3c29b1f..88b83e9d4 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -17,6 +17,11 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/publication_options.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h @@ -26,11 +31,29 @@ list(APPEND HEADER_FILES ) list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.cc ) +if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a") + list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc + ) + + list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h + ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h + ) +endif() + set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h index 6de48d14b..9d3d57992 100644 --- a/libtransport/src/hicn/transport/interfaces/callbacks.h +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -122,4 +122,4 @@ extern std::nullptr_t VOID_HANDLER; } // namespace interface -} // namespace transport \ No newline at end of file +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc new file mode 100644 index 000000000..ec966e509 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc @@ -0,0 +1,382 @@ +#include +#include +#include +#include + +#include + +namespace transport { + +namespace interface { + +void P2PSecureConsumerSocket::setInterestPayload( + ConsumerSocket &c, const core::Interest &interest) { + Interest &int2 = const_cast(interest); + random_suffix_ = int2.getName().getSuffix(); + + if (payload_ != NULL) int2.appendPayload(std::move(payload_)); +} + +// implement void readBufferAvailable(), size_t maxBufferSize() const override, +// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() +// must be implemented even if empty. + +/* Return the number of read bytes in the return param */ +int readOld(BIO *b, char *buf, int size) { + if (size < 0) return size; + + P2PSecureConsumerSocket *socket; + socket = (P2PSecureConsumerSocket *)BIO_get_data(b); + + std::unique_lock lck(socket->mtx_); + + if (!socket->something_to_read_) { + if (!socket->transport_protocol_->isRunning()) { + socket->network_name_.setSuffix(socket->random_suffix_); + socket->ConsumerSocket::asyncConsume(socket->network_name_); + } + if (!socket->something_to_read_) socket->cv_.wait(lck); + } + + size_t size_to_read, read; + size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) + chain_size = socket->head_->computeChainDataLength(); + + if (chain_size > (size_t)size) { + read = size_to_read = (size_t)size; + } else { + read = size_to_read = chain_size; + socket->something_to_read_ = false; + } + + while (size_to_read) { + if (socket->head_->length() < size_to_read) { + std::memcpy(buf, socket->head_->data(), socket->head_->length()); + size_to_read -= socket->head_->length(); + buf += socket->head_->length(); + socket->head_ = socket->head_->pop(); + } else { + std::memcpy(buf, socket->head_->data(), size_to_read); + socket->head_->trimStart(size_to_read); + size_to_read = 0; + } + } + + return read; +} + +/* Return the number of read bytes in readbytes */ +int read(BIO *b, char *buf, size_t size, size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = transport::interface::readOld(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +/* Return the number of written bytes in the return param */ +int writeOld(BIO *b, const char *buf, int num) { + P2PSecureConsumerSocket *socket; + socket = (P2PSecureConsumerSocket *)BIO_get_data(b); + + socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &P2PSecureConsumerSocket::setInterestPayload, socket, + std::placeholders::_1, std::placeholders::_2)); + + return num; +} + +/* Return the number of written bytes in written */ +int write(BIO *b, const char *buf, size_t size, size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = transport::interface::writeOld(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +long ctrl(BIO *b, int cmd, long num, void *ptr) { return 1; } + +int P2PSecureConsumerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char **out, + size_t *outlen, X509 *x, + size_t chainidx, int *al, + void *add_arg) { + if (ext_type == 100) { + *out = (unsigned char *)malloc(4); + *(uint32_t *)*out = 10; + *outlen = 4; + } + return 1; +} + +void P2PSecureConsumerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *out, + void *add_arg) { + free(const_cast(out)); +} + +int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *in, + size_t inlen, X509 *x, + size_t chainidx, int *al, + void *add_arg) { + P2PSecureConsumerSocket *socket = + reinterpret_cast(add_arg); + if (ext_type == 100) { + memcpy(&socket->secure_prefix_, in, sizeof(ip_prefix_t)); + } + return 1; +} + +P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol) + : ConsumerSocket(handshake_protocol), + name_(), + tls_consumer_(), + buf_pool_(), + decrypted_content_(), + payload_(), + head_(), + something_to_read_(false), + content_downloaded_(false), + random_suffix_(), + secure_prefix_(), + producer_namespace_(), + read_callback_decrypted_(), + mtx_(), + cv_(), + protocol_(transport_protocol) { + /* Create the (d)TLS state */ + const SSL_METHOD *meth = TLS_client_method(); + ctx_ = SSL_CTX_new(meth); + + int result = + SSL_CTX_set_ciphersuites(ctx_, + "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" + "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { + throw errors::RuntimeException( + "Unable to set cipher list on TLS subsystem. Aborting."); + } + + SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); + SSL_CTX_set_ssl_version(ctx_, meth); + + result = SSL_CTX_add_custom_ext( + ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS, + P2PSecureConsumerSocket::addHicnKeyIdCb, + P2PSecureConsumerSocket::freeHicnKeyIdCb, NULL, + P2PSecureConsumerSocket::parseHicnKeyIdCb, this); + + ssl_ = SSL_new(ctx_); + + bio_meth_ = BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket"); + BIO_meth_set_read(bio_meth_, transport::interface::readOld); + BIO_meth_set_write(bio_meth_, transport::interface::writeOld); + BIO_meth_set_ctrl(bio_meth_, transport::interface::ctrl); + BIO *bio = BIO_new(bio_meth_); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); + + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + + std::default_random_engine generator; + std::uniform_int_distribution distribution( + 1, std::numeric_limits::max()); + random_suffix_ = 0; + + this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + this); +}; + +P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { + BIO_meth_free(bio_meth_); + SSL_shutdown(ssl_); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); + int result = SSL_connect(this->ssl_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + } + std::shared_ptr prefix_name = std::make_shared( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr prefix = + std::make_shared(*prefix_name, secure_prefix_.len); + TLSConsumerSocket tls_consumer(this->protocol_, this->ssl_); + + ConsumerTimerCallback *stats_summary_callback = nullptr; + this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_callback); + + uint32_t lifetime; + this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + read_callback_decrypted_); + tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + *stats_summary_callback); + tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, + this->timer_interval_milliseconds_); + tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + tls_consumer.connect(); + + if (payload_ != NULL) + return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer.consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; + this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); + int result = SSL_connect(this->ssl_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + TRANSPORT_LOGD("Handshake performed!"); + } + + std::shared_ptr prefix_name = std::make_shared( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr prefix = + std::make_shared(*prefix_name, secure_prefix_.len); + tls_consumer_ = + std::make_shared(this->protocol_, this->ssl_); + + ConsumerTimerCallback *stats_summary_callback = nullptr; + this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_callback); + + uint32_t lifetime; + this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + read_callback_decrypted_); + tls_consumer_->setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + *stats_summary_callback); + tls_consumer_->setSocketOption(GeneralTransportOptions::STATS_INTERVAL, + this->timer_interval_milliseconds_); + tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + tls_consumer_->connect(); + + if (payload_ != NULL) + return tls_consumer_->asyncConsume((prefix->mapName(name)), + std::move(payload_)); + else + return tls_consumer_->asyncConsume((prefix->mapName(name))); +} + +void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + producer_namespace_ = producer_namespace; +} + +int P2PSecureConsumerSocket::setSocketOption( + int socket_option_key, ConsumerSocket::ReadCallback *socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerSocket::ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_decrypted_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +void P2PSecureConsumerSocket::getReadBuffer(uint8_t **application_buffer, + size_t *max_length){}; + +void P2PSecureConsumerSocket::readDataAvailable(size_t length) noexcept {}; + +size_t P2PSecureConsumerSocket::maxBufferSize() const { + return SSL3_RT_MAX_PLAIN_LENGTH; +} + +void P2PSecureConsumerSocket::readBufferAvailable( + std::unique_ptr &&buffer) noexcept { + std::unique_lock lck(this->mtx_); + if (head_) { + head_->prependChain(std::move(buffer)); + } else { + head_ = std::move(buffer); + } + + something_to_read_ = true; + cv_.notify_one(); +} + +void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {}; + +void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { + std::unique_lock lck(this->mtx_); + content_downloaded_ = true; + something_to_read_ = true; + cv_.notify_one(); +} + +bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } + +} // namespace interface + +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h new file mode 100644 index 000000000..ff867f07b --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace transport { + +namespace interface { + +class P2PSecureConsumerSocket : public ConsumerSocket, + public ConsumerSocket::ReadCallback { + /* Return the number of read bytes in readbytes */ + friend int read(BIO *b, char *buf, size_t size, size_t *readbytes); + + /* Return the number of read bytes in the return param */ + friend int readOld(BIO *h, char *buf, int size); + + /* Return the number of written bytes in written */ + friend int write(BIO *b, const char *buf, size_t size, size_t *written); + + /* Return the number of written bytes in the return param */ + friend int writeOld(BIO *h, const char *buf, int num); + + friend long ctrl(BIO *b, int cmd, long num, void *ptr); + + public: + explicit P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol); + + ~P2PSecureConsumerSocket(); + + int consume(const Name &name) override; + + int asyncConsume(const Name &name) override; + + void registerPrefix(const Prefix &producer_namespace); + + int setSocketOption( + int socket_option_key, + ConsumerSocket::ReadCallback *socket_option_value) override; + + using ConsumerSocket::getSocketOption; + using ConsumerSocket::setSocketOption; + + protected: + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ConsumerInterestCallback on_interest_input_decrypted_; + ConsumerInterestCallback on_interest_process_decrypted_; + + private: + Name name_; + std::shared_ptr tls_consumer_; + + /* SSL handle */ + SSL *ssl_; + SSL_CTX *ctx_; + BIO_METHOD *bio_meth_; + + /* Chain of MemBuf to be used as a temporary buffer to pass descypted data + * from the underlying layer to the application */ + utils::ObjectPool buf_pool_; + std::unique_ptr decrypted_content_; + + /* Chain of MemBuf holding the payload to be written into interest or data */ + std::unique_ptr payload_; + + /* Chain of MemBuf holding the data retrieved from the underlying layer */ + std::unique_ptr head_; + + bool something_to_read_; + + bool content_downloaded_; + + double old_max_win_; + + double old_current_win_; + + uint32_t random_suffix_; + + ip_prefix_t secure_prefix_; + + Prefix producer_namespace_; + + ConsumerSocket::ReadCallback *read_callback_decrypted_; + + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + int protocol_; + + void setInterestPayload(ConsumerSocket &c, const core::Interest &interest); + void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, + const std::error_code &ec); + + static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context, + const unsigned char **out, size_t *outlen, X509 *x, + size_t chainidx, int *al, void *add_arg); + + static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *out, + void *add_arg); + + static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *in, + size_t inlen, X509 *x, size_t chainidx, int *al, + void *add_arg); + + virtual void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override; + + virtual void readDataAvailable(size_t length) noexcept override; + + virtual size_t maxBufferSize() const override; + + virtual void readBufferAvailable( + std::unique_ptr &&buffer) noexcept override; + + virtual void readError(const std::error_code ec) noexcept override; + + virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; + + int download_content(const Name &name); +}; + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc new file mode 100644 index 000000000..8850bde8a --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc @@ -0,0 +1,380 @@ +#include +#include +#include +#include + +#include +#include +#include + +namespace transport { + +namespace interface { + +/* Workaround to prevent content with expiry time equal to 0 to be lost when + * pushed in the forwarder */ +#define HICN_HANDSHAKE_CONTENT_EXPIRY_TIME 100; + +P2PSecureProducerSocket::P2PSecureProducerSocket() + : ProducerSocket(), + mtx_(), + cv_(), + map_secure_producers(), + map_secure_rtc_producers(), + list_secure_producers() {} + +P2PSecureProducerSocket::P2PSecureProducerSocket( + bool rtc, const std::shared_ptr &identity) + : ProducerSocket(), + rtc_(rtc), + mtx_(), + cv_(), + map_secure_producers(), + map_secure_rtc_producers(), + list_secure_producers() { + /* + * Setup SSL context (identity and parameter to use TLS 1.3) + */ + der_cert_ = parcKeyStore_GetDEREncodedCertificate( + (identity->getSigner()->getKeyStore())); + der_prk_ = parcKeyStore_GetDEREncodedPrivateKey( + (identity->getSigner()->getKeyStore())); + + int cert_size = parcBuffer_Limit(der_cert_); + int prk_size = parcBuffer_Limit(der_prk_); + const uint8_t *cert = + reinterpret_cast(parcBuffer_Overlay(der_cert_, cert_size)); + const uint8_t *prk = + reinterpret_cast(parcBuffer_Overlay(der_prk_, prk_size)); + cert_509_ = d2i_X509(NULL, &cert, cert_size); + pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size); + + /* + * Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. + */ + ProducerSocket::setSocketOption( + ProducerCallbacksOptions::INTEREST_INPUT, + (ProducerInterestCallback)std::bind( + &P2PSecureProducerSocket::onInterestCallback, this, + std::placeholders::_1, std::placeholders::_2)); +} + +P2PSecureProducerSocket::~P2PSecureProducerSocket() { + if (der_cert_) parcBuffer_Release(&der_cert_); + if (der_prk_) parcBuffer_Release(&der_prk_); +} + +void P2PSecureProducerSocket::onInterestCallback(ProducerSocket &p, + Interest &interest) { + std::unique_lock lck(mtx_); + + TRANSPORT_LOGD("Start handshake at %s", interest.getName().toString().c_str()); + if (!rtc_) { + auto it = map_secure_producers.find(interest.getName()); + if (it != map_secure_producers.end()) return; + TLSProducerSocket *tls_producer = + new TLSProducerSocket(this, interest.getName()); + tls_producer->on_content_produced_application_ = + this->on_content_produced_application_; + tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + tls_producer->setSocketOption(SIGNER, this->signer_); + tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + tls_producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + map_secure_producers.insert( + {interest.getName(), std::unique_ptr(tls_producer)}); + tls_producer->onInterest(*tls_producer, interest); + tls_producer->async_accept(); + } else { + auto it = map_secure_rtc_producers.find(interest.getName()); + if (it != map_secure_rtc_producers.end()) return; + TLSRTCProducerSocket *tls_producer = + new TLSRTCProducerSocket(this, interest.getName()); + tls_producer->on_content_produced_application_ = + this->on_content_produced_application_; + tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME, + this->content_object_expiry_time_); + tls_producer->setSocketOption(SIGNER, this->signer_); + tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); + tls_producer->setSocketOption(DATA_PACKET_SIZE, + (uint32_t)(this->data_packet_size_)); + tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + map_secure_rtc_producers.insert( + {interest.getName(), + std::unique_ptr(tls_producer)}); + tls_producer->onInterest(*tls_producer, interest); + tls_producer->async_accept(); + } +} + +void P2PSecureProducerSocket::produce(const uint8_t *buffer, + size_t buffer_size) { + if (!rtc_) { + throw errors::RuntimeException( + "RTC must be the transport protocol to start the production of current " + "data. Aborting."); + } + + std::unique_lock lck(mtx_); + if (list_secure_rtc_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_rtc_producers.cbegin(); + it != list_secure_rtc_producers.cend(); it++) { + (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + } +} + +uint32_t P2PSecureProducerSocket::produce( + Name content_name, std::unique_ptr &&buffer, bool is_last, + uint32_t start_offset) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock lck(mtx_); + uint32_t segments = 0; + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + segments += + (*it)->produce(content_name, buffer->clone(), is_last, start_offset); + return segments; +} + +uint32_t P2PSecureProducerSocket::produce(Name content_name, + const uint8_t *buffer, + size_t buffer_size, bool is_last, + uint32_t start_offset) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock lck(mtx_); + uint32_t segments = 0; + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + segments += (*it)->produce(content_name, buffer, buffer_size, is_last, + start_offset); + return segments; +} + +void P2PSecureProducerSocket::asyncProduce(const Name &content_name, + const uint8_t *buf, + size_t buffer_size, bool is_last, + uint32_t *start_offset) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock lck(mtx_); + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) { + (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset); + } +} + +void P2PSecureProducerSocket::asyncProduce( + Name content_name, std::unique_ptr &&buffer, bool is_last, + uint32_t offset, uint32_t **last_segment) { + if (rtc_) { + throw errors::RuntimeException( + "RTC transport protocol is not compatible with the production of " + "current data. Aborting."); + } + + std::unique_lock lck(mtx_); + if (list_secure_producers.empty()) cv_.wait(lck); + + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) { + (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, + last_segment); + } +} + +// Socket Option Redefinition to avoid name hiding + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + if (!list_secure_producers.empty()) { + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + } + + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_decrypted_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_decrypted_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr &socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + switch (socket_option_key) { + case GeneralTransportOptions::SIGNER: { + signer_.reset(); + signer_ = socket_option_value; + + return SOCKET_OPTION_SET; + } + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int P2PSecureProducerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + if (!list_secure_producers.empty()) { + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + } + switch (socket_option_key) { + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = + socket_option_value; // HICN_HANDSHAKE_CONTENT_EXPIRY_TIME; + return SOCKET_OPTION_SET; + } + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption(int socket_option_key, + Name *socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, std::list socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_application_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, HashAlgorithm socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, utils::CryptoSuite socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +int P2PSecureProducerSocket::setSocketOption( + int socket_option_key, const std::string &socket_option_value) { + if (!list_secure_producers.empty()) + for (auto it = list_secure_producers.cbegin(); + it != list_secure_producers.cend(); it++) + (*it)->setSocketOption(socket_option_key, socket_option_value); + + return ProducerSocket::setSocketOption(socket_option_key, + socket_option_value); +} + +} // namespace interface + +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h new file mode 100644 index 000000000..ba3fa0189 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace transport { + +namespace interface { + +class P2PSecureProducerSocket : public ProducerSocket { + friend class TLSProducerSocket; + friend class TLSRTCProducerSocket; + + public: + explicit P2PSecureProducerSocket(); + explicit P2PSecureProducerSocket( + bool rtc, const std::shared_ptr &identity); + ~P2PSecureProducerSocket(); + + void produce(const uint8_t *buffer, size_t buffer_size) override; + + uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, + bool is_last = true, uint32_t start_offset = 0) override; + + uint32_t produce(Name content_name, std::unique_ptr &&buffer, + bool is_last = true, uint32_t start_offset = 0) override; + + void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, + bool is_last = true, + uint32_t *start_offset = nullptr) override; + + void asyncProduce(Name content_name, std::unique_ptr &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment = nullptr) override; + + int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value) override; + + int setSocketOption( + int socket_option_key, + const std::shared_ptr &socket_option_value) override; + + int setSocketOption(int socket_option_key, + uint32_t socket_option_value) override; + + int setSocketOption(int socket_option_key, bool socket_option_value) override; + + int setSocketOption(int socket_option_key, + Name *socket_option_value) override; + + int setSocketOption(int socket_option_key, + std::list socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ProducerContentObjectCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) override; + + int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) override; + + int setSocketOption(int socket_option_key, + const std::string &socket_option_value) override; + + using ProducerSocket::getSocketOption; + using ProducerSocket::onInterest; + + protected: + bool rtc_; + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ProducerInterestCallback on_interest_input_decrypted_; + ProducerInterestCallback on_interest_process_decrypted_; + ProducerContentCallback on_content_produced_application_; + + private: + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + PARCBuffer *der_cert_; + PARCBuffer *der_prk_; + X509 *cert_509_; + EVP_PKEY *pkey_rsa_; + std::unordered_map, + core::hash, core::compare2> + map_secure_producers; + std::unordered_map, + core::hash, core::compare2> + map_secure_rtc_producers; + std::list> list_secure_producers; + std::list> list_secure_rtc_producers; + + void onInterestCallback(ProducerSocket &p, Interest &interest); +}; + +} // namespace interface + +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index bb93e0535..fefa419a9 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -171,6 +171,7 @@ void RTCProducerSocket::produce(std::unique_ptr &&buffer) { on_content_object_in_output_buffer_(*this, *content_object); } + TRANSPORT_LOGD("Send content %u (produce)", content_object->getName().getSuffix()); portal_->sendContentObject(*content_object); if (on_content_object_output_) { @@ -222,6 +223,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { on_content_object_output_(*this, *content_object); } + TRANSPORT_LOGD("Send content %u (onInterest)", content_object->getName().getSuffix()); portal_->sendContentObject(*content_object); return; } else { @@ -357,6 +359,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) { on_content_object_output_(*this, nack); } + TRANSPORT_LOGD("Send nack %u", sequence); portal_->sendContentObject(nack); } diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 37ba88d8a..d7917a8c0 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -26,7 +26,7 @@ namespace transport { namespace interface { -class RTCProducerSocket : public ProducerSocket { +class RTCProducerSocket : virtual public ProducerSocket { public: RTCProducerSocket(asio::io_service &io_service); @@ -36,7 +36,7 @@ class RTCProducerSocket : public ProducerSocket { void registerPrefix(const Prefix &producer_namespace) override; - void produce(std::unique_ptr &&buffer) override; + virtual void produce(std::unique_ptr &&buffer) override; void onInterest(Interest::Ptr &&interest) override; diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index fba972fe5..b2c054947 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -48,6 +48,7 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) rate_estimation_choice_(0), verifier_(std::make_shared()), verify_signature_(false), + key_content_(false), on_interest_output_(VOID_HANDLER), on_interest_timeout_(VOID_HANDLER), on_interest_satisfied_(VOID_HANDLER), @@ -106,9 +107,13 @@ int ConsumerSocket::asyncConsume(const Name &name) { return CONSUMER_RUNNING; } +bool ConsumerSocket::verifyKeyPackets() { + return transport_protocol_->verifyKeyPackets(); +} + void ConsumerSocket::stop() { - if (transport_protocol_->isRunning()) { - transport_protocol_->stop(); + if (transport_protocol_) { + if (transport_protocol_->isRunning()) transport_protocol_->stop(); } } @@ -312,6 +317,11 @@ int ConsumerSocket::setSocketOption(int socket_option_key, result = SOCKET_OPTION_SET; break; + case GeneralTransportOptions::KEY_CONTENT: + key_content_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: return result; } @@ -461,6 +471,7 @@ int ConsumerSocket::setSocketOption( if (!transport_protocol_->isRunning()) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: + verifier_.reset(); verifier_ = socket_option_value; result = SOCKET_OPTION_SET; break; @@ -479,10 +490,7 @@ int ConsumerSocket::setSocketOption(int socket_option_key, switch (socket_option_key) { case GeneralTransportOptions::CERTIFICATE: key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - result = SOCKET_OPTION_SET; - } + if (key_id_ != nullptr) result = SOCKET_OPTION_SET; break; case DataLinkOptions::OUTPUT_INTERFACE: @@ -614,6 +622,10 @@ int ConsumerSocket::getSocketOption(int socket_option_key, socket_option_value = verify_signature_; break; + case GeneralTransportOptions::KEY_CONTENT: + socket_option_value = key_content_; + break; + default: return SOCKET_OPTION_NOT_GET; } diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index acce28c1d..48a594adf 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -132,6 +132,8 @@ class ConsumerSocket : public BaseSocket { * the application when the transfer is done. */ virtual void readSuccess(std::size_t total_size) noexcept = 0; + + virtual void afterRead() {} }; /** @@ -181,8 +183,16 @@ class ConsumerSocket : public BaseSocket { * content retrieval succeeded. This information can be obtained from the * error code in CONTENT_RETRIEVED callback. */ - int consume(const Name &name); - int asyncConsume(const Name &name); + virtual int consume(const Name &name); + virtual int asyncConsume(const Name &name); + + /** + * Verify the packets containing a key after the origin of the key has been + * validated by the client. + * + * @return true if all packets are valid, false otherwise + */ + virtual bool verifyKeyPackets(); /** * Stops the consumer socket. If several downloads are queued (using @@ -330,16 +340,14 @@ class ConsumerSocket : public BaseSocket { return result; } - private: + // context inner state variables asio::io_service internal_io_service_; asio::io_service &io_service_; std::shared_ptr portal_; + utils::EventThread async_downloader_; - // No need to protect from multiple accesses in the async consumer - // The parameter is accessible only with a getSocketOption and - // set from the consume Name network_name_; int interest_lifetime_; @@ -362,17 +370,22 @@ class ConsumerSocket : public BaseSocket { int rate_estimation_batching_parameter_; int rate_estimation_choice_; + bool is_async_; + // Verification parameters std::shared_ptr verifier_; PARCKeyId *key_id_; std::atomic_bool verify_signature_; + std::atomic_bool key_content_; ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; ConsumerInterestCallback on_interest_timeout_; ConsumerInterestCallback on_interest_satisfied_; + ConsumerContentObjectCallback on_content_object_input_; ConsumerContentObjectVerificationCallback on_content_object_verification_; + ConsumerContentObjectCallback on_content_object_; ConsumerManifestCallback on_manifest_; ConsumerTimerCallback stats_summary_; @@ -396,4 +409,4 @@ class ConsumerSocket : public BaseSocket { } // namespace interface -} // end namespace transport \ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index b25bacbb9..a38131271 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -34,7 +34,7 @@ typedef enum { DATA_PACKET_SIZE = 106, INTEREST_LIFETIME = 107, CONTENT_OBJECT_EXPIRY_TIME = 108, - KEY_LOCATOR = 110, + KEY_CONTENT = 110, MIN_WINDOW_SIZE = 111, MAX_WINDOW_SIZE = 112, CURRENT_WINDOW_SIZE = 113, @@ -45,11 +45,11 @@ typedef enum { APPLICATION_BUFFER = 118, HASH_ALGORITHM = 119, CRYPTO_SUITE = 120, - IDENTITY = 121, + SIGNER = 121, VERIFIER = 122, CERTIFICATE = 123, VERIFY_SIGNATURE = 124, - STATS_INTERVAL = 125 + STATS_INTERVAL = 125, } GeneralTransportOptions; typedef enum { @@ -92,7 +92,7 @@ typedef enum { CONTENT_OBJECT_SIGN = 513, CONTENT_OBJECT_READY = 510, CONTENT_OBJECT_OUTPUT = 511, - CONTENT_PRODUCED = 512 + CONTENT_PRODUCED = 512, } ProducerCallbacksOptions; typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions; @@ -110,4 +110,4 @@ typedef enum { } // namespace interface -} // end namespace transport \ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 4fef5d1e2..26a7208b6 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -14,7 +14,7 @@ */ #include -#include +#include #include @@ -35,6 +35,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), output_buffer_(default_values::producer_socket_output_buffer_size), + async_thread_(), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), hash_algorithm_(HashAlgorithm::SHA_256), @@ -96,51 +97,47 @@ void ProducerSocket::listen() { void ProducerSocket::passContentObjectToCallbacks( const std::shared_ptr &content_object) { if (content_object) { - if (on_new_segment_) { - io_service_.dispatch([this, content_object]() { + io_service_.dispatch([this, content_object]() { + if (on_new_segment_) { on_new_segment_(*this, *content_object); - }); - } + } - if (on_content_object_to_sign_) { - io_service_.dispatch([this, content_object]() { + if (on_content_object_to_sign_) { on_content_object_to_sign_(*this, *content_object); - }); - } + } - if (on_content_object_in_output_buffer_) { - io_service_.dispatch([this, content_object]() { + if (on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_(*this, *content_object); - }); - } + } + }); output_buffer_.insert(content_object); - if (on_content_object_output_) { - io_service_.dispatch([this, content_object]() { + io_service_.dispatch([this, content_object]() { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); - }); - } + } + }); portal_->sendContentObject(*content_object); } } void ProducerSocket::produce(ContentObject &content_object) { - if (on_content_object_in_output_buffer_) { - io_service_.dispatch([this, &content_object]() { + io_service_.dispatch([this, &content_object]() { + if (on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_(*this, content_object); - }); - } + } + }); output_buffer_.insert(std::static_pointer_cast( content_object.shared_from_this())); - if (on_content_object_output_) { - io_service_.dispatch([this, &content_object]() { + io_service_.dispatch([this, &content_object]() { + if (on_content_object_output_) { on_content_object_output_(*this, content_object); - }); - } + } + }); portal_->sendContentObject(content_object); } @@ -160,8 +157,8 @@ uint32_t ProducerSocket::produce(Name content_name, bool making_manifest = making_manifest_; auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( suffix_strategy_, start_offset); - std::shared_ptr identity; - getSocketOption(GeneralTransportOptions::IDENTITY, identity); + std::shared_ptr signer; + getSocketOption(GeneralTransportOptions::SIGNER, signer); auto buffer_size = buffer->length(); int bytes_segmented = 0; @@ -176,7 +173,7 @@ uint32_t ProducerSocket::produce(Name content_name, bool is_last_manifest = false; // TODO Manifest may still be used for indexing - if (making_manifest && !identity) { + if (making_manifest && !signer) { TRANSPORT_LOGD("Making manifests without setting producer identity."); } @@ -197,11 +194,11 @@ uint32_t ProducerSocket::produce(Name content_name, format = hf_format; if (making_manifest) { manifest_header_size = core::Packet::getHeaderSizeFromFormat( - identity ? hf_format_ah : hf_format, - identity ? identity->getSignatureLength() : 0); - } else if (identity) { + signer ? hf_format_ah : hf_format, + signer ? signer->getSignatureLength() : 0); + } else if (signer) { format = hf_format_ah; - signature_length = identity->getSignatureLength(); + signature_length = signer->getSignatureLength(); } header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); @@ -227,7 +224,7 @@ uint32_t ProducerSocket::produce(Name content_name, content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, content_name, suffix_strategy_, - identity ? identity->getSignatureLength() : 0)); + signer ? signer->getSignatureLength() : 0)); manifest->setLifetime(content_object_expiry_time); if (is_last) { @@ -237,6 +234,7 @@ uint32_t ProducerSocket::produce(Name content_name, } } + TRANSPORT_LOGD("--------- START PRODUCE ----------"); for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { if (making_manifest) { @@ -246,15 +244,18 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->encode(); // If identity set, sign manifest - if (identity) { - identity->getSigner().sign(*manifest); + if (signer) { + signer->sign(*manifest); } passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %u", + content_queue_.front()->getName().getSuffix()); content_queue_.pop(); } @@ -266,7 +267,7 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, content_name, suffix_strategy_, - identity ? identity->getSignatureLength() : 0)); + signer ? signer->getSignatureLength() : 0)); manifest->setLifetime(content_object_expiry_time); manifest->setFinalBlockNumber( @@ -307,10 +308,11 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->addSuffixHash(content_suffix, hash); content_queue_.push(content_object); } else { - if (identity) { - identity->getSigner().sign(*content_object); + if (signer) { + signer->sign(*content_object); } passContentObjectToCallbacks(content_object); + TRANSPORT_LOGD("Send content %u", content_object->getName().getSuffix()); } } @@ -320,24 +322,28 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - if (identity) { - identity->getSigner().sign(*manifest); + if (signer) { + signer->sign(*manifest); } passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix()); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %u", + content_queue_.front()->getName().getSuffix()); content_queue_.pop(); } } - if (on_content_produced_) { - io_service_.dispatch([this, buffer_size]() { + io_service_.dispatch([this, buffer_size]() { + if (on_content_produced_) { on_content_produced_(*this, std::make_error_code(std::errc(0)), buffer_size); - }); - } + } + }); + TRANSPORT_LOGD("--------- END PRODUCE ------------"); return suffix_strategy->getTotalCount(); } @@ -346,16 +352,42 @@ void ProducerSocket::asyncProduce(ContentObject &content_object) { auto co_ptr = std::static_pointer_cast( content_object.shared_from_this()); async_thread_.add([this, content_object = std::move(co_ptr)]() { - produce(*content_object); + ProducerSocket::produce(*content_object); }); } } void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, - size_t buffer_size) { + size_t buffer_size, bool is_last, + uint32_t *start_offset) { if (!async_thread_.stopped()) { - async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() { - produce(suffix, buffer, size, 0, false); + async_thread_.add([this, suffix, buffer = buf, size = buffer_size, is_last, + start_offset]() { + if (start_offset != NULL) { + *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last, + *start_offset); + } else { + ProducerSocket::produce(suffix, buffer, size, is_last, 0); + } + }); + } +} + +void ProducerSocket::asyncProduce(Name content_name, + std::unique_ptr &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment) { + if (!async_thread_.stopped()) { + auto a = buffer.release(); + async_thread_.add([this, content_name, a, is_last, offset, last_segment]() { + auto buf = std::unique_ptr(a); + if (last_segment != NULL) { + **last_segment = + offset + ProducerSocket::produce(content_name, std::move(buf), + is_last, offset); + } else { + ProducerSocket::produce(content_name, std::move(buf), is_last, offset); + } }); } } @@ -392,8 +424,8 @@ int ProducerSocket::setSocketOption(int socket_option_key, if (socket_option_value < default_values::max_content_object_size && socket_option_value > 0) { data_packet_size_ = socket_option_value; - break; } + break; case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: output_buffer_.setLimit(socket_option_value); @@ -632,12 +664,12 @@ int ProducerSocket::setSocketOption(int socket_option_key, int ProducerSocket::setSocketOption( int socket_option_key, - const std::shared_ptr &socket_option_value) { + const std::shared_ptr &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: { - utils::SpinLock::Acquire locked(identity_lock_); - identity_.reset(); - identity_ = socket_option_value; + case GeneralTransportOptions::SIGNER: { + utils::SpinLock::Acquire locked(signer_lock_); + signer_.reset(); + signer_ = socket_option_value; } break; default: return SOCKET_OPTION_NOT_SET; @@ -844,11 +876,11 @@ int ProducerSocket::getSocketOption(int socket_option_key, int ProducerSocket::getSocketOption( int socket_option_key, - std::shared_ptr &socket_option_value) { + std::shared_ptr &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: { - utils::SpinLock::Acquire locked(identity_lock_); - socket_option_value = identity_; + case GeneralTransportOptions::SIGNER: { + utils::SpinLock::Acquire locked(signer_lock_); + socket_option_value = signer_; } break; default: return SOCKET_OPTION_NOT_GET; diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index ff6f49723..5f360f2ce 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -18,10 +18,12 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -31,10 +33,6 @@ #define REGISTRATION_FAILURE 2 #define REGISTRATION_IN_PROGRESS 3 -namespace utils { -class Identity; -} - namespace transport { namespace interface { @@ -53,16 +51,19 @@ class ProducerSocket : public Socket, bool isRunning() override { return !io_service_.stopped(); }; - uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, - bool is_last = true, uint32_t start_offset = 0) { - return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), - is_last, start_offset); + virtual uint32_t produce(Name content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) { + return ProducerSocket::produce( + content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, + start_offset); } - uint32_t produce(Name content_name, std::unique_ptr &&buffer, - bool is_last = true, uint32_t start_offset = 0); + virtual uint32_t produce(Name content_name, + std::unique_ptr &&buffer, + bool is_last = true, uint32_t start_offset = 0); - void produce(ContentObject &content_object); + virtual void produce(ContentObject &content_object); virtual void produce(const uint8_t *buffer, size_t buffer_size) { produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); @@ -74,11 +75,18 @@ class ProducerSocket : public Socket, throw errors::NotImplementedException(); } - void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size); + virtual void asyncProduce(const Name &suffix, const uint8_t *buf, + size_t buffer_size, bool is_last = true, + uint32_t *start_offset = nullptr); void asyncProduce(const Name &suffix); - void asyncProduce(ContentObject &content_object); + virtual void asyncProduce(Name content_name, + std::unique_ptr &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment = nullptr); + + virtual void asyncProduce(ContentObject &content_object); virtual void registerPrefix(const Prefix &producer_namespace); @@ -124,7 +132,7 @@ class ProducerSocket : public Socket, virtual int setSocketOption( int socket_option_key, - const std::shared_ptr &socket_option_value); + const std::shared_ptr &socket_option_value); virtual int setSocketOption(int socket_option_key, const std::string &socket_option_value); @@ -158,15 +166,77 @@ class ProducerSocket : public Socket, virtual int getSocketOption( int socket_option_key, - std::shared_ptr &socket_option_value); + std::shared_ptr &socket_option_value); virtual int getSocketOption(int socket_option_key, std::string &socket_option_value); - protected: + // If the thread calling lambda_func is not the same of io_service, this + // function reschedule the function on it + template + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, + &result, &done, &func]() { + std::unique_lock lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } + + template + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != this->listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, + &result, &done, &func]() { + std::unique_lock lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } // Threads + protected: std::thread listening_thread_; - asio::io_service internal_io_service_; asio::io_service &io_service_; std::shared_ptr portal_; @@ -191,8 +261,8 @@ class ProducerSocket : public Socket, std::atomic hash_algorithm_; std::atomic crypto_suite_; - utils::SpinLock identity_lock_; - std::shared_ptr identity_; + utils::SpinLock signer_lock_; + std::shared_ptr signer_; core::NextSegmentCalculationStrategy suffix_strategy_; // While manifests are being built, contents are stored in a queue @@ -213,38 +283,6 @@ class ProducerSocket : public Socket, ProducerContentCallback on_content_produced_; - // If the thread calling lambda_func is not the same of io_service, this - // function reschedule the function on it - template - int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func) { - // To enforce type check - std::function func = lambda_func; - int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != listening_thread_.get_id()) { - std::mutex mtx; - /* Condition variable for the wait */ - std::condition_variable cv; - bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { - std::unique_lock lck(mtx); - done = true; - result = func(socket_option_key, socket_option_value); - cv.notify_all(); - }); - std::unique_lock lck(mtx); - if (!done) { - cv.wait(lck); - } - } else { - result = func(socket_option_key, socket_option_value); - } - - return result; - } - private: void listen(); diff --git a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc new file mode 100644 index 000000000..27c1e54bd --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc @@ -0,0 +1,178 @@ +#include +#include +#include + +#include +#include +#include + +namespace transport { + +namespace interface { +int TLSRTCProducerSocket::read(BIO *b, char *buf, size_t size, + size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSRTCProducerSocket::readOld(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) { + TLSRTCProducerSocket *socket; + socket = (TLSRTCProducerSocket *)BIO_get_data(b); + + std::unique_lock lck(socket->mtx_); + if (!socket->something_to_read_) { + (socket->cv_).wait(lck); + } + + utils::MemBuf *membuf = socket->packet_->next(); + int size_to_read; + + if ((int)membuf->length() > size) { + size_to_read = size; + } else { + size_to_read = membuf->length(); + socket->something_to_read_ = false; + } + + std::memcpy(buf, membuf->data(), size_to_read); + membuf->trimStart(size_to_read); + + return size_to_read; +} + +int TLSRTCProducerSocket::write(BIO *b, const char *buf, size_t size, + size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSRTCProducerSocket::writeOld(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { + TLSRTCProducerSocket *socket; + socket = (TLSRTCProducerSocket *)BIO_get_data(b); + + if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && + socket->first_) { + socket->tls_chunks_--; + bool making_manifest = socket->parent_->making_manifest_; + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + false); + socket->parent_->ProducerSocket::produce( + socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0); + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + making_manifest); + socket->first_ = false; + + } else { + std::unique_ptr mbuf = + utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); + auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + socket->to_call_oncontentproduced_--; + auto mbuf = std::unique_ptr(a); + socket->RTCProducerSocket::produce(std::move(mbuf)); + ProducerContentCallback on_content_produced_application; + socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, + on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && + on_content_produced_application) { + on_content_produced_application(*socket, std::error_code(), 0); + } + }); + } + + return num; +} + +TLSRTCProducerSocket::TLSRTCProducerSocket(P2PSecureProducerSocket *parent, + const Name &handshake_name) + : RTCProducerSocket(), TLSProducerSocket(parent, handshake_name) { + BIO_METHOD *bio_meth = + BIO_meth_new(BIO_TYPE_ACCEPT, "secure rtc producer socket"); + BIO_meth_set_read(bio_meth, TLSRTCProducerSocket::readOld); + BIO_meth_set_write(bio_meth, TLSRTCProducerSocket::writeOld); + BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl); + BIO *bio = BIO_new(bio_meth); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); +} + +void TLSRTCProducerSocket::accept() { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + tls_chunks_ = 1; + int result = SSL_accept(ssl_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + } + + TRANSPORT_LOGD("Handshake performed!"); + parent_->list_secure_rtc_producers.push_front( + std::move(parent_->map_secure_rtc_producers[handshake_name_])); + parent_->map_secure_rtc_producers.erase(handshake_name_); + + ProducerInterestCallback on_interest_process_decrypted; + getSocketOption(ProducerCallbacksOptions::CACHE_MISS, + on_interest_process_decrypted); + + if (on_interest_process_decrypted) { + Interest inter(std::move(packet_)); + on_interest_process_decrypted(*this, inter); + } + + parent_->cv_.notify_one(); +} + +int TLSRTCProducerSocket::async_accept() { + if (!async_thread_.stopped()) { + async_thread_.add([this]() { this->TLSRTCProducerSocket::accept(); }); + } else { + throw errors::RuntimeException( + "Async thread not running, impossible to perform handshake"); + } + + return 1; +} + +void TLSRTCProducerSocket::produce(std::unique_ptr &&buffer) { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + throw errors::RuntimeException( + "New handshake on the same P2P secure producer socket not supported"); + } + + size_t buf_size = buffer->length(); + tls_chunks_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); + to_call_oncontentproduced_ = tls_chunks_; + + SSL_write(ssl_, buffer->data(), buf_size); + BIO *wbio = SSL_get_wbio(ssl_); + int i = BIO_flush(wbio); + (void)i; // To shut up gcc 5 +} + +} // namespace interface + +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h new file mode 100644 index 000000000..16125f889 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace transport { + +namespace interface { + +class P2PSecureProducerSocket; + +class TLSRTCProducerSocket : public RTCProducerSocket, + public TLSProducerSocket { + friend class P2PSecureProducerSocket; + + public: + explicit TLSRTCProducerSocket(P2PSecureProducerSocket *parent, + const Name &handshake_name); + + ~TLSRTCProducerSocket() = default; + + void produce(std::unique_ptr &&buffer) override; + + void accept() override; + + int async_accept() override; + + using TLSProducerSocket::produce; + using TLSProducerSocket::onInterest; + + protected: + static int read(BIO *b, char *buf, size_t size, size_t *readbytes); + + static int readOld(BIO *h, char *buf, int size); + + static int write(BIO *b, const char *buf, size_t size, size_t *written); + + static int writeOld(BIO *h, const char *buf, int num); +}; + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc new file mode 100644 index 000000000..58b3c1d7d --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc @@ -0,0 +1,364 @@ +#include +#include +#include +#include + +#include + +namespace transport { + +namespace interface { + +void TLSConsumerSocket::setInterestPayload(ConsumerSocket &c, + const core::Interest &interest) { + Interest &int2 = const_cast(interest); + random_suffix_ = int2.getName().getSuffix(); + + if (payload_ != NULL) int2.appendPayload(std::move(payload_)); +} + +/* Return the number of read bytes in the return param */ +int readOldTLS(BIO *b, char *buf, int size) { + if (size < 0) return size; + + TLSConsumerSocket *socket; + socket = (TLSConsumerSocket *)BIO_get_data(b); + + std::unique_lock lck(socket->mtx_); + + if (!socket->something_to_read_) { + if (!socket->transport_protocol_->isRunning()) { + socket->network_name_.setSuffix(socket->random_suffix_); + socket->ConsumerSocket::asyncConsume(socket->network_name_); + } + if (!socket->something_to_read_) socket->cv_.wait(lck); + } + + size_t size_to_read, read; + size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) + chain_size = socket->head_->computeChainDataLength(); + + if (chain_size > (size_t)size) { + read = size_to_read = (size_t)size; + } else { + read = size_to_read = chain_size; + socket->something_to_read_ = false; + } + + while (size_to_read) { + if (socket->head_->length() < size_to_read) { + std::memcpy(buf, socket->head_->data(), socket->head_->length()); + size_to_read -= socket->head_->length(); + buf += socket->head_->length(); + socket->head_ = socket->head_->pop(); + } else { + std::memcpy(buf, socket->head_->data(), size_to_read); + socket->head_->trimStart(size_to_read); + size_to_read = 0; + } + } + + return read; +} + +/* Return the number of read bytes in readbytes */ +int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = transport::interface::readOldTLS(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +/* Return the number of written bytes in the return param */ +int writeOldTLS(BIO *b, const char *buf, int num) { + TLSConsumerSocket *socket; + socket = (TLSConsumerSocket *)BIO_get_data(b); + + socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &TLSConsumerSocket::setInterestPayload, socket, std::placeholders::_1, + std::placeholders::_2)); + + return num; +} + +/* Return the number of written bytes in written */ +int writeTLS(BIO *b, const char *buf, size_t size, size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = transport::interface::writeOldTLS(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +long ctrlTLS(BIO *b, int cmd, long num, void *ptr) { return 1; } + +TLSConsumerSocket::TLSConsumerSocket(int protocol, SSL *ssl) + : ConsumerSocket(protocol), + name_(), + buf_pool_(), + decrypted_content_(), + payload_(), + head_(), + something_to_read_(false), + content_downloaded_(false), + random_suffix_(), + secure_prefix_(), + producer_namespace_(), + read_callback_decrypted_(), + mtx_(), + cv_(), + async_downloader_tls_() { + /* Create the (d)TLS state */ + const SSL_METHOD *meth = TLS_client_method(); + ctx_ = SSL_CTX_new(meth); + + int result = + SSL_CTX_set_ciphersuites(ctx_, + "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" + "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { + throw errors::RuntimeException( + "Unable to set cipher list on TLS subsystem. Aborting."); + } + + SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); + SSL_CTX_set_ssl_version(ctx_, meth); + + ssl_ = ssl; + + BIO_METHOD *bio_meth = + BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket"); + BIO_meth_set_read(bio_meth, transport::interface::readOldTLS); + BIO_meth_set_write(bio_meth, transport::interface::writeOldTLS); + BIO_meth_set_ctrl(bio_meth, transport::interface::ctrlTLS); + BIO *bio = BIO_new(bio_meth); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); + + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + + std::default_random_engine generator; + std::uniform_int_distribution distribution( + 1, std::numeric_limits::max()); + random_suffix_ = 0; + + this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + this); +}; + +int TLSConsumerSocket::consume(const Name &name, + std::unique_ptr &&buffer) { + this->payload_ = std::move(buffer); + + this->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1, + std::placeholders::_2)); + + return consume(name); +} + +int TLSConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + throw errors::RuntimeException("Handshake not performed"); + } + + return download_content(name); +} + +int TLSConsumerSocket::download_content(const Name &name) { + network_name_ = name; + network_name_.setSuffix(0); + something_to_read_ = false; + content_downloaded_ = false; + + decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH); + uint8_t *buf = decrypted_content_->writableData(); + size_t size = 0; + int result = -1; + + while (!content_downloaded_ || something_to_read_) { + if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) { + decrypted_content_->appendChain( + utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH)); + // decrypted_content_->computeChainDataLength(); + buf = decrypted_content_->prev()->writableData(); + } else { + buf = decrypted_content_->writableTail(); + } + + result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH); + + /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of + * the data has been fully downloaded */ + + /* ASSERT((result < SSL3_RT_MAX_PLAIN_LENGTH && content_downloaded_) || */ + /* result == SSL3_RT_MAX_PLAIN_LENGTH); */ + + if (result >= 0) { + size += result; + decrypted_content_->prepend(result); + } else + throw errors::RuntimeException("Unable to download content"); + + if (size >= read_callback_decrypted_->maxBufferSize()) { + if (read_callback_decrypted_->isBufferMovable()) { + // No need to perform an additional copy. The whole buffer will be + // tranferred to the application. + + read_callback_decrypted_->readBufferAvailable( + std::move(decrypted_content_)); + decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = decrypted_content_->length(); + + while (decrypted_content_->length()) { + buffer = nullptr; + length = 0; + read_callback_decrypted_->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(decrypted_content_->length(), length); + std::memcpy(buffer, decrypted_content_->data(), to_copy); + decrypted_content_->trimStart(to_copy); + } + + read_callback_decrypted_->readDataAvailable(total_length); + decrypted_content_->clear(); + } + } + } + + read_callback_decrypted_->readSuccess(size); + + return CONSUMER_FINISHED; +} + +int TLSConsumerSocket::asyncConsume(const Name &name, + std::unique_ptr &&buffer) { + this->payload_ = std::move(buffer); + + this->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1, + std::placeholders::_2)); + + return asyncConsume(name); +} + +int TLSConsumerSocket::asyncConsume(const Name &name) { + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + throw errors::RuntimeException("Handshake not performed"); + } + + if (!async_downloader_tls_.stopped()) { + async_downloader_tls_.add([this, name]() { + is_async_ = true; + download_content(name); + }); + } + + return CONSUMER_RUNNING; +} + +void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + producer_namespace_ = producer_namespace; +} + +int TLSConsumerSocket::setSocketOption( + int socket_option_key, ConsumerSocket::ReadCallback *socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerSocket::ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_decrypted_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +void TLSConsumerSocket::getReadBuffer(uint8_t **application_buffer, + size_t *max_length) {} + +void TLSConsumerSocket::readDataAvailable(size_t length) noexcept {} + +size_t TLSConsumerSocket::maxBufferSize() const { + return SSL3_RT_MAX_PLAIN_LENGTH; +} + +void TLSConsumerSocket::readBufferAvailable( + std::unique_ptr &&buffer) noexcept { + std::unique_lock lck(this->mtx_); + if (head_) { + head_->prependChain(std::move(buffer)); + } else { + head_ = std::move(buffer); + } + + something_to_read_ = true; + cv_.notify_one(); +} + +void TLSConsumerSocket::readError(const std::error_code ec) noexcept {} + +void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { + std::unique_lock lck(this->mtx_); + content_downloaded_ = true; + something_to_read_ = true; + cv_.notify_one(); +} + +bool TLSConsumerSocket::isBufferMovable() noexcept { return true; } + +} // namespace interface + +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h new file mode 100644 index 000000000..05f7fe6a5 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace transport { + +namespace interface { + +class TLSConsumerSocket : public ConsumerSocket, + public ConsumerSocket::ReadCallback { + /* Return the number of read bytes in readbytes */ + friend int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes); + + /* Return the number of read bytes in the return param */ + friend int readOldTLS(BIO *h, char *buf, int size); + + /* Return the number of written bytes in written */ + friend int writeTLS(BIO *b, const char *buf, size_t size, size_t *written); + + /* Return the number of written bytes in the return param */ + friend int writeOldTLS(BIO *h, const char *buf, int num); + + friend long ctrlTLS(BIO *b, int cmd, long num, void *ptr); + + public: + explicit TLSConsumerSocket(int protocol, SSL *ssl_); + + ~TLSConsumerSocket() = default; + + int consume(const Name &name, std::unique_ptr &&buffer); + int consume(const Name &name) override; + + int asyncConsume(const Name &name, std::unique_ptr &&buffer); + int asyncConsume(const Name &name) override; + + void registerPrefix(const Prefix &producer_namespace); + + int setSocketOption( + int socket_option_key, + ConsumerSocket::ReadCallback *socket_option_value) override; + + using ConsumerSocket::getSocketOption; + using ConsumerSocket::setSocketOption; + + protected: + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ConsumerInterestCallback on_interest_input_decrypted_; + ConsumerInterestCallback on_interest_process_decrypted_; + + private: + Name name_; + + /* SSL handle */ + SSL *ssl_; + SSL_CTX *ctx_; + + /* Chain of MemBuf to be used as a temporary buffer to pass descypted data + * from the underlying layer to the application */ + utils::ObjectPool buf_pool_; + std::unique_ptr decrypted_content_; + + /* Chain of MemBuf holding the payload to be written into interest or data */ + std::unique_ptr payload_; + + /* Chain of MemBuf holding the data retrieved from the underlying layer */ + std::unique_ptr head_; + + bool something_to_read_; + + bool content_downloaded_; + + double old_max_win_; + + double old_current_win_; + + uint32_t random_suffix_; + + ip_address_t secure_prefix_; + + Prefix producer_namespace_; + + ConsumerSocket::ReadCallback *read_callback_decrypted_; + + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + utils::EventThread async_downloader_tls_; + + void setInterestPayload(ConsumerSocket &c, const core::Interest &interest); + void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, + const std::error_code &ec); + + virtual void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override; + + virtual void readDataAvailable(size_t length) noexcept override; + + virtual size_t maxBufferSize() const override; + + virtual void readBufferAvailable( + std::unique_ptr &&buffer) noexcept override; + + virtual void readError(const std::error_code ec) noexcept override; + + virtual void readSuccess(std::size_t total_size) noexcept override; + virtual bool isBufferMovable() noexcept override; + + int download_content(const Name &name); +}; + +} // namespace interface + +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc new file mode 100644 index 000000000..ad85ec6ea --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc @@ -0,0 +1,587 @@ +#include +#include +#include + +#include +#include +#include + +namespace transport { + +namespace interface { + +/* Return the number of read bytes in readbytes */ +int TLSProducerSocket::read(BIO *b, char *buf, size_t size, size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSProducerSocket::readOld(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +/* Return the number of read bytes in the return param */ +int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { + TLSProducerSocket *socket; + socket = (TLSProducerSocket *)BIO_get_data(b); + + /* take a lock on the mutex. It will be unlocked by */ + std::unique_lock lck(socket->mtx_); + if (!socket->something_to_read_) { + (socket->cv_).wait(lck); + } + + /* Either there already is something to read, or the thread has been waken up + */ + /* must return the payload in the interest */ + + utils::MemBuf *membuf = socket->packet_->next(); + int size_to_read; + if ((int)membuf->length() > size) { + size_to_read = size; + } else { + size_to_read = membuf->length(); + socket->something_to_read_ = false; + } + + std::memcpy(buf, membuf->data(), size_to_read); + membuf->trimStart(size_to_read); + + return size_to_read; +} + +/* Return the number of written bytes in written */ +int TLSProducerSocket::write(BIO *b, const char *buf, size_t size, + size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = TLSProducerSocket::writeOld(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +/* Return the number of written bytes in the return param */ +int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { + TLSProducerSocket *socket; + socket = (TLSProducerSocket *)BIO_get_data(b); + + if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) && + socket->first_) { + //! socket->tls_chunks_ corresponds to is_last + socket->tls_chunks_--; + bool making_manifest = socket->parent_->making_manifest_; + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + false); + socket->parent_->ProducerSocket::produce( + socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, + socket->last_segment_); + socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + making_manifest); + socket->first_ = false; + } else { + socket->still_writing_ = true; + + std::unique_ptr mbuf = + utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0); + auto a = mbuf.release(); + socket->async_thread_.add([socket = socket, a]() { + socket->tls_chunks_--; + socket->to_call_oncontentproduced_--; + auto mbuf = std::unique_ptr(a); + socket->last_segment_ += socket->ProducerSocket::produce( + socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, + socket->last_segment_); + ProducerContentCallback on_content_produced_application; + socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, + on_content_produced_application); + if (socket->to_call_oncontentproduced_ == 0 && + on_content_produced_application) { + on_content_produced_application(*socket, std::error_code(), 0); + } + }); + } + + return num; +} + +TLSProducerSocket::TLSProducerSocket(P2PSecureProducerSocket *parent, + const Name &handshake_name) + : ProducerSocket(), + on_content_produced_application_(), + mtx_(), + cv_(), + something_to_read_(), + name_(), + last_segment_(0), + parent_(parent), + first_(true), + handshake_name_(handshake_name), + tls_chunks_(0), + to_call_oncontentproduced_(0), + still_writing_(false), + encryption_thread_() { + const SSL_METHOD *meth = TLS_server_method(); + ctx_ = SSL_CTX_new(meth); + + /* + * Setup SSL context (identity and parameter to use TLS 1.3) + */ + SSL_CTX_use_certificate(ctx_, parent->cert_509_); + SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_); + + int result = + SSL_CTX_set_ciphersuites(ctx_, + "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" + "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { + throw errors::RuntimeException( + "Unable to set cipher list on TLS subsystem. Aborting."); + } + + // We force it to be TLS 1.3 + SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); + SSL_CTX_set_num_tickets(ctx_, 0); + + result = SSL_CTX_add_custom_ext( + ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS, + TLSProducerSocket::addHicnKeyIdCb, TLSProducerSocket::freeHicnKeyIdCb, + this, TLSProducerSocket::parseHicnKeyIdCb, NULL); + + ssl_ = SSL_new(ctx_); + /* + * Setup this producer socker as the bio that TLS will use to write and read + * data (in stream mode) + */ + BIO_METHOD *bio_meth = + BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket"); + BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld); + BIO_meth_set_write(bio_meth, TLSProducerSocket::writeOld); + BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl); + BIO *bio = BIO_new(bio_meth); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); + /* + * Set the callback so that when an interest is received we catch it and we + * decrypt the payload before passing it to the application. + */ + this->ProducerSocket::setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this, + std::placeholders::_1, + std::placeholders::_2)); + this->ProducerSocket::setSocketOption( + ProducerCallbacksOptions::CONTENT_PRODUCED, + (ProducerContentCallback)bind( + &TLSProducerSocket::onContentProduced, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); +} + +void TLSProducerSocket::accept() { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + tls_chunks_ = 1; + int result = SSL_accept(ssl_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + } + TRANSPORT_LOGD("Handshake performed!"); + parent_->list_secure_producers.push_front( + std::move(parent_->map_secure_producers[handshake_name_])); + parent_->map_secure_producers.erase(handshake_name_); + + ProducerInterestCallback on_interest_process_decrypted; + getSocketOption(ProducerCallbacksOptions::CACHE_MISS, + on_interest_process_decrypted); + + if (on_interest_process_decrypted) { + Interest inter(std::move(packet_)); + on_interest_process_decrypted(*this, inter); + } else { + throw errors::RuntimeException( + "On interest process unset. Unable to perform handshake"); + } +} + +int TLSProducerSocket::async_accept() { + if (!async_thread_.stopped()) { + async_thread_.add([this]() { this->accept(); }); + } else { + throw errors::RuntimeException( + "Async thread not running, impossible to perform handshake"); + } + + return 1; +} + +void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { + /* Based on the state machine of (D)TLS, we know what action to do */ + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + std::unique_lock lck(mtx_); + name_ = interest.getName(); + something_to_read_ = true; + packet_ = interest.acquireMemBufReference(); + if (head_) { + payload_->prependChain(interest.getPayload()); + } else { + payload_ = interest.getPayload(); // std::move(interest.getPayload()); + } + cv_.notify_one(); + } else { + name_ = interest.getName(); + packet_ = interest.acquireMemBufReference(); + payload_ = interest.getPayload(); + something_to_read_ = true; + + if (interest.getPayload()->length() > 0) + SSL_read( + ssl_, + const_cast(interest.getPayload()->writableData()), + interest.getPayload()->length()); + } + + ProducerInterestCallback on_interest_input_decrypted; + getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + on_interest_input_decrypted); + if (on_interest_input_decrypted) + (on_interest_input_decrypted)(*this, interest); +} + +void TLSProducerSocket::cacheMiss(ProducerSocket &p, Interest &interest) { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + std::unique_lock lck(mtx_); + name_ = interest.getName(); + something_to_read_ = true; + packet_ = interest.acquireMemBufReference(); + payload_ = interest.getPayload(); + cv_.notify_one(); + } else { + name_ = interest.getName(); + packet_ = interest.acquireMemBufReference(); + payload_ = interest.getPayload(); + something_to_read_ = true; + + if (interest.getPayload()->length() > 0) + SSL_read( + ssl_, + const_cast(interest.getPayload()->writableData()), + interest.getPayload()->length()); + + if (on_interest_process_decrypted_ != VOID_HANDLER) + on_interest_process_decrypted_(*this, interest); + } +} + +void TLSProducerSocket::onContentProduced(ProducerSocket &p, + const std::error_code &err, + uint64_t bytes_written) {} + +uint32_t TLSProducerSocket::produce(Name content_name, + std::unique_ptr &&buffer, + bool is_last, uint32_t start_offset) { + if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) { + throw errors::RuntimeException( + "New handshake on the same P2P secure producer socket not supported"); + } + size_t buf_size = buffer->length(); + name_ = served_namespaces_.front().mapName(content_name); + + tls_chunks_ = to_call_oncontentproduced_ = + ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); + + if (!is_last) { + tls_chunks_++; + } + + last_segment_ = start_offset; + + SSL_write(ssl_, buffer->data(), buf_size); + BIO *wbio = SSL_get_wbio(ssl_); + int i = BIO_flush(wbio); + (void)i; // To shut up gcc 5 + + return 0; +} + +void TLSProducerSocket::asyncProduce(const Name &content_name, + const uint8_t *buf, size_t buffer_size, + bool is_last, uint32_t *start_offset) { + if (!encryption_thread_.stopped()) { + encryption_thread_.add([this, content_name, buffer = buf, + size = buffer_size, is_last, start_offset]() { + if (start_offset != NULL) { + produce(content_name, buffer, size, is_last, *start_offset); + } else { + produce(content_name, buffer, size, is_last, 0); + } + }); + } +} + +void TLSProducerSocket::asyncProduce(Name content_name, + std::unique_ptr &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment) { + if (!encryption_thread_.stopped()) { + auto a = buffer.release(); + encryption_thread_.add( + [this, content_name, a, is_last, offset, last_segment]() { + auto buf = std::unique_ptr(a); + if (last_segment != NULL) { + *last_segment = &last_segment_; + } + produce(content_name, std::move(buf), is_last, offset); + }); + } +} + +void TLSProducerSocket::asyncProduce(ContentObject &content_object) { + throw errors::RuntimeException("API not supported"); +} + +void TLSProducerSocket::produce(ContentObject &content_object) { + throw errors::RuntimeException("API not supported"); +} + +long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { + if (cmd == BIO_CTRL_FLUSH) { + } + return 1; +} + +int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char **out, size_t *outlen, + X509 *x, size_t chainidx, int *al, + void *add_arg) { + TLSProducerSocket *socket = reinterpret_cast(add_arg); + if (ext_type == 100) { + ip_prefix_t ip_prefix = + socket->parent_->served_namespaces_.front().toIpPrefixStruct(); + int inet_family = + socket->parent_->served_namespaces_.front().getAddressFamily(); + uint16_t prefix_len_bits = + socket->parent_->served_namespaces_.front().getPrefixLength(); + uint8_t prefix_len_bytes = prefix_len_bits / 8; + uint8_t prefix_len_u32 = prefix_len_bits / 32; + + ip_prefix_t *out_ip = (ip_prefix_t *)malloc(sizeof(ip_prefix_t)); + out_ip->family = inet_family; + out_ip->len = prefix_len_bits + 32; + u8 *out_ip_buf = const_cast( + ip_address_get_buffer(&(out_ip->address), inet_family)); + *out = reinterpret_cast(out_ip); + + RAND_bytes((unsigned char *)&socket->key_id_, 4); + + memcpy(out_ip_buf, ip_address_get_buffer(&(ip_prefix.address), inet_family), + prefix_len_bytes); + memcpy((out_ip_buf + prefix_len_bytes), &socket->key_id_, 4); + *outlen = sizeof(ip_prefix_t); + + ip_address_t mask = {}; + ip_address_t keyId_component = {}; + u32 *mask_buf; + u32 *keyId_component_buf; + switch (inet_family) { + case AF_INET: + mask_buf = &(mask.v4.as_u32); + keyId_component_buf = &(keyId_component.v4.as_u32); + break; + case AF_INET6: + mask_buf = mask.v6.as_u32; + keyId_component_buf = keyId_component.v6.as_u32; + break; + default: + throw errors::RuntimeException("Unknown protocol"); + } + + if (prefix_len_bits > (inet_family == AF_INET6 ? IPV6_ADDR_LEN_BITS - 32 + : IPV4_ADDR_LEN_BITS - 32)) + throw errors::RuntimeException( + "Not enough space in the content name to add key_id"); + + mask_buf[prefix_len_u32] = 0xffffffff; + keyId_component_buf[prefix_len_u32] = socket->key_id_; + socket->last_segment_ = 0; + + socket->on_interest_process_decrypted_ = + socket->parent_->on_interest_process_decrypted_; + + socket->registerPrefix( + Prefix(socket->parent_->served_namespaces_.front().getName( + Name(inet_family, (uint8_t *)&mask), + Name(inet_family, (uint8_t *)&keyId_component), + socket->parent_->served_namespaces_.front().getName()), + out_ip->len)); + socket->connect(); + } + return 1; +} + +void TLSProducerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *out, + void *add_arg) { + free(const_cast(out)); +} + +int TLSProducerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *in, size_t inlen, + X509 *x, size_t chainidx, int *al, + void *add_arg) { + return 1; +} + +int TLSProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback socket_option_value) -> int { + int result = SOCKET_OPTION_SET; + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_decrypted_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_decrypted_ = socket_option_value; + break; + + default: + result = SOCKET_OPTION_NOT_SET; + break; + } + return result; + }); +} + +int TLSProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_application_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int TLSProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback **socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + *socket_option_value = &on_content_produced_application_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int TLSProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback &socket_option_value) { + return rescheduleOnIOServiceWithReference( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback &socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + socket_option_value = on_content_produced_application_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int TLSProducerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback &socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOServiceWithReference( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback &socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + socket_option_value = on_interest_input_decrypted_; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + socket_option_value = on_interest_dropped_input_buffer_; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + socket_option_value = on_interest_inserted_input_buffer_; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + socket_option_value = on_interest_satisfied_output_buffer_; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + socket_option_value = on_interest_process_decrypted_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +} // namespace interface + +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h new file mode 100644 index 000000000..4c09ddaa5 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace transport { + +namespace interface { + +class P2PSecureProducerSocket; + +class TLSProducerSocket : virtual public ProducerSocket { + friend class P2PSecureProducerSocket; + + public: + explicit TLSProducerSocket(P2PSecureProducerSocket *parent, + const Name &handshake_name); + ~TLSProducerSocket() = default; + + uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, + bool is_last = true, uint32_t start_offset = 0) override { + return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), + is_last, start_offset); + } + + uint32_t produce(Name content_name, std::unique_ptr &&buffer, + bool is_last = true, uint32_t start_offset = 0) override; + + void produce(ContentObject &content_object) override; + + void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, + bool is_last = true, + uint32_t *start_offset = nullptr) override; + + void asyncProduce(Name content_name, std::unique_ptr &&buffer, + bool is_last, uint32_t offset, + uint32_t **last_segment = nullptr) override; + + void asyncProduce(ContentObject &content_object) override; + + virtual void accept(); + + virtual int async_accept(); + + virtual int setSocketOption( + int socket_option_key, + ProducerInterestCallback socket_option_value) override; + + virtual int setSocketOption( + int socket_option_key, + ProducerContentCallback socket_option_value) override; + + virtual int getSocketOption( + int socket_option_key, + ProducerContentCallback **socket_option_value) override; + + int getSocketOption(int socket_option_key, + ProducerContentCallback &socket_option_value); + + int getSocketOption(int socket_option_key, + ProducerInterestCallback &socket_option_value); + + using ProducerSocket::getSocketOption; + using ProducerSocket::onInterest; + using ProducerSocket::setSocketOption; + + protected: + /* Callback invoked once an interest has been received and its payload + * decrypted */ + ProducerInterestCallback on_interest_input_decrypted_; + ProducerInterestCallback on_interest_process_decrypted_; + ProducerContentCallback on_content_produced_application_; + + std::mutex mtx_; + + /* Condition variable for the wait */ + std::condition_variable cv_; + + /* Bool variable, true if there is something to read (an interest arrived) */ + bool something_to_read_; + + /* First interest that open a secure connection */ + transport::core::Name name_; + + /* SSL handle */ + SSL *ssl_; + SSL_CTX *ctx_; + + Packet::MemBufPtr packet_; + + std::unique_ptr head_; + std::uint32_t last_segment_; + std::shared_ptr payload_; + std::uint32_t key_id_; + + std::thread *handshake; + P2PSecureProducerSocket *parent_; + + bool first_; + Name handshake_name_; + int tls_chunks_; + int to_call_oncontentproduced_; + + bool still_writing_; + + utils::EventThread encryption_thread_; + + void onInterest(ProducerSocket &p, Interest &interest); + void cacheMiss(ProducerSocket &p, Interest &interest); + + /* Return the number of read bytes in readbytes */ + static int read(BIO *b, char *buf, size_t size, size_t *readbytes); + + /* Return the number of read bytes in the return param */ + static int readOld(BIO *h, char *buf, int size); + + /* Return the number of written bytes in written */ + static int write(BIO *b, const char *buf, size_t size, size_t *written); + + /* Return the number of written bytes in the return param */ + static int writeOld(BIO *h, const char *buf, int num); + + static long ctrl(BIO *b, int cmd, long num, void *ptr); + + static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context, + const unsigned char **out, size_t *outlen, X509 *x, + size_t chainidx, int *al, void *add_arg); + + static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *out, + void *add_arg); + + static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, const unsigned char *in, + size_t inlen, X509 *x, size_t chainidx, int *al, + void *add_arg); + + void onContentProduced(ProducerSocket &p, const std::error_code &err, + uint64_t bytes_written); +}; + +} // namespace interface + +} // end namespace transport -- cgit 1.2.3-korg