path: root/libtransport/src/hicn/transport/interfaces
diff options
authorAlberto Compagno <acompagn+fdio@cisco.com>2020-01-07 11:46:02 +0100
committerMauro Sardara <msardara@cisco.com>2020-02-21 15:48:18 +0100
commit35058cdfe0134c88f1aa8d23342d1d7b9d39e296 (patch)
tree978ca9c2232ac381c8391b3d1eeb0f875670d5b1 /libtransport/src/hicn/transport/interfaces
parent0710f1ff754ebf01ae5befabb055349fe472b0c2 (diff)
[HICN-2] Added P2P confidential communication on hICN
P2P confidential communications exploit the TLS 1.3 protocol to let a consumer to establish a secure communication on an hICN name. Currently we don't support the consumer authentication (mutual authentication in TLS) and the 0-rtt session establishment. Change-Id: I2be073847c08a17f28c837d444081920c5e57a07 Signed-off-by: Alberto Compagno <acompagn+fdio@cisco.com> Signed-off-by: Olivier Roques <oroques+fdio@cisco.com> Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces')
19 files changed, 2772 insertions, 131 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
index 1f3c29b1f..88b83e9d4 100644
--- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
@@ -17,6 +17,11 @@ list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h
@@ -26,11 +31,29 @@ list(APPEND HEADER_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc
+ )
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h
+ )
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h
index 6de48d14b..9d3d57992 100644
--- a/libtransport/src/hicn/transport/interfaces/callbacks.h
+++ b/libtransport/src/hicn/transport/interfaces/callbacks.h
@@ -122,4 +122,4 @@ extern std::nullptr_t VOID_HANDLER;
} // namespace interface
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc
new file mode 100644
index 000000000..ec966e509
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc
@@ -0,0 +1,382 @@
+#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/tls1.h>
+#include <random>
+namespace transport {
+namespace interface {
+void P2PSecureConsumerSocket::setInterestPayload(
+ ConsumerSocket &c, const core::Interest &interest) {
+ Interest &int2 = const_cast<Interest &>(interest);
+ random_suffix_ = int2.getName().getSuffix();
+ if (payload_ != NULL) int2.appendPayload(std::move(payload_));
+// implement void readBufferAvailable(), size_t maxBufferSize() const override,
+// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable()
+// must be implemented even if empty.
+/* Return the number of read bytes in the return param */
+int readOld(BIO *b, char *buf, int size) {
+ if (size < 0) return size;
+ P2PSecureConsumerSocket *socket;
+ socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+ if (!socket->something_to_read_) {
+ if (!socket->transport_protocol_->isRunning()) {
+ socket->network_name_.setSuffix(socket->random_suffix_);
+ socket->ConsumerSocket::asyncConsume(socket->network_name_);
+ }
+ if (!socket->something_to_read_) socket->cv_.wait(lck);
+ }
+ size_t size_to_read, read;
+ size_t chain_size = socket->head_->length();
+ if (socket->head_->isChained())
+ chain_size = socket->head_->computeChainDataLength();
+ if (chain_size > (size_t)size) {
+ read = size_to_read = (size_t)size;
+ } else {
+ read = size_to_read = chain_size;
+ socket->something_to_read_ = false;
+ }
+ while (size_to_read) {
+ if (socket->head_->length() < size_to_read) {
+ std::memcpy(buf, socket->head_->data(), socket->head_->length());
+ size_to_read -= socket->head_->length();
+ buf += socket->head_->length();
+ socket->head_ = socket->head_->pop();
+ } else {
+ std::memcpy(buf, socket->head_->data(), size_to_read);
+ socket->head_->trimStart(size_to_read);
+ size_to_read = 0;
+ }
+ }
+ return read;
+/* Return the number of read bytes in readbytes */
+int read(BIO *b, char *buf, size_t size, size_t *readbytes) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = transport::interface::readOld(b, buf, (int)size);
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+ *readbytes = (size_t)ret;
+ return 1;
+/* Return the number of written bytes in the return param */
+int writeOld(BIO *b, const char *buf, int num) {
+ P2PSecureConsumerSocket *socket;
+ socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
+ socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+ socket->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &P2PSecureConsumerSocket::setInterestPayload, socket,
+ std::placeholders::_1, std::placeholders::_2));
+ return num;
+/* Return the number of written bytes in written */
+int write(BIO *b, const char *buf, size_t size, size_t *written) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = transport::interface::writeOld(b, buf, (int)size);
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+ *written = (size_t)ret;
+ return 1;
+long ctrl(BIO *b, int cmd, long num, void *ptr) { return 1; }
+int P2PSecureConsumerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char **out,
+ size_t *outlen, X509 *x,
+ size_t chainidx, int *al,
+ void *add_arg) {
+ if (ext_type == 100) {
+ *out = (unsigned char *)malloc(4);
+ *(uint32_t *)*out = 10;
+ *outlen = 4;
+ }
+ return 1;
+void P2PSecureConsumerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *out,
+ void *add_arg) {
+ free(const_cast<unsigned char *>(out));
+int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *in,
+ size_t inlen, X509 *x,
+ size_t chainidx, int *al,
+ void *add_arg) {
+ P2PSecureConsumerSocket *socket =
+ reinterpret_cast<P2PSecureConsumerSocket *>(add_arg);
+ if (ext_type == 100) {
+ memcpy(&socket->secure_prefix_, in, sizeof(ip_prefix_t));
+ }
+ return 1;
+P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol)
+ : ConsumerSocket(handshake_protocol),
+ name_(),
+ tls_consumer_(),
+ buf_pool_(),
+ decrypted_content_(),
+ payload_(),
+ head_(),
+ something_to_read_(false),
+ content_downloaded_(false),
+ random_suffix_(),
+ secure_prefix_(),
+ producer_namespace_(),
+ read_callback_decrypted_(),
+ mtx_(),
+ cv_(),
+ protocol_(transport_protocol) {
+ /* Create the (d)TLS state */
+ const SSL_METHOD *meth = TLS_client_method();
+ ctx_ = SSL_CTX_new(meth);
+ int result =
+ SSL_CTX_set_ciphersuites(ctx_,
+ "SHA256:TLS_AES_128_GCM_SHA256");
+ if (result != 1) {
+ throw errors::RuntimeException(
+ "Unable to set cipher list on TLS subsystem. Aborting.");
+ }
+ SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
+ SSL_CTX_set_ssl_version(ctx_, meth);
+ result = SSL_CTX_add_custom_ext(
+ P2PSecureConsumerSocket::addHicnKeyIdCb,
+ P2PSecureConsumerSocket::freeHicnKeyIdCb, NULL,
+ P2PSecureConsumerSocket::parseHicnKeyIdCb, this);
+ ssl_ = SSL_new(ctx_);
+ bio_meth_ = BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket");
+ BIO_meth_set_read(bio_meth_, transport::interface::readOld);
+ BIO_meth_set_write(bio_meth_, transport::interface::writeOld);
+ BIO_meth_set_ctrl(bio_meth_, transport::interface::ctrl);
+ BIO *bio = BIO_new(bio_meth_);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+ ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
+ std::default_random_engine generator;
+ std::uniform_int_distribution<int> distribution(
+ 1, std::numeric_limits<uint32_t>::max());
+ random_suffix_ = 0;
+ this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ this);
+P2PSecureConsumerSocket::~P2PSecureConsumerSocket() {
+ BIO_meth_free(bio_meth_);
+ SSL_shutdown(ssl_);
+int P2PSecureConsumerSocket::consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ }
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ network_name_ = producer_namespace_.getRandomName();
+ network_name_.setSuffix(0);
+ int result = SSL_connect(this->ssl_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ }
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+ TLSConsumerSocket tls_consumer(this->protocol_, this->ssl_);
+ ConsumerTimerCallback *stats_summary_callback = nullptr;
+ this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_callback);
+ uint32_t lifetime;
+ this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+ tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ read_callback_decrypted_);
+ tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ *stats_summary_callback);
+ tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ this->timer_interval_milliseconds_);
+ tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ tls_consumer.connect();
+ if (payload_ != NULL)
+ return tls_consumer.consume((prefix->mapName(name)), std::move(payload_));
+ else
+ return tls_consumer.consume((prefix->mapName(name)));
+int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ network_name_ = producer_namespace_.getRandomName();
+ network_name_.setSuffix(0);
+ TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
+ interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
+ this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload);
+ int result = SSL_connect(this->ssl_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+ tls_consumer_ =
+ std::make_shared<TLSConsumerSocket>(this->protocol_, this->ssl_);
+ ConsumerTimerCallback *stats_summary_callback = nullptr;
+ this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_callback);
+ uint32_t lifetime;
+ this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+ tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ read_callback_decrypted_);
+ tls_consumer_->setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ *stats_summary_callback);
+ tls_consumer_->setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ this->timer_interval_milliseconds_);
+ tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ tls_consumer_->connect();
+ if (payload_ != NULL)
+ return tls_consumer_->asyncConsume((prefix->mapName(name)),
+ std::move(payload_));
+ else
+ return tls_consumer_->asyncConsume((prefix->mapName(name)));
+void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
+ producer_namespace_ = producer_namespace;
+int P2PSecureConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerSocket::ReadCallback *socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerSocket::ReadCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_decrypted_ = socket_option_value;
+ break;
+ default:
+ }
+ });
+void P2PSecureConsumerSocket::getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length){};
+void P2PSecureConsumerSocket::readDataAvailable(size_t length) noexcept {};
+size_t P2PSecureConsumerSocket::maxBufferSize() const {
+void P2PSecureConsumerSocket::readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ if (head_) {
+ head_->prependChain(std::move(buffer));
+ } else {
+ head_ = std::move(buffer);
+ }
+ something_to_read_ = true;
+ cv_.notify_one();
+void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {};
+void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ content_downloaded_ = true;
+ something_to_read_ = true;
+ cv_.notify_one();
+bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; }
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h
new file mode 100644
index 000000000..ff867f07b
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h
@@ -0,0 +1,147 @@
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/interfaces/tls_socket_consumer.h>
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+namespace transport {
+namespace interface {
+class P2PSecureConsumerSocket : public ConsumerSocket,
+ public ConsumerSocket::ReadCallback {
+ /* Return the number of read bytes in readbytes */
+ friend int read(BIO *b, char *buf, size_t size, size_t *readbytes);
+ /* Return the number of read bytes in the return param */
+ friend int readOld(BIO *h, char *buf, int size);
+ /* Return the number of written bytes in written */
+ friend int write(BIO *b, const char *buf, size_t size, size_t *written);
+ /* Return the number of written bytes in the return param */
+ friend int writeOld(BIO *h, const char *buf, int num);
+ friend long ctrl(BIO *b, int cmd, long num, void *ptr);
+ public:
+ explicit P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol);
+ ~P2PSecureConsumerSocket();
+ int consume(const Name &name) override;
+ int asyncConsume(const Name &name) override;
+ void registerPrefix(const Prefix &producer_namespace);
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerSocket::ReadCallback *socket_option_value) override;
+ using ConsumerSocket::getSocketOption;
+ using ConsumerSocket::setSocketOption;
+ protected:
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ConsumerInterestCallback on_interest_input_decrypted_;
+ ConsumerInterestCallback on_interest_process_decrypted_;
+ private:
+ Name name_;
+ std::shared_ptr<TLSConsumerSocket> tls_consumer_;
+ /* SSL handle */
+ SSL *ssl_;
+ SSL_CTX *ctx_;
+ BIO_METHOD *bio_meth_;
+ /* Chain of MemBuf to be used as a temporary buffer to pass descypted data
+ * from the underlying layer to the application */
+ utils::ObjectPool<utils::MemBuf> buf_pool_;
+ std::unique_ptr<utils::MemBuf> decrypted_content_;
+ /* Chain of MemBuf holding the payload to be written into interest or data */
+ std::unique_ptr<utils::MemBuf> payload_;
+ /* Chain of MemBuf holding the data retrieved from the underlying layer */
+ std::unique_ptr<utils::MemBuf> head_;
+ bool something_to_read_;
+ bool content_downloaded_;
+ double old_max_win_;
+ double old_current_win_;
+ uint32_t random_suffix_;
+ ip_prefix_t secure_prefix_;
+ Prefix producer_namespace_;
+ ConsumerSocket::ReadCallback *read_callback_decrypted_;
+ std::mutex mtx_;
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+ int protocol_;
+ void setInterestPayload(ConsumerSocket &c, const core::Interest &interest);
+ void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
+ const std::error_code &ec);
+ static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context,
+ const unsigned char **out, size_t *outlen, X509 *x,
+ size_t chainidx, int *al, void *add_arg);
+ static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *out,
+ void *add_arg);
+ static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *in,
+ size_t inlen, X509 *x, size_t chainidx, int *al,
+ void *add_arg);
+ virtual void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override;
+ virtual void readDataAvailable(size_t length) noexcept override;
+ virtual size_t maxBufferSize() const override;
+ virtual void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
+ virtual void readError(const std::error_code ec) noexcept override;
+ virtual void readSuccess(std::size_t total_size) noexcept override;
+ virtual bool isBufferMovable() noexcept override;
+ int download_content(const Name &name);
+} // namespace interface
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc
new file mode 100644
index 000000000..8850bde8a
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc
@@ -0,0 +1,380 @@
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
+#include <hicn/transport/interfaces/tls_rtc_socket_producer.h>
+#include <hicn/transport/interfaces/tls_socket_producer.h>
+#include <openssl/bio.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+namespace transport {
+namespace interface {
+/* Workaround to prevent content with expiry time equal to 0 to be lost when
+ * pushed in the forwarder */
+ : ProducerSocket(),
+ mtx_(),
+ cv_(),
+ map_secure_producers(),
+ map_secure_rtc_producers(),
+ list_secure_producers() {}
+ bool rtc, const std::shared_ptr<utils::Identity> &identity)
+ : ProducerSocket(),
+ rtc_(rtc),
+ mtx_(),
+ cv_(),
+ map_secure_producers(),
+ map_secure_rtc_producers(),
+ list_secure_producers() {
+ /*
+ * Setup SSL context (identity and parameter to use TLS 1.3)
+ */
+ der_cert_ = parcKeyStore_GetDEREncodedCertificate(
+ (identity->getSigner()->getKeyStore()));
+ der_prk_ = parcKeyStore_GetDEREncodedPrivateKey(
+ (identity->getSigner()->getKeyStore()));
+ int cert_size = parcBuffer_Limit(der_cert_);
+ int prk_size = parcBuffer_Limit(der_prk_);
+ const uint8_t *cert =
+ reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_cert_, cert_size));
+ const uint8_t *prk =
+ reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_prk_, prk_size));
+ cert_509_ = d2i_X509(NULL, &cert, cert_size);
+ pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size);
+ /*
+ * Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application.
+ */
+ ProducerSocket::setSocketOption(
+ ProducerCallbacksOptions::INTEREST_INPUT,
+ (ProducerInterestCallback)std::bind(
+ &P2PSecureProducerSocket::onInterestCallback, this,
+ std::placeholders::_1, std::placeholders::_2));
+P2PSecureProducerSocket::~P2PSecureProducerSocket() {
+ if (der_cert_) parcBuffer_Release(&der_cert_);
+ if (der_prk_) parcBuffer_Release(&der_prk_);
+void P2PSecureProducerSocket::onInterestCallback(ProducerSocket &p,
+ Interest &interest) {
+ std::unique_lock<std::mutex> lck(mtx_);
+ TRANSPORT_LOGD("Start handshake at %s", interest.getName().toString().c_str());
+ if (!rtc_) {
+ auto it = map_secure_producers.find(interest.getName());
+ if (it != map_secure_producers.end()) return;
+ TLSProducerSocket *tls_producer =
+ new TLSProducerSocket(this, interest.getName());
+ tls_producer->on_content_produced_application_ =
+ this->on_content_produced_application_;
+ tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
+ this->content_object_expiry_time_);
+ tls_producer->setSocketOption(SIGNER, this->signer_);
+ tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
+ tls_producer->setSocketOption(DATA_PACKET_SIZE,
+ (uint32_t)(this->data_packet_size_));
+ tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
+ map_secure_producers.insert(
+ {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)});
+ tls_producer->onInterest(*tls_producer, interest);
+ tls_producer->async_accept();
+ } else {
+ auto it = map_secure_rtc_producers.find(interest.getName());
+ if (it != map_secure_rtc_producers.end()) return;
+ TLSRTCProducerSocket *tls_producer =
+ new TLSRTCProducerSocket(this, interest.getName());
+ tls_producer->on_content_produced_application_ =
+ this->on_content_produced_application_;
+ tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
+ this->content_object_expiry_time_);
+ tls_producer->setSocketOption(SIGNER, this->signer_);
+ tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
+ tls_producer->setSocketOption(DATA_PACKET_SIZE,
+ (uint32_t)(this->data_packet_size_));
+ tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
+ map_secure_rtc_producers.insert(
+ {interest.getName(),
+ std::unique_ptr<TLSRTCProducerSocket>(tls_producer)});
+ tls_producer->onInterest(*tls_producer, interest);
+ tls_producer->async_accept();
+ }
+void P2PSecureProducerSocket::produce(const uint8_t *buffer,
+ size_t buffer_size) {
+ if (!rtc_) {
+ throw errors::RuntimeException(
+ "RTC must be the transport protocol to start the production of current "
+ "data. Aborting.");
+ }
+ std::unique_lock<std::mutex> lck(mtx_);
+ if (list_secure_rtc_producers.empty()) cv_.wait(lck);
+ for (auto it = list_secure_rtc_producers.cbegin();
+ it != list_secure_rtc_producers.cend(); it++) {
+ (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
+ }
+uint32_t P2PSecureProducerSocket::produce(
+ Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
+ uint32_t start_offset) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+ std::unique_lock<std::mutex> lck(mtx_);
+ uint32_t segments = 0;
+ if (list_secure_producers.empty()) cv_.wait(lck);
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ segments +=
+ (*it)->produce(content_name, buffer->clone(), is_last, start_offset);
+ return segments;
+uint32_t P2PSecureProducerSocket::produce(Name content_name,
+ const uint8_t *buffer,
+ size_t buffer_size, bool is_last,
+ uint32_t start_offset) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+ std::unique_lock<std::mutex> lck(mtx_);
+ uint32_t segments = 0;
+ if (list_secure_producers.empty()) cv_.wait(lck);
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ segments += (*it)->produce(content_name, buffer, buffer_size, is_last,
+ start_offset);
+ return segments;
+void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
+ const uint8_t *buf,
+ size_t buffer_size, bool is_last,
+ uint32_t *start_offset) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+ std::unique_lock<std::mutex> lck(mtx_);
+ if (list_secure_producers.empty()) cv_.wait(lck);
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++) {
+ (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset);
+ }
+void P2PSecureProducerSocket::asyncProduce(
+ Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
+ uint32_t offset, uint32_t **last_segment) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+ std::unique_lock<std::mutex> lck(mtx_);
+ if (list_secure_producers.empty()) cv_.wait(lck);
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++) {
+ (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset,
+ last_segment);
+ }
+// Socket Option Redefinition to avoid name hiding
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ if (!list_secure_producers.empty()) {
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ }
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_decrypted_ = socket_option_value;
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_decrypted_ = socket_option_value;
+ default:
+ }
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Signer> &socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ switch (socket_option_key) {
+ case GeneralTransportOptions::SIGNER: {
+ signer_.reset();
+ signer_ = socket_option_value;
+ }
+ default:
+ }
+int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ if (!list_secure_producers.empty()) {
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ }
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ content_object_expiry_time_ =
+ socket_option_value; // HICN_HANDSHAKE_CONTENT_EXPIRY_TIME;
+ }
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
+ bool socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
+ Name *socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, std::list<Prefix> socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentObjectCallback socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_application_ = socket_option_value;
+ break;
+ default:
+ }
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, HashAlgorithm socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, utils::CryptoSuite socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, const std::string &socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h
new file mode 100644
index 000000000..ba3fa0189
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h
@@ -0,0 +1,129 @@
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/interfaces/tls_socket_producer.h>
+#include <hicn/transport/interfaces/tls_rtc_socket_producer.h>
+#include <hicn/transport/utils/content_store.h>
+#include <hicn/transport/utils/identity.h>
+#include <hicn/transport/utils/signer.h>
+#include <openssl/ssl.h>
+#include <condition_variable>
+#include <forward_list>
+#include <mutex>
+namespace transport {
+namespace interface {
+class P2PSecureProducerSocket : public ProducerSocket {
+ friend class TLSProducerSocket;
+ friend class TLSRTCProducerSocket;
+ public:
+ explicit P2PSecureProducerSocket();
+ explicit P2PSecureProducerSocket(
+ bool rtc, const std::shared_ptr<utils::Identity> &identity);
+ ~P2PSecureProducerSocket();
+ void produce(const uint8_t *buffer, size_t buffer_size) override;
+ uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true, uint32_t start_offset = 0) override;
+ uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true, uint32_t start_offset = 0) override;
+ void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
+ bool is_last = true,
+ uint32_t *start_offset = nullptr) override;
+ void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment = nullptr) override;
+ int setSocketOption(int socket_option_key,
+ ProducerInterestCallback socket_option_value) override;
+ int setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Signer> &socket_option_value) override;
+ int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) override;
+ int setSocketOption(int socket_option_key, bool socket_option_value) override;
+ int setSocketOption(int socket_option_key,
+ Name *socket_option_value) override;
+ int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) override;
+ int setSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) override;
+ int setSocketOption(int socket_option_key,
+ ProducerContentCallback socket_option_value) override;
+ int setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) override;
+ int setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) override;
+ int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) override;
+ using ProducerSocket::getSocketOption;
+ using ProducerSocket::onInterest;
+ protected:
+ bool rtc_;
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ProducerInterestCallback on_interest_input_decrypted_;
+ ProducerInterestCallback on_interest_process_decrypted_;
+ ProducerContentCallback on_content_produced_application_;
+ private:
+ std::mutex mtx_;
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+ PARCBuffer *der_cert_;
+ PARCBuffer *der_prk_;
+ X509 *cert_509_;
+ EVP_PKEY *pkey_rsa_;
+ std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>,
+ core::hash<core::Name>, core::compare2<core::Name>>
+ map_secure_producers;
+ std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>,
+ core::hash<core::Name>, core::compare2<core::Name>>
+ map_secure_rtc_producers;
+ std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers;
+ std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers;
+ void onInterestCallback(ProducerSocket &p, Interest &interest);
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index bb93e0535..fefa419a9 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -171,6 +171,7 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
on_content_object_in_output_buffer_(*this, *content_object);
+ TRANSPORT_LOGD("Send content %u (produce)", content_object->getName().getSuffix());
if (on_content_object_output_) {
@@ -222,6 +223,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
on_content_object_output_(*this, *content_object);
+ TRANSPORT_LOGD("Send content %u (onInterest)", content_object->getName().getSuffix());
} else {
@@ -357,6 +359,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) {
on_content_object_output_(*this, nack);
+ TRANSPORT_LOGD("Send nack %u", sequence);
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index 37ba88d8a..d7917a8c0 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -26,7 +26,7 @@ namespace transport {
namespace interface {
-class RTCProducerSocket : public ProducerSocket {
+class RTCProducerSocket : virtual public ProducerSocket {
RTCProducerSocket(asio::io_service &io_service);
@@ -36,7 +36,7 @@ class RTCProducerSocket : public ProducerSocket {
void registerPrefix(const Prefix &producer_namespace) override;
- void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
+ virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
void onInterest(Interest::Ptr &&interest) override;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index fba972fe5..b2c054947 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -48,6 +48,7 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
+ key_content_(false),
@@ -106,9 +107,13 @@ int ConsumerSocket::asyncConsume(const Name &name) {
+bool ConsumerSocket::verifyKeyPackets() {
+ return transport_protocol_->verifyKeyPackets();
void ConsumerSocket::stop() {
- if (transport_protocol_->isRunning()) {
- transport_protocol_->stop();
+ if (transport_protocol_) {
+ if (transport_protocol_->isRunning()) transport_protocol_->stop();
@@ -312,6 +317,11 @@ int ConsumerSocket::setSocketOption(int socket_option_key,
+ case GeneralTransportOptions::KEY_CONTENT:
+ key_content_ = socket_option_value;
+ break;
return result;
@@ -461,6 +471,7 @@ int ConsumerSocket::setSocketOption(
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
case GeneralTransportOptions::VERIFIER:
+ verifier_.reset();
verifier_ = socket_option_value;
@@ -479,10 +490,7 @@ int ConsumerSocket::setSocketOption(int socket_option_key,
switch (socket_option_key) {
case GeneralTransportOptions::CERTIFICATE:
key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
- if (key_id_ != nullptr) {
- }
+ if (key_id_ != nullptr) result = SOCKET_OPTION_SET;
case DataLinkOptions::OUTPUT_INTERFACE:
@@ -614,6 +622,10 @@ int ConsumerSocket::getSocketOption(int socket_option_key,
socket_option_value = verify_signature_;
+ case GeneralTransportOptions::KEY_CONTENT:
+ socket_option_value = key_content_;
+ break;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index acce28c1d..48a594adf 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -132,6 +132,8 @@ class ConsumerSocket : public BaseSocket {
* the application when the transfer is done.
virtual void readSuccess(std::size_t total_size) noexcept = 0;
+ virtual void afterRead() {}
@@ -181,8 +183,16 @@ class ConsumerSocket : public BaseSocket {
* content retrieval succeeded. This information can be obtained from the
* error code in CONTENT_RETRIEVED callback.
- int consume(const Name &name);
- int asyncConsume(const Name &name);
+ virtual int consume(const Name &name);
+ virtual int asyncConsume(const Name &name);
+ /**
+ * Verify the packets containing a key after the origin of the key has been
+ * validated by the client.
+ *
+ * @return true if all packets are valid, false otherwise
+ */
+ virtual bool verifyKeyPackets();
* Stops the consumer socket. If several downloads are queued (using
@@ -330,16 +340,14 @@ class ConsumerSocket : public BaseSocket {
return result;
- private:
+ // context inner state variables
asio::io_service internal_io_service_;
asio::io_service &io_service_;
std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
- // No need to protect from multiple accesses in the async consumer
- // The parameter is accessible only with a getSocketOption and
- // set from the consume
Name network_name_;
int interest_lifetime_;
@@ -362,17 +370,22 @@ class ConsumerSocket : public BaseSocket {
int rate_estimation_batching_parameter_;
int rate_estimation_choice_;
+ bool is_async_;
// Verification parameters
std::shared_ptr<utils::Verifier> verifier_;
PARCKeyId *key_id_;
std::atomic_bool verify_signature_;
+ std::atomic_bool key_content_;
ConsumerInterestCallback on_interest_retransmission_;
ConsumerInterestCallback on_interest_output_;
ConsumerInterestCallback on_interest_timeout_;
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
ConsumerContentObjectVerificationCallback on_content_object_verification_;
ConsumerContentObjectCallback on_content_object_;
ConsumerManifestCallback on_manifest_;
ConsumerTimerCallback stats_summary_;
@@ -396,4 +409,4 @@ class ConsumerSocket : public BaseSocket {
} // namespace interface
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
index b25bacbb9..a38131271 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
@@ -34,7 +34,7 @@ typedef enum {
- KEY_LOCATOR = 110,
+ KEY_CONTENT = 110,
@@ -45,11 +45,11 @@ typedef enum {
- IDENTITY = 121,
+ SIGNER = 121,
} GeneralTransportOptions;
typedef enum {
@@ -92,7 +92,7 @@ typedef enum {
} ProducerCallbacksOptions;
typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions;
@@ -110,4 +110,4 @@ typedef enum {
} // namespace interface
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 4fef5d1e2..26a7208b6 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -14,7 +14,7 @@
#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/utils/identity.h>
+#include <hicn/transport/utils/signer.h>
#include <functional>
@@ -35,6 +35,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
+ async_thread_(),
@@ -96,51 +97,47 @@ void ProducerSocket::listen() {
void ProducerSocket::passContentObjectToCallbacks(
const std::shared_ptr<ContentObject> &content_object) {
if (content_object) {
- if (on_new_segment_) {
- io_service_.dispatch([this, content_object]() {
+ io_service_.dispatch([this, content_object]() {
+ if (on_new_segment_) {
on_new_segment_(*this, *content_object);
- });
- }
+ }
- if (on_content_object_to_sign_) {
- io_service_.dispatch([this, content_object]() {
+ if (on_content_object_to_sign_) {
on_content_object_to_sign_(*this, *content_object);
- });
- }
+ }
- if (on_content_object_in_output_buffer_) {
- io_service_.dispatch([this, content_object]() {
+ if (on_content_object_in_output_buffer_) {
on_content_object_in_output_buffer_(*this, *content_object);
- });
- }
+ }
+ });
- if (on_content_object_output_) {
- io_service_.dispatch([this, content_object]() {
+ io_service_.dispatch([this, content_object]() {
+ if (on_content_object_output_) {
on_content_object_output_(*this, *content_object);
- });
- }
+ }
+ });
void ProducerSocket::produce(ContentObject &content_object) {
- if (on_content_object_in_output_buffer_) {
- io_service_.dispatch([this, &content_object]() {
+ io_service_.dispatch([this, &content_object]() {
+ if (on_content_object_in_output_buffer_) {
on_content_object_in_output_buffer_(*this, content_object);
- });
- }
+ }
+ });
- if (on_content_object_output_) {
- io_service_.dispatch([this, &content_object]() {
+ io_service_.dispatch([this, &content_object]() {
+ if (on_content_object_output_) {
on_content_object_output_(*this, content_object);
- });
- }
+ }
+ });
@@ -160,8 +157,8 @@ uint32_t ProducerSocket::produce(Name content_name,
bool making_manifest = making_manifest_;
auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
suffix_strategy_, start_offset);
- std::shared_ptr<utils::Identity> identity;
- getSocketOption(GeneralTransportOptions::IDENTITY, identity);
+ std::shared_ptr<utils::Signer> signer;
+ getSocketOption(GeneralTransportOptions::SIGNER, signer);
auto buffer_size = buffer->length();
int bytes_segmented = 0;
@@ -176,7 +173,7 @@ uint32_t ProducerSocket::produce(Name content_name,
bool is_last_manifest = false;
// TODO Manifest may still be used for indexing
- if (making_manifest && !identity) {
+ if (making_manifest && !signer) {
TRANSPORT_LOGD("Making manifests without setting producer identity.");
@@ -197,11 +194,11 @@ uint32_t ProducerSocket::produce(Name content_name,
format = hf_format;
if (making_manifest) {
manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- identity ? hf_format_ah : hf_format,
- identity ? identity->getSignatureLength() : 0);
- } else if (identity) {
+ signer ? hf_format_ah : hf_format,
+ signer ? signer->getSignatureLength() : 0);
+ } else if (signer) {
format = hf_format_ah;
- signature_length = identity->getSignatureLength();
+ signature_length = signer->getSignatureLength();
header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
@@ -227,7 +224,7 @@ uint32_t ProducerSocket::produce(Name content_name,
core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
hash_algo, is_last_manifest, content_name, suffix_strategy_,
- identity ? identity->getSignatureLength() : 0));
+ signer ? signer->getSignatureLength() : 0));
if (is_last) {
@@ -237,6 +234,7 @@ uint32_t ProducerSocket::produce(Name content_name,
+ TRANSPORT_LOGD("--------- START PRODUCE ----------");
for (unsigned int packaged_segments = 0;
packaged_segments < number_of_segments; packaged_segments++) {
if (making_manifest) {
@@ -246,15 +244,18 @@ uint32_t ProducerSocket::produce(Name content_name,
// If identity set, sign manifest
- if (identity) {
- identity->getSigner().sign(*manifest);
+ if (signer) {
+ signer->sign(*manifest);
+ TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
// Send content objects stored in the queue
while (!content_queue_.empty()) {
+ TRANSPORT_LOGD("Send content %u",
+ content_queue_.front()->getName().getSuffix());
@@ -266,7 +267,7 @@ uint32_t ProducerSocket::produce(Name content_name,
core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
content_name, suffix_strategy_,
- identity ? identity->getSignatureLength() : 0));
+ signer ? signer->getSignatureLength() : 0));
@@ -307,10 +308,11 @@ uint32_t ProducerSocket::produce(Name content_name,
manifest->addSuffixHash(content_suffix, hash);
} else {
- if (identity) {
- identity->getSigner().sign(*content_object);
+ if (signer) {
+ signer->sign(*content_object);
+ TRANSPORT_LOGD("Send content %u", content_object->getName().getSuffix());
@@ -320,24 +322,28 @@ uint32_t ProducerSocket::produce(Name content_name,
- if (identity) {
- identity->getSigner().sign(*manifest);
+ if (signer) {
+ signer->sign(*manifest);
+ TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
while (!content_queue_.empty()) {
+ TRANSPORT_LOGD("Send content %u",
+ content_queue_.front()->getName().getSuffix());
- if (on_content_produced_) {
- io_service_.dispatch([this, buffer_size]() {
+ io_service_.dispatch([this, buffer_size]() {
+ if (on_content_produced_) {
on_content_produced_(*this, std::make_error_code(std::errc(0)),
- });
- }
+ }
+ });
+ TRANSPORT_LOGD("--------- END PRODUCE ------------");
return suffix_strategy->getTotalCount();
@@ -346,16 +352,42 @@ void ProducerSocket::asyncProduce(ContentObject &content_object) {
auto co_ptr = std::static_pointer_cast<ContentObject>(
async_thread_.add([this, content_object = std::move(co_ptr)]() {
- produce(*content_object);
+ ProducerSocket::produce(*content_object);
void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
- size_t buffer_size) {
+ size_t buffer_size, bool is_last,
+ uint32_t *start_offset) {
if (!async_thread_.stopped()) {
- async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() {
- produce(suffix, buffer, size, 0, false);
+ async_thread_.add([this, suffix, buffer = buf, size = buffer_size, is_last,
+ start_offset]() {
+ if (start_offset != NULL) {
+ *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last,
+ *start_offset);
+ } else {
+ ProducerSocket::produce(suffix, buffer, size, is_last, 0);
+ }
+ });
+ }
+void ProducerSocket::asyncProduce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment) {
+ if (!async_thread_.stopped()) {
+ auto a = buffer.release();
+ async_thread_.add([this, content_name, a, is_last, offset, last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ **last_segment =
+ offset + ProducerSocket::produce(content_name, std::move(buf),
+ is_last, offset);
+ } else {
+ ProducerSocket::produce(content_name, std::move(buf), is_last, offset);
+ }
@@ -392,8 +424,8 @@ int ProducerSocket::setSocketOption(int socket_option_key,
if (socket_option_value < default_values::max_content_object_size &&
socket_option_value > 0) {
data_packet_size_ = socket_option_value;
- break;
+ break;
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
@@ -632,12 +664,12 @@ int ProducerSocket::setSocketOption(int socket_option_key,
int ProducerSocket::setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Identity> &socket_option_value) {
+ const std::shared_ptr<utils::Signer> &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY: {
- utils::SpinLock::Acquire locked(identity_lock_);
- identity_.reset();
- identity_ = socket_option_value;
+ case GeneralTransportOptions::SIGNER: {
+ utils::SpinLock::Acquire locked(signer_lock_);
+ signer_.reset();
+ signer_ = socket_option_value;
} break;
@@ -844,11 +876,11 @@ int ProducerSocket::getSocketOption(int socket_option_key,
int ProducerSocket::getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Identity> &socket_option_value) {
+ std::shared_ptr<utils::Signer> &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY: {
- utils::SpinLock::Acquire locked(identity_lock_);
- socket_option_value = identity_;
+ case GeneralTransportOptions::SIGNER: {
+ utils::SpinLock::Acquire locked(signer_lock_);
+ socket_option_value = signer_;
} break;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index ff6f49723..5f360f2ce 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -18,10 +18,12 @@
#include <hicn/transport/interfaces/socket.h>
#include <hicn/transport/utils/content_store.h>
#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/signer.h>
#include <hicn/transport/utils/suffix_strategy.h>
#include <atomic>
#include <cmath>
+#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
@@ -31,10 +33,6 @@
-namespace utils {
-class Identity;
namespace transport {
namespace interface {
@@ -53,16 +51,19 @@ class ProducerSocket : public Socket<BasePortal>,
bool isRunning() override { return !io_service_.stopped(); };
- uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
- bool is_last = true, uint32_t start_offset = 0) {
- return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size),
- is_last, start_offset);
+ virtual uint32_t produce(Name content_name, const uint8_t *buffer,
+ size_t buffer_size, bool is_last = true,
+ uint32_t start_offset = 0) {
+ return ProducerSocket::produce(
+ content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last,
+ start_offset);
- uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0);
+ virtual uint32_t produce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true, uint32_t start_offset = 0);
- void produce(ContentObject &content_object);
+ virtual void produce(ContentObject &content_object);
virtual void produce(const uint8_t *buffer, size_t buffer_size) {
produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
@@ -74,11 +75,18 @@ class ProducerSocket : public Socket<BasePortal>,
throw errors::NotImplementedException();
- void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size);
+ virtual void asyncProduce(const Name &suffix, const uint8_t *buf,
+ size_t buffer_size, bool is_last = true,
+ uint32_t *start_offset = nullptr);
void asyncProduce(const Name &suffix);
- void asyncProduce(ContentObject &content_object);
+ virtual void asyncProduce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment = nullptr);
+ virtual void asyncProduce(ContentObject &content_object);
virtual void registerPrefix(const Prefix &producer_namespace);
@@ -124,7 +132,7 @@ class ProducerSocket : public Socket<BasePortal>,
virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Identity> &socket_option_value);
+ const std::shared_ptr<utils::Signer> &socket_option_value);
virtual int setSocketOption(int socket_option_key,
const std::string &socket_option_value);
@@ -158,15 +166,77 @@ class ProducerSocket : public Socket<BasePortal>,
virtual int getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Identity> &socket_option_value);
+ std::shared_ptr<utils::Signer> &socket_option_value);
virtual int getSocketOption(int socket_option_key,
std::string &socket_option_value);
- protected:
+ // If the thread calling lambda_func is not the same of io_service, this
+ // function reschedule the function on it
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (listening_thread_.joinable() &&
+ std::this_thread::get_id() != listening_thread_.get_id()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ cv.notify_all();
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+ return result;
+ }
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOServiceWithReference(int socket_option_key,
+ arg2 &socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2 &)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (listening_thread_.joinable() &&
+ std::this_thread::get_id() != this->listening_thread_.get_id()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ cv.notify_all();
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+ return result;
+ }
// Threads
+ protected:
std::thread listening_thread_;
asio::io_service internal_io_service_;
asio::io_service &io_service_;
std::shared_ptr<Portal> portal_;
@@ -191,8 +261,8 @@ class ProducerSocket : public Socket<BasePortal>,
std::atomic<HashAlgorithm> hash_algorithm_;
std::atomic<utils::CryptoSuite> crypto_suite_;
- utils::SpinLock identity_lock_;
- std::shared_ptr<utils::Identity> identity_;
+ utils::SpinLock signer_lock_;
+ std::shared_ptr<utils::Signer> signer_;
core::NextSegmentCalculationStrategy suffix_strategy_;
// While manifests are being built, contents are stored in a queue
@@ -213,38 +283,6 @@ class ProducerSocket : public Socket<BasePortal>,
ProducerContentCallback on_content_produced_;
- // If the thread calling lambda_func is not the same of io_service, this
- // function reschedule the function on it
- template <typename Lambda, typename arg2>
- int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func) {
- // To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
- int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != listening_thread_.get_id()) {
- std::mutex mtx;
- /* Condition variable for the wait */
- std::condition_variable cv;
- bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
- std::unique_lock<std::mutex> lck(mtx);
- done = true;
- result = func(socket_option_key, socket_option_value);
- cv.notify_all();
- });
- std::unique_lock<std::mutex> lck(mtx);
- if (!done) {
- cv.wait(lck);
- }
- } else {
- result = func(socket_option_key, socket_option_value);
- }
- return result;
- }
void listen();
diff --git a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc
new file mode 100644
index 000000000..27c1e54bd
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc
@@ -0,0 +1,178 @@
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
+#include <hicn/transport/interfaces/tls_rtc_socket_producer.h>
+#include <openssl/bio.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+namespace transport {
+namespace interface {
+int TLSRTCProducerSocket::read(BIO *b, char *buf, size_t size,
+ size_t *readbytes) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = TLSRTCProducerSocket::readOld(b, buf, (int)size);
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+ *readbytes = (size_t)ret;
+ return 1;
+int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) {
+ TLSRTCProducerSocket *socket;
+ socket = (TLSRTCProducerSocket *)BIO_get_data(b);
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+ if (!socket->something_to_read_) {
+ (socket->cv_).wait(lck);
+ }
+ utils::MemBuf *membuf = socket->packet_->next();
+ int size_to_read;
+ if ((int)membuf->length() > size) {
+ size_to_read = size;
+ } else {
+ size_to_read = membuf->length();
+ socket->something_to_read_ = false;
+ }
+ std::memcpy(buf, membuf->data(), size_to_read);
+ membuf->trimStart(size_to_read);
+ return size_to_read;
+int TLSRTCProducerSocket::write(BIO *b, const char *buf, size_t size,
+ size_t *written) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = TLSRTCProducerSocket::writeOld(b, buf, (int)size);
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+ *written = (size_t)ret;
+ return 1;
+int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
+ TLSRTCProducerSocket *socket;
+ socket = (TLSRTCProducerSocket *)BIO_get_data(b);
+ if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
+ socket->first_) {
+ socket->tls_chunks_--;
+ bool making_manifest = socket->parent_->making_manifest_;
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ false);
+ socket->parent_->ProducerSocket::produce(
+ socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0);
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ making_manifest);
+ socket->first_ = false;
+ } else {
+ std::unique_ptr<utils::MemBuf> mbuf =
+ utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
+ auto a = mbuf.release();
+ socket->async_thread_.add([socket = socket, a]() {
+ socket->to_call_oncontentproduced_--;
+ auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+ socket->RTCProducerSocket::produce(std::move(mbuf));
+ ProducerContentCallback on_content_produced_application;
+ socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
+ on_content_produced_application);
+ if (socket->to_call_oncontentproduced_ == 0 &&
+ on_content_produced_application) {
+ on_content_produced_application(*socket, std::error_code(), 0);
+ }
+ });
+ }
+ return num;
+TLSRTCProducerSocket::TLSRTCProducerSocket(P2PSecureProducerSocket *parent,
+ const Name &handshake_name)
+ : RTCProducerSocket(), TLSProducerSocket(parent, handshake_name) {
+ BIO_METHOD *bio_meth =
+ BIO_meth_new(BIO_TYPE_ACCEPT, "secure rtc producer socket");
+ BIO_meth_set_read(bio_meth, TLSRTCProducerSocket::readOld);
+ BIO_meth_set_write(bio_meth, TLSRTCProducerSocket::writeOld);
+ BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl);
+ BIO *bio = BIO_new(bio_meth);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+void TLSRTCProducerSocket::accept() {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ tls_chunks_ = 1;
+ int result = SSL_accept(ssl_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ }
+ TRANSPORT_LOGD("Handshake performed!");
+ parent_->list_secure_rtc_producers.push_front(
+ std::move(parent_->map_secure_rtc_producers[handshake_name_]));
+ parent_->map_secure_rtc_producers.erase(handshake_name_);
+ ProducerInterestCallback on_interest_process_decrypted;
+ getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ on_interest_process_decrypted);
+ if (on_interest_process_decrypted) {
+ Interest inter(std::move(packet_));
+ on_interest_process_decrypted(*this, inter);
+ }
+ parent_->cv_.notify_one();
+int TLSRTCProducerSocket::async_accept() {
+ if (!async_thread_.stopped()) {
+ async_thread_.add([this]() { this->TLSRTCProducerSocket::accept(); });
+ } else {
+ throw errors::RuntimeException(
+ "Async thread not running, impossible to perform handshake");
+ }
+ return 1;
+void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ throw errors::RuntimeException(
+ "New handshake on the same P2P secure producer socket not supported");
+ }
+ size_t buf_size = buffer->length();
+ tls_chunks_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
+ to_call_oncontentproduced_ = tls_chunks_;
+ SSL_write(ssl_, buffer->data(), buf_size);
+ BIO *wbio = SSL_get_wbio(ssl_);
+ int i = BIO_flush(wbio);
+ (void)i; // To shut up gcc 5
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h
new file mode 100644
index 000000000..16125f889
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h
@@ -0,0 +1,58 @@
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/interfaces/rtc_socket_producer.h>
+#include <hicn/transport/interfaces/tls_socket_producer.h>
+namespace transport {
+namespace interface {
+class P2PSecureProducerSocket;
+class TLSRTCProducerSocket : public RTCProducerSocket,
+ public TLSProducerSocket {
+ friend class P2PSecureProducerSocket;
+ public:
+ explicit TLSRTCProducerSocket(P2PSecureProducerSocket *parent,
+ const Name &handshake_name);
+ ~TLSRTCProducerSocket() = default;
+ void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
+ void accept() override;
+ int async_accept() override;
+ using TLSProducerSocket::produce;
+ using TLSProducerSocket::onInterest;
+ protected:
+ static int read(BIO *b, char *buf, size_t size, size_t *readbytes);
+ static int readOld(BIO *h, char *buf, int size);
+ static int write(BIO *b, const char *buf, size_t size, size_t *written);
+ static int writeOld(BIO *h, const char *buf, int num);
+} // namespace interface
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc
new file mode 100644
index 000000000..58b3c1d7d
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc
@@ -0,0 +1,364 @@
+#include <hicn/transport/interfaces/tls_socket_consumer.h>
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/tls1.h>
+#include <random>
+namespace transport {
+namespace interface {
+void TLSConsumerSocket::setInterestPayload(ConsumerSocket &c,
+ const core::Interest &interest) {
+ Interest &int2 = const_cast<Interest &>(interest);
+ random_suffix_ = int2.getName().getSuffix();
+ if (payload_ != NULL) int2.appendPayload(std::move(payload_));
+/* Return the number of read bytes in the return param */
+int readOldTLS(BIO *b, char *buf, int size) {
+ if (size < 0) return size;
+ TLSConsumerSocket *socket;
+ socket = (TLSConsumerSocket *)BIO_get_data(b);
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+ if (!socket->something_to_read_) {
+ if (!socket->transport_protocol_->isRunning()) {
+ socket->network_name_.setSuffix(socket->random_suffix_);
+ socket->ConsumerSocket::asyncConsume(socket->network_name_);
+ }
+ if (!socket->something_to_read_) socket->cv_.wait(lck);
+ }
+ size_t size_to_read, read;
+ size_t chain_size = socket->head_->length();
+ if (socket->head_->isChained())
+ chain_size = socket->head_->computeChainDataLength();
+ if (chain_size > (size_t)size) {
+ read = size_to_read = (size_t)size;
+ } else {
+ read = size_to_read = chain_size;
+ socket->something_to_read_ = false;
+ }
+ while (size_to_read) {
+ if (socket->head_->length() < size_to_read) {
+ std::memcpy(buf, socket->head_->data(), socket->head_->length());
+ size_to_read -= socket->head_->length();
+ buf += socket->head_->length();
+ socket->head_ = socket->head_->pop();
+ } else {
+ std::memcpy(buf, socket->head_->data(), size_to_read);
+ socket->head_->trimStart(size_to_read);
+ size_to_read = 0;
+ }
+ }
+ return read;
+/* Return the number of read bytes in readbytes */
+int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = transport::interface::readOldTLS(b, buf, (int)size);
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+ *readbytes = (size_t)ret;
+ return 1;
+/* Return the number of written bytes in the return param */
+int writeOldTLS(BIO *b, const char *buf, int num) {
+ TLSConsumerSocket *socket;
+ socket = (TLSConsumerSocket *)BIO_get_data(b);
+ socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+ socket->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &TLSConsumerSocket::setInterestPayload, socket, std::placeholders::_1,
+ std::placeholders::_2));
+ return num;
+/* Return the number of written bytes in written */
+int writeTLS(BIO *b, const char *buf, size_t size, size_t *written) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = transport::interface::writeOldTLS(b, buf, (int)size);
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+ *written = (size_t)ret;
+ return 1;
+long ctrlTLS(BIO *b, int cmd, long num, void *ptr) { return 1; }
+TLSConsumerSocket::TLSConsumerSocket(int protocol, SSL *ssl)
+ : ConsumerSocket(protocol),
+ name_(),
+ buf_pool_(),
+ decrypted_content_(),
+ payload_(),
+ head_(),
+ something_to_read_(false),
+ content_downloaded_(false),
+ random_suffix_(),
+ secure_prefix_(),
+ producer_namespace_(),
+ read_callback_decrypted_(),
+ mtx_(),
+ cv_(),
+ async_downloader_tls_() {
+ /* Create the (d)TLS state */
+ const SSL_METHOD *meth = TLS_client_method();
+ ctx_ = SSL_CTX_new(meth);
+ int result =
+ SSL_CTX_set_ciphersuites(ctx_,
+ "SHA256:TLS_AES_128_GCM_SHA256");
+ if (result != 1) {
+ throw errors::RuntimeException(
+ "Unable to set cipher list on TLS subsystem. Aborting.");
+ }
+ SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
+ SSL_CTX_set_ssl_version(ctx_, meth);
+ ssl_ = ssl;
+ BIO_METHOD *bio_meth =
+ BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket");
+ BIO_meth_set_read(bio_meth, transport::interface::readOldTLS);
+ BIO_meth_set_write(bio_meth, transport::interface::writeOldTLS);
+ BIO_meth_set_ctrl(bio_meth, transport::interface::ctrlTLS);
+ BIO *bio = BIO_new(bio_meth);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+ ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
+ std::default_random_engine generator;
+ std::uniform_int_distribution<int> distribution(
+ 1, std::numeric_limits<uint32_t>::max());
+ random_suffix_ = 0;
+ this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ this);
+int TLSConsumerSocket::consume(const Name &name,
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ this->payload_ = std::move(buffer);
+ this->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1,
+ std::placeholders::_2));
+ return consume(name);
+int TLSConsumerSocket::consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ }
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ throw errors::RuntimeException("Handshake not performed");
+ }
+ return download_content(name);
+int TLSConsumerSocket::download_content(const Name &name) {
+ network_name_ = name;
+ network_name_.setSuffix(0);
+ something_to_read_ = false;
+ content_downloaded_ = false;
+ decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH);
+ uint8_t *buf = decrypted_content_->writableData();
+ size_t size = 0;
+ int result = -1;
+ while (!content_downloaded_ || something_to_read_) {
+ if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) {
+ decrypted_content_->appendChain(
+ utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH));
+ // decrypted_content_->computeChainDataLength();
+ buf = decrypted_content_->prev()->writableData();
+ } else {
+ buf = decrypted_content_->writableTail();
+ }
+ result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH);
+ /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of
+ * the data has been fully downloaded */
+ /* ASSERT((result < SSL3_RT_MAX_PLAIN_LENGTH && content_downloaded_) || */
+ /* result == SSL3_RT_MAX_PLAIN_LENGTH); */
+ if (result >= 0) {
+ size += result;
+ decrypted_content_->prepend(result);
+ } else
+ throw errors::RuntimeException("Unable to download content");
+ if (size >= read_callback_decrypted_->maxBufferSize()) {
+ if (read_callback_decrypted_->isBufferMovable()) {
+ // No need to perform an additional copy. The whole buffer will be
+ // tranferred to the application.
+ read_callback_decrypted_->readBufferAvailable(
+ std::move(decrypted_content_));
+ decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH);
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = decrypted_content_->length();
+ while (decrypted_content_->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback_decrypted_->getReadBuffer(&buffer, &length);
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+ auto to_copy = std::min(decrypted_content_->length(), length);
+ std::memcpy(buffer, decrypted_content_->data(), to_copy);
+ decrypted_content_->trimStart(to_copy);
+ }
+ read_callback_decrypted_->readDataAvailable(total_length);
+ decrypted_content_->clear();
+ }
+ }
+ }
+ read_callback_decrypted_->readSuccess(size);
+int TLSConsumerSocket::asyncConsume(const Name &name,
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ this->payload_ = std::move(buffer);
+ this->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1,
+ std::placeholders::_2));
+ return asyncConsume(name);
+int TLSConsumerSocket::asyncConsume(const Name &name) {
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ throw errors::RuntimeException("Handshake not performed");
+ }
+ if (!async_downloader_tls_.stopped()) {
+ async_downloader_tls_.add([this, name]() {
+ is_async_ = true;
+ download_content(name);
+ });
+ }
+void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
+ producer_namespace_ = producer_namespace;
+int TLSConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerSocket::ReadCallback *socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerSocket::ReadCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_decrypted_ = socket_option_value;
+ break;
+ default:
+ }
+ });
+void TLSConsumerSocket::getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) {}
+void TLSConsumerSocket::readDataAvailable(size_t length) noexcept {}
+size_t TLSConsumerSocket::maxBufferSize() const {
+void TLSConsumerSocket::readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ if (head_) {
+ head_->prependChain(std::move(buffer));
+ } else {
+ head_ = std::move(buffer);
+ }
+ something_to_read_ = true;
+ cv_.notify_one();
+void TLSConsumerSocket::readError(const std::error_code ec) noexcept {}
+void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ content_downloaded_ = true;
+ something_to_read_ = true;
+ cv_.notify_one();
+bool TLSConsumerSocket::isBufferMovable() noexcept { return true; }
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h
new file mode 100644
index 000000000..05f7fe6a5
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h
@@ -0,0 +1,132 @@
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <openssl/ssl.h>
+namespace transport {
+namespace interface {
+class TLSConsumerSocket : public ConsumerSocket,
+ public ConsumerSocket::ReadCallback {
+ /* Return the number of read bytes in readbytes */
+ friend int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes);
+ /* Return the number of read bytes in the return param */
+ friend int readOldTLS(BIO *h, char *buf, int size);
+ /* Return the number of written bytes in written */
+ friend int writeTLS(BIO *b, const char *buf, size_t size, size_t *written);
+ /* Return the number of written bytes in the return param */
+ friend int writeOldTLS(BIO *h, const char *buf, int num);
+ friend long ctrlTLS(BIO *b, int cmd, long num, void *ptr);
+ public:
+ explicit TLSConsumerSocket(int protocol, SSL *ssl_);
+ ~TLSConsumerSocket() = default;
+ int consume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
+ int consume(const Name &name) override;
+ int asyncConsume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
+ int asyncConsume(const Name &name) override;
+ void registerPrefix(const Prefix &producer_namespace);
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerSocket::ReadCallback *socket_option_value) override;
+ using ConsumerSocket::getSocketOption;
+ using ConsumerSocket::setSocketOption;
+ protected:
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ConsumerInterestCallback on_interest_input_decrypted_;
+ ConsumerInterestCallback on_interest_process_decrypted_;
+ private:
+ Name name_;
+ /* SSL handle */
+ SSL *ssl_;
+ SSL_CTX *ctx_;
+ /* Chain of MemBuf to be used as a temporary buffer to pass descypted data
+ * from the underlying layer to the application */
+ utils::ObjectPool<utils::MemBuf> buf_pool_;
+ std::unique_ptr<utils::MemBuf> decrypted_content_;
+ /* Chain of MemBuf holding the payload to be written into interest or data */
+ std::unique_ptr<utils::MemBuf> payload_;
+ /* Chain of MemBuf holding the data retrieved from the underlying layer */
+ std::unique_ptr<utils::MemBuf> head_;
+ bool something_to_read_;
+ bool content_downloaded_;
+ double old_max_win_;
+ double old_current_win_;
+ uint32_t random_suffix_;
+ ip_address_t secure_prefix_;
+ Prefix producer_namespace_;
+ ConsumerSocket::ReadCallback *read_callback_decrypted_;
+ std::mutex mtx_;
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+ utils::EventThread async_downloader_tls_;
+ void setInterestPayload(ConsumerSocket &c, const core::Interest &interest);
+ void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
+ const std::error_code &ec);
+ virtual void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override;
+ virtual void readDataAvailable(size_t length) noexcept override;
+ virtual size_t maxBufferSize() const override;
+ virtual void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
+ virtual void readError(const std::error_code ec) noexcept override;
+ virtual void readSuccess(std::size_t total_size) noexcept override;
+ virtual bool isBufferMovable() noexcept override;
+ int download_content(const Name &name);
+} // namespace interface
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc
new file mode 100644
index 000000000..ad85ec6ea
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc
@@ -0,0 +1,587 @@
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
+#include <hicn/transport/interfaces/tls_socket_producer.h>
+#include <openssl/bio.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+namespace transport {
+namespace interface {
+/* Return the number of read bytes in readbytes */
+int TLSProducerSocket::read(BIO *b, char *buf, size_t size, size_t *readbytes) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = TLSProducerSocket::readOld(b, buf, (int)size);
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+ *readbytes = (size_t)ret;
+ return 1;
+/* Return the number of read bytes in the return param */
+int TLSProducerSocket::readOld(BIO *b, char *buf, int size) {
+ TLSProducerSocket *socket;
+ socket = (TLSProducerSocket *)BIO_get_data(b);
+ /* take a lock on the mutex. It will be unlocked by */
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+ if (!socket->something_to_read_) {
+ (socket->cv_).wait(lck);
+ }
+ /* Either there already is something to read, or the thread has been waken up
+ */
+ /* must return the payload in the interest */
+ utils::MemBuf *membuf = socket->packet_->next();
+ int size_to_read;
+ if ((int)membuf->length() > size) {
+ size_to_read = size;
+ } else {
+ size_to_read = membuf->length();
+ socket->something_to_read_ = false;
+ }
+ std::memcpy(buf, membuf->data(), size_to_read);
+ membuf->trimStart(size_to_read);
+ return size_to_read;
+/* Return the number of written bytes in written */
+int TLSProducerSocket::write(BIO *b, const char *buf, size_t size,
+ size_t *written) {
+ int ret;
+ if (size > INT_MAX) size = INT_MAX;
+ ret = TLSProducerSocket::writeOld(b, buf, (int)size);
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+ *written = (size_t)ret;
+ return 1;
+/* Return the number of written bytes in the return param */
+int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
+ TLSProducerSocket *socket;
+ socket = (TLSProducerSocket *)BIO_get_data(b);
+ if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
+ socket->first_) {
+ //! socket->tls_chunks_ corresponds to is_last
+ socket->tls_chunks_--;
+ bool making_manifest = socket->parent_->making_manifest_;
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ false);
+ socket->parent_->ProducerSocket::produce(
+ socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0,
+ socket->last_segment_);
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ making_manifest);
+ socket->first_ = false;
+ } else {
+ socket->still_writing_ = true;
+ std::unique_ptr<utils::MemBuf> mbuf =
+ utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
+ auto a = mbuf.release();
+ socket->async_thread_.add([socket = socket, a]() {
+ socket->tls_chunks_--;
+ socket->to_call_oncontentproduced_--;
+ auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+ socket->last_segment_ += socket->ProducerSocket::produce(
+ socket->name_, std::move(mbuf), socket->tls_chunks_ == 0,
+ socket->last_segment_);
+ ProducerContentCallback on_content_produced_application;
+ socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
+ on_content_produced_application);
+ if (socket->to_call_oncontentproduced_ == 0 &&
+ on_content_produced_application) {
+ on_content_produced_application(*socket, std::error_code(), 0);
+ }
+ });
+ }
+ return num;
+TLSProducerSocket::TLSProducerSocket(P2PSecureProducerSocket *parent,
+ const Name &handshake_name)
+ : ProducerSocket(),
+ on_content_produced_application_(),
+ mtx_(),
+ cv_(),
+ something_to_read_(),
+ name_(),
+ last_segment_(0),
+ parent_(parent),
+ first_(true),
+ handshake_name_(handshake_name),
+ tls_chunks_(0),
+ to_call_oncontentproduced_(0),
+ still_writing_(false),
+ encryption_thread_() {
+ const SSL_METHOD *meth = TLS_server_method();
+ ctx_ = SSL_CTX_new(meth);
+ /*
+ * Setup SSL context (identity and parameter to use TLS 1.3)
+ */
+ SSL_CTX_use_certificate(ctx_, parent->cert_509_);
+ SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_);
+ int result =
+ SSL_CTX_set_ciphersuites(ctx_,
+ "SHA256:TLS_AES_128_GCM_SHA256");
+ if (result != 1) {
+ throw errors::RuntimeException(
+ "Unable to set cipher list on TLS subsystem. Aborting.");
+ }
+ // We force it to be TLS 1.3
+ SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
+ SSL_CTX_set_num_tickets(ctx_, 0);
+ result = SSL_CTX_add_custom_ext(
+ TLSProducerSocket::addHicnKeyIdCb, TLSProducerSocket::freeHicnKeyIdCb,
+ this, TLSProducerSocket::parseHicnKeyIdCb, NULL);
+ ssl_ = SSL_new(ctx_);
+ /*
+ * Setup this producer socker as the bio that TLS will use to write and read
+ * data (in stream mode)
+ */
+ BIO_METHOD *bio_meth =
+ BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket");
+ BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld);
+ BIO_meth_set_write(bio_meth, TLSProducerSocket::writeOld);
+ BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl);
+ BIO *bio = BIO_new(bio_meth);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+ /*
+ * Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application.
+ */
+ this->ProducerSocket::setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+ this->ProducerSocket::setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (ProducerContentCallback)bind(
+ &TLSProducerSocket::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+void TLSProducerSocket::accept() {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ tls_chunks_ = 1;
+ int result = SSL_accept(ssl_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ }
+ TRANSPORT_LOGD("Handshake performed!");
+ parent_->list_secure_producers.push_front(
+ std::move(parent_->map_secure_producers[handshake_name_]));
+ parent_->map_secure_producers.erase(handshake_name_);
+ ProducerInterestCallback on_interest_process_decrypted;
+ getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ on_interest_process_decrypted);
+ if (on_interest_process_decrypted) {
+ Interest inter(std::move(packet_));
+ on_interest_process_decrypted(*this, inter);
+ } else {
+ throw errors::RuntimeException(
+ "On interest process unset. Unable to perform handshake");
+ }
+int TLSProducerSocket::async_accept() {
+ if (!async_thread_.stopped()) {
+ async_thread_.add([this]() { this->accept(); });
+ } else {
+ throw errors::RuntimeException(
+ "Async thread not running, impossible to perform handshake");
+ }
+ return 1;
+void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) {
+ /* Based on the state machine of (D)TLS, we know what action to do */
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ std::unique_lock<std::mutex> lck(mtx_);
+ name_ = interest.getName();
+ something_to_read_ = true;
+ packet_ = interest.acquireMemBufReference();
+ if (head_) {
+ payload_->prependChain(interest.getPayload());
+ } else {
+ payload_ = interest.getPayload(); // std::move(interest.getPayload());
+ }
+ cv_.notify_one();
+ } else {
+ name_ = interest.getName();
+ packet_ = interest.acquireMemBufReference();
+ payload_ = interest.getPayload();
+ something_to_read_ = true;
+ if (interest.getPayload()->length() > 0)
+ SSL_read(
+ ssl_,
+ const_cast<unsigned char *>(interest.getPayload()->writableData()),
+ interest.getPayload()->length());
+ }
+ ProducerInterestCallback on_interest_input_decrypted;
+ getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
+ on_interest_input_decrypted);
+ if (on_interest_input_decrypted)
+ (on_interest_input_decrypted)(*this, interest);
+void TLSProducerSocket::cacheMiss(ProducerSocket &p, Interest &interest) {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ std::unique_lock<std::mutex> lck(mtx_);
+ name_ = interest.getName();
+ something_to_read_ = true;
+ packet_ = interest.acquireMemBufReference();
+ payload_ = interest.getPayload();
+ cv_.notify_one();
+ } else {
+ name_ = interest.getName();
+ packet_ = interest.acquireMemBufReference();
+ payload_ = interest.getPayload();
+ something_to_read_ = true;
+ if (interest.getPayload()->length() > 0)
+ SSL_read(
+ ssl_,
+ const_cast<unsigned char *>(interest.getPayload()->writableData()),
+ interest.getPayload()->length());
+ if (on_interest_process_decrypted_ != VOID_HANDLER)
+ on_interest_process_decrypted_(*this, interest);
+ }
+void TLSProducerSocket::onContentProduced(ProducerSocket &p,
+ const std::error_code &err,
+ uint64_t bytes_written) {}
+uint32_t TLSProducerSocket::produce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t start_offset) {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ throw errors::RuntimeException(
+ "New handshake on the same P2P secure producer socket not supported");
+ }
+ size_t buf_size = buffer->length();
+ name_ = served_namespaces_.front().mapName(content_name);
+ tls_chunks_ = to_call_oncontentproduced_ =
+ ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
+ if (!is_last) {
+ tls_chunks_++;
+ }
+ last_segment_ = start_offset;
+ SSL_write(ssl_, buffer->data(), buf_size);
+ BIO *wbio = SSL_get_wbio(ssl_);
+ int i = BIO_flush(wbio);
+ (void)i; // To shut up gcc 5
+ return 0;
+void TLSProducerSocket::asyncProduce(const Name &content_name,
+ const uint8_t *buf, size_t buffer_size,
+ bool is_last, uint32_t *start_offset) {
+ if (!encryption_thread_.stopped()) {
+ encryption_thread_.add([this, content_name, buffer = buf,
+ size = buffer_size, is_last, start_offset]() {
+ if (start_offset != NULL) {
+ produce(content_name, buffer, size, is_last, *start_offset);
+ } else {
+ produce(content_name, buffer, size, is_last, 0);
+ }
+ });
+ }
+void TLSProducerSocket::asyncProduce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment) {
+ if (!encryption_thread_.stopped()) {
+ auto a = buffer.release();
+ encryption_thread_.add(
+ [this, content_name, a, is_last, offset, last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ *last_segment = &last_segment_;
+ }
+ produce(content_name, std::move(buf), is_last, offset);
+ });
+ }
+void TLSProducerSocket::asyncProduce(ContentObject &content_object) {
+ throw errors::RuntimeException("API not supported");
+void TLSProducerSocket::produce(ContentObject &content_object) {
+ throw errors::RuntimeException("API not supported");
+long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) {
+ if (cmd == BIO_CTRL_FLUSH) {
+ }
+ return 1;
+int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char **out, size_t *outlen,
+ X509 *x, size_t chainidx, int *al,
+ void *add_arg) {
+ TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg);
+ if (ext_type == 100) {
+ ip_prefix_t ip_prefix =
+ socket->parent_->served_namespaces_.front().toIpPrefixStruct();
+ int inet_family =
+ socket->parent_->served_namespaces_.front().getAddressFamily();
+ uint16_t prefix_len_bits =
+ socket->parent_->served_namespaces_.front().getPrefixLength();
+ uint8_t prefix_len_bytes = prefix_len_bits / 8;
+ uint8_t prefix_len_u32 = prefix_len_bits / 32;
+ ip_prefix_t *out_ip = (ip_prefix_t *)malloc(sizeof(ip_prefix_t));
+ out_ip->family = inet_family;
+ out_ip->len = prefix_len_bits + 32;
+ u8 *out_ip_buf = const_cast<u8 *>(
+ ip_address_get_buffer(&(out_ip->address), inet_family));
+ *out = reinterpret_cast<unsigned char *>(out_ip);
+ RAND_bytes((unsigned char *)&socket->key_id_, 4);
+ memcpy(out_ip_buf, ip_address_get_buffer(&(ip_prefix.address), inet_family),
+ prefix_len_bytes);
+ memcpy((out_ip_buf + prefix_len_bytes), &socket->key_id_, 4);
+ *outlen = sizeof(ip_prefix_t);
+ ip_address_t mask = {};
+ ip_address_t keyId_component = {};
+ u32 *mask_buf;
+ u32 *keyId_component_buf;
+ switch (inet_family) {
+ case AF_INET:
+ mask_buf = &(mask.v4.as_u32);
+ keyId_component_buf = &(keyId_component.v4.as_u32);
+ break;
+ case AF_INET6:
+ mask_buf = mask.v6.as_u32;
+ keyId_component_buf = keyId_component.v6.as_u32;
+ break;
+ default:
+ throw errors::RuntimeException("Unknown protocol");
+ }
+ if (prefix_len_bits > (inet_family == AF_INET6 ? IPV6_ADDR_LEN_BITS - 32
+ : IPV4_ADDR_LEN_BITS - 32))
+ throw errors::RuntimeException(
+ "Not enough space in the content name to add key_id");
+ mask_buf[prefix_len_u32] = 0xffffffff;
+ keyId_component_buf[prefix_len_u32] = socket->key_id_;
+ socket->last_segment_ = 0;
+ socket->on_interest_process_decrypted_ =
+ socket->parent_->on_interest_process_decrypted_;
+ socket->registerPrefix(
+ Prefix(socket->parent_->served_namespaces_.front().getName(
+ Name(inet_family, (uint8_t *)&mask),
+ Name(inet_family, (uint8_t *)&keyId_component),
+ socket->parent_->served_namespaces_.front().getName()),
+ out_ip->len));
+ socket->connect();
+ }
+ return 1;
+void TLSProducerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *out,
+ void *add_arg) {
+ free(const_cast<unsigned char *>(out));
+int TLSProducerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *in, size_t inlen,
+ X509 *x, size_t chainidx, int *al,
+ void *add_arg) {
+ return 1;
+int TLSProducerSocket::setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback socket_option_value) -> int {
+ int result = SOCKET_OPTION_SET;
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_decrypted_ = socket_option_value;
+ break;
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ break;
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ break;
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ break;
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_decrypted_ = socket_option_value;
+ break;
+ default:
+ break;
+ }
+ return result;
+ });
+int TLSProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_application_ = socket_option_value;
+ break;
+ default:
+ }
+ });
+int TLSProducerSocket::getSocketOption(
+ int socket_option_key, ProducerContentCallback **socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ *socket_option_value = &on_content_produced_application_;
+ break;
+ default:
+ }
+ });
+int TLSProducerSocket::getSocketOption(
+ int socket_option_key, ProducerContentCallback &socket_option_value) {
+ return rescheduleOnIOServiceWithReference(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback &socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ socket_option_value = on_content_produced_application_;
+ break;
+ default:
+ }
+ });
+int TLSProducerSocket::getSocketOption(
+ int socket_option_key, ProducerInterestCallback &socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOServiceWithReference(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback &socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ socket_option_value = on_interest_input_decrypted_;
+ break;
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ socket_option_value = on_interest_dropped_input_buffer_;
+ break;
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ socket_option_value = on_interest_inserted_input_buffer_;
+ break;
+ case ProducerCallbacksOptions::CACHE_HIT:
+ socket_option_value = on_interest_satisfied_output_buffer_;
+ break;
+ case ProducerCallbacksOptions::CACHE_MISS:
+ socket_option_value = on_interest_process_decrypted_;
+ break;
+ default:
+ }
+ });
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h
new file mode 100644
index 000000000..4c09ddaa5
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h
@@ -0,0 +1,163 @@
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/content_store.h>
+#include <openssl/ssl.h>
+#include <condition_variable>
+#include <mutex>
+namespace transport {
+namespace interface {
+class P2PSecureProducerSocket;
+class TLSProducerSocket : virtual public ProducerSocket {
+ friend class P2PSecureProducerSocket;
+ public:
+ explicit TLSProducerSocket(P2PSecureProducerSocket *parent,
+ const Name &handshake_name);
+ ~TLSProducerSocket() = default;
+ uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true, uint32_t start_offset = 0) override {
+ return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size),
+ is_last, start_offset);
+ }
+ uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true, uint32_t start_offset = 0) override;
+ void produce(ContentObject &content_object) override;
+ void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
+ bool is_last = true,
+ uint32_t *start_offset = nullptr) override;
+ void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment = nullptr) override;
+ void asyncProduce(ContentObject &content_object) override;
+ virtual void accept();
+ virtual int async_accept();
+ virtual int setSocketOption(
+ int socket_option_key,
+ ProducerInterestCallback socket_option_value) override;
+ virtual int setSocketOption(
+ int socket_option_key,
+ ProducerContentCallback socket_option_value) override;
+ virtual int getSocketOption(
+ int socket_option_key,
+ ProducerContentCallback **socket_option_value) override;
+ int getSocketOption(int socket_option_key,
+ ProducerContentCallback &socket_option_value);
+ int getSocketOption(int socket_option_key,
+ ProducerInterestCallback &socket_option_value);
+ using ProducerSocket::getSocketOption;
+ using ProducerSocket::onInterest;
+ using ProducerSocket::setSocketOption;
+ protected:
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ProducerInterestCallback on_interest_input_decrypted_;
+ ProducerInterestCallback on_interest_process_decrypted_;
+ ProducerContentCallback on_content_produced_application_;
+ std::mutex mtx_;
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+ /* Bool variable, true if there is something to read (an interest arrived) */
+ bool something_to_read_;
+ /* First interest that open a secure connection */
+ transport::core::Name name_;
+ /* SSL handle */
+ SSL *ssl_;
+ SSL_CTX *ctx_;
+ Packet::MemBufPtr packet_;
+ std::unique_ptr<utils::MemBuf> head_;
+ std::uint32_t last_segment_;
+ std::shared_ptr<utils::MemBuf> payload_;
+ std::uint32_t key_id_;
+ std::thread *handshake;
+ P2PSecureProducerSocket *parent_;
+ bool first_;
+ Name handshake_name_;
+ int tls_chunks_;
+ int to_call_oncontentproduced_;
+ bool still_writing_;
+ utils::EventThread encryption_thread_;
+ void onInterest(ProducerSocket &p, Interest &interest);
+ void cacheMiss(ProducerSocket &p, Interest &interest);
+ /* Return the number of read bytes in readbytes */
+ static int read(BIO *b, char *buf, size_t size, size_t *readbytes);
+ /* Return the number of read bytes in the return param */
+ static int readOld(BIO *h, char *buf, int size);
+ /* Return the number of written bytes in written */
+ static int write(BIO *b, const char *buf, size_t size, size_t *written);
+ /* Return the number of written bytes in the return param */
+ static int writeOld(BIO *h, const char *buf, int num);
+ static long ctrl(BIO *b, int cmd, long num, void *ptr);
+ static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context,
+ const unsigned char **out, size_t *outlen, X509 *x,
+ size_t chainidx, int *al, void *add_arg);
+ static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *out,
+ void *add_arg);
+ static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *in,
+ size_t inlen, X509 *x, size_t chainidx, int *al,
+ void *add_arg);
+ void onContentProduced(ProducerSocket &p, const std::error_code &err,
+ uint64_t bytes_written);
+} // namespace interface
+} // end namespace transport