aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-02-21 11:52:28 +0100
committerMauro Sardara <msardara@cisco.com>2020-02-26 13:19:16 +0100
commitf4433f28b509a9f67ca85d79000ccf9c2f4b7a24 (patch)
tree0f754bc9d8222f3ace11849165753acd85be3b38 /libtransport/src/implementation
parent0e7669445b6be1163189521eabed7dd0124043c8 (diff)
[HICN-534] Major rework on libtransport organization
Change-Id: I361b83a18b4fd59be136d5f0817fc28e17e89884 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r--libtransport/src/implementation/CMakeLists.txt46
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.cc403
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.h148
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.cc404
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.h130
-rw-r--r--libtransport/src/implementation/rtc_socket_producer.cc352
-rw-r--r--libtransport/src/implementation/rtc_socket_producer.h74
-rw-r--r--libtransport/src/implementation/socket.h89
-rw-r--r--libtransport/src/implementation/socket_consumer.h950
-rw-r--r--libtransport/src/implementation/socket_producer.h1061
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.cc201
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.h58
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.cc384
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.h133
-rw-r--r--libtransport/src/implementation/tls_socket_producer.cc611
-rw-r--r--libtransport/src/implementation/tls_socket_producer.h163
16 files changed, 5207 insertions, 0 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt
new file mode 100644
index 000000000..5423a7697
--- /dev/null
+++ b/libtransport/src/implementation/CMakeLists.txt
@@ -0,0 +1,46 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
+)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h
+)
+
+if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a")
+ list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc
+ )
+
+ list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h
+ )
+endif()
+
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc
new file mode 100644
index 000000000..40ab58161
--- /dev/null
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc
@@ -0,0 +1,403 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <implementation/p2psecure_socket_consumer.h>
+#include <interfaces/tls_socket_consumer.h>
+
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/tls1.h>
+
+#include <random>
+
+namespace transport {
+namespace implementation {
+
+void P2PSecureConsumerSocket::setInterestPayload(
+ interface::ConsumerSocket &c, const core::Interest &interest) {
+ Interest &int2 = const_cast<Interest &>(interest);
+ random_suffix_ = int2.getName().getSuffix();
+
+ if (payload_ != NULL) int2.appendPayload(std::move(payload_));
+}
+
+// implement void readBufferAvailable(), size_t maxBufferSize() const override,
+// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable()
+// must be implemented even if empty.
+
+/* Return the number of read bytes in the return param */
+int readOld(BIO *b, char *buf, int size) {
+ if (size < 0) return size;
+
+ P2PSecureConsumerSocket *socket;
+ socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
+
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+
+ if (!socket->something_to_read_) {
+ if (!socket->transport_protocol_->isRunning()) {
+ socket->network_name_.setSuffix(socket->random_suffix_);
+ socket->ConsumerSocket::asyncConsume(socket->network_name_);
+ }
+ if (!socket->something_to_read_) socket->cv_.wait(lck);
+ }
+
+ size_t size_to_read, read;
+ size_t chain_size = socket->head_->length();
+ if (socket->head_->isChained())
+ chain_size = socket->head_->computeChainDataLength();
+
+ if (chain_size > (size_t)size) {
+ read = size_to_read = (size_t)size;
+ } else {
+ read = size_to_read = chain_size;
+ socket->something_to_read_ = false;
+ }
+
+ while (size_to_read) {
+ if (socket->head_->length() < size_to_read) {
+ std::memcpy(buf, socket->head_->data(), socket->head_->length());
+ size_to_read -= socket->head_->length();
+ buf += socket->head_->length();
+ socket->head_ = socket->head_->pop();
+ } else {
+ std::memcpy(buf, socket->head_->data(), size_to_read);
+ socket->head_->trimStart(size_to_read);
+ size_to_read = 0;
+ }
+ }
+
+ return read;
+}
+
+/* Return the number of read bytes in readbytes */
+int read(BIO *b, char *buf, size_t size, size_t *readbytes) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = readOld(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+
+ *readbytes = (size_t)ret;
+
+ return 1;
+}
+
+/* Return the number of written bytes in the return param */
+int writeOld(BIO *b, const char *buf, int num) {
+ P2PSecureConsumerSocket *socket;
+ socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
+
+ socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+ socket->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &P2PSecureConsumerSocket::setInterestPayload, socket,
+ std::placeholders::_1, std::placeholders::_2));
+
+ return num;
+}
+
+/* Return the number of written bytes in written */
+int write(BIO *b, const char *buf, size_t size, size_t *written) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = writeOld(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+
+ *written = (size_t)ret;
+
+ return 1;
+}
+
+long ctrl(BIO *b, int cmd, long num, void *ptr) { return 1; }
+
+int P2PSecureConsumerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char **out,
+ size_t *outlen, X509 *x,
+ size_t chainidx, int *al,
+ void *add_arg) {
+ if (ext_type == 100) {
+ *out = (unsigned char *)malloc(4);
+ *(uint32_t *)*out = 10;
+ *outlen = 4;
+ }
+ return 1;
+}
+
+void P2PSecureConsumerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *out,
+ void *add_arg) {
+ free(const_cast<unsigned char *>(out));
+}
+
+int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *in,
+ size_t inlen, X509 *x,
+ size_t chainidx, int *al,
+ void *add_arg) {
+ P2PSecureConsumerSocket *socket =
+ reinterpret_cast<P2PSecureConsumerSocket *>(add_arg);
+ if (ext_type == 100) {
+ memcpy(&socket->secure_prefix_, in, sizeof(ip_prefix_t));
+ }
+ return 1;
+}
+
+P2PSecureConsumerSocket::P2PSecureConsumerSocket(
+ interface::ConsumerSocket *consumer, int handshake_protocol,
+ int transport_protocol)
+ : ConsumerSocket(consumer, transport_protocol),
+ name_(),
+ tls_consumer_(),
+ buf_pool_(),
+ decrypted_content_(),
+ payload_(),
+ head_(),
+ something_to_read_(false),
+ content_downloaded_(false),
+ random_suffix_(),
+ secure_prefix_(),
+ producer_namespace_(),
+ read_callback_decrypted_(),
+ mtx_(),
+ cv_(),
+ protocol_(transport_protocol) {
+ /* Create the (d)TLS state */
+ const SSL_METHOD *meth = TLS_client_method();
+ ctx_ = SSL_CTX_new(meth);
+
+ int result =
+ SSL_CTX_set_ciphersuites(ctx_,
+ "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
+ "SHA256:TLS_AES_128_GCM_SHA256");
+ if (result != 1) {
+ throw errors::RuntimeException(
+ "Unable to set cipher list on TLS subsystem. Aborting.");
+ }
+
+ SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
+ SSL_CTX_set_ssl_version(ctx_, meth);
+
+ result = SSL_CTX_add_custom_ext(
+ ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS,
+ P2PSecureConsumerSocket::addHicnKeyIdCb,
+ P2PSecureConsumerSocket::freeHicnKeyIdCb, NULL,
+ P2PSecureConsumerSocket::parseHicnKeyIdCb, this);
+
+ ssl_ = SSL_new(ctx_);
+
+ bio_meth_ = BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket");
+ BIO_meth_set_read(bio_meth_, readOld);
+ BIO_meth_set_write(bio_meth_, writeOld);
+ BIO_meth_set_ctrl(bio_meth_, ctrl);
+ BIO *bio = BIO_new(bio_meth_);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+
+ ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+
+ ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
+
+ std::default_random_engine generator;
+ std::uniform_int_distribution<int> distribution(
+ 1, std::numeric_limits<uint32_t>::max());
+ random_suffix_ = 0;
+
+ this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ this);
+};
+
+P2PSecureConsumerSocket::~P2PSecureConsumerSocket() {
+ BIO_meth_free(bio_meth_);
+ SSL_shutdown(ssl_);
+}
+
+int P2PSecureConsumerSocket::consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ network_name_ = producer_namespace_.getRandomName();
+ network_name_.setSuffix(0);
+ int result = SSL_connect(this->ssl_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ }
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+ TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_);
+ tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer));
+
+ ConsumerTimerCallback *stats_summary_callback = nullptr;
+ this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_callback);
+
+ uint32_t lifetime;
+ this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+ tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ read_callback_decrypted_);
+ tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ *stats_summary_callback);
+ tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ this->timer_interval_milliseconds_);
+ tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ tls_consumer.connect();
+
+ if (payload_ != NULL)
+ return tls_consumer.consume((prefix->mapName(name)), std::move(payload_));
+ else
+ return tls_consumer.consume((prefix->mapName(name)));
+}
+
+int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ network_name_ = producer_namespace_.getRandomName();
+ network_name_.setSuffix(0);
+ TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
+ interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
+ this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload);
+ int result = SSL_connect(this->ssl_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+
+ tls_consumer_ =
+ std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_);
+ tls_consumer_->setInterface(
+ new interface::TLSConsumerSocket(tls_consumer_.get()));
+
+ ConsumerTimerCallback *stats_summary_callback = nullptr;
+ this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_callback);
+
+ uint32_t lifetime;
+ this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+ tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ read_callback_decrypted_);
+ tls_consumer_->setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ *stats_summary_callback);
+ tls_consumer_->setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ this->timer_interval_milliseconds_);
+ tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ tls_consumer_->connect();
+
+ if (payload_ != NULL)
+ return tls_consumer_->asyncConsume((prefix->mapName(name)),
+ std::move(payload_));
+ else
+ return tls_consumer_->asyncConsume((prefix->mapName(name)));
+}
+
+void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
+ producer_namespace_ = producer_namespace;
+}
+
+int P2PSecureConsumerSocket::setSocketOption(
+ int socket_option_key, ReadCallback *socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key, ReadCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_decrypted_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+void P2PSecureConsumerSocket::getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length){};
+
+void P2PSecureConsumerSocket::readDataAvailable(size_t length) noexcept {};
+
+size_t P2PSecureConsumerSocket::maxBufferSize() const {
+ return SSL3_RT_MAX_PLAIN_LENGTH;
+}
+
+void P2PSecureConsumerSocket::readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ if (head_) {
+ head_->prependChain(std::move(buffer));
+ } else {
+ head_ = std::move(buffer);
+ }
+
+ something_to_read_ = true;
+ cv_.notify_one();
+}
+
+void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {};
+
+void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ content_downloaded_ = true;
+ something_to_read_ = true;
+ cv_.notify_one();
+}
+
+bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; }
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h
new file mode 100644
index 000000000..e2ebaf94e
--- /dev/null
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.h
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+#include <implementation/tls_socket_consumer.h>
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+
+namespace transport {
+namespace implementation {
+
+class P2PSecureConsumerSocket : public ConsumerSocket,
+ public interface::ConsumerSocket::ReadCallback {
+ /* Return the number of read bytes in readbytes */
+ friend int read(BIO *b, char *buf, size_t size, size_t *readbytes);
+
+ /* Return the number of read bytes in the return param */
+ friend int readOld(BIO *h, char *buf, int size);
+
+ /* Return the number of written bytes in written */
+ friend int write(BIO *b, const char *buf, size_t size, size_t *written);
+
+ /* Return the number of written bytes in the return param */
+ friend int writeOld(BIO *h, const char *buf, int num);
+
+ friend long ctrl(BIO *b, int cmd, long num, void *ptr);
+
+ public:
+ explicit P2PSecureConsumerSocket(interface::ConsumerSocket *consumer,
+ int handshake_protocol,
+ int transport_protocol);
+
+ ~P2PSecureConsumerSocket();
+
+ int consume(const Name &name) override;
+
+ int asyncConsume(const Name &name) override;
+
+ void registerPrefix(const Prefix &producer_namespace);
+
+ int setSocketOption(
+ int socket_option_key,
+ interface::ConsumerSocket::ReadCallback *socket_option_value) override;
+
+ using ConsumerSocket::getSocketOption;
+ using ConsumerSocket::setSocketOption;
+
+ protected:
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ConsumerInterestCallback on_interest_input_decrypted_;
+ ConsumerInterestCallback on_interest_process_decrypted_;
+
+ private:
+ Name name_;
+ std::shared_ptr<TLSConsumerSocket> tls_consumer_;
+
+ /* SSL handle */
+ SSL *ssl_;
+ SSL_CTX *ctx_;
+ BIO_METHOD *bio_meth_;
+
+ /* Chain of MemBuf to be used as a temporary buffer to pass descypted data
+ * from the underlying layer to the application */
+ utils::ObjectPool<utils::MemBuf> buf_pool_;
+ std::unique_ptr<utils::MemBuf> decrypted_content_;
+
+ /* Chain of MemBuf holding the payload to be written into interest or data */
+ std::unique_ptr<utils::MemBuf> payload_;
+
+ /* Chain of MemBuf holding the data retrieved from the underlying layer */
+ std::unique_ptr<utils::MemBuf> head_;
+
+ bool something_to_read_;
+
+ bool content_downloaded_;
+
+ double old_max_win_;
+
+ double old_current_win_;
+
+ uint32_t random_suffix_;
+
+ ip_prefix_t secure_prefix_;
+
+ Prefix producer_namespace_;
+
+ interface::ConsumerSocket::ReadCallback *read_callback_decrypted_;
+
+ std::mutex mtx_;
+
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+
+ int protocol_;
+
+ void setInterestPayload(interface::ConsumerSocket &c,
+ const core::Interest &interest);
+
+ static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context,
+ const unsigned char **out, size_t *outlen, X509 *x,
+ size_t chainidx, int *al, void *add_arg);
+
+ static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *out,
+ void *add_arg);
+
+ static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *in,
+ size_t inlen, X509 *x, size_t chainidx, int *al,
+ void *add_arg);
+
+ virtual void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override;
+
+ virtual void readDataAvailable(size_t length) noexcept override;
+
+ virtual size_t maxBufferSize() const override;
+
+ virtual void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
+
+ virtual void readError(const std::error_code ec) noexcept override;
+
+ virtual void readSuccess(std::size_t total_size) noexcept override;
+ virtual bool isBufferMovable() noexcept override;
+
+ int download_content(const Name &name);
+};
+
+} // namespace implementation
+
+} // end namespace transport
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc
new file mode 100644
index 000000000..d7161986b
--- /dev/null
+++ b/libtransport/src/implementation/p2psecure_socket_producer.cc
@@ -0,0 +1,404 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/interest.h>
+
+#include <implementation/p2psecure_socket_producer.h>
+#include <implementation/tls_rtc_socket_producer.h>
+#include <implementation/tls_socket_producer.h>
+#include <interfaces/tls_rtc_socket_producer.h>
+#include <interfaces/tls_socket_producer.h>
+
+#include <openssl/bio.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+
+namespace transport {
+namespace implementation {
+
+/* Workaround to prevent content with expiry time equal to 0 to be lost when
+ * pushed in the forwarder */
+#define HICN_HANDSHAKE_CONTENT_EXPIRY_TIME 100;
+
+P2PSecureProducerSocket::P2PSecureProducerSocket(
+ interface::ProducerSocket *producer_socket)
+ : ProducerSocket(producer_socket),
+ mtx_(),
+ cv_(),
+ map_secure_producers(),
+ map_secure_rtc_producers(),
+ list_secure_producers() {}
+
+P2PSecureProducerSocket::P2PSecureProducerSocket(
+ interface::ProducerSocket *producer_socket, bool rtc,
+ const std::shared_ptr<utils::Identity> &identity)
+ : ProducerSocket(producer_socket),
+ rtc_(rtc),
+ mtx_(),
+ cv_(),
+ map_secure_producers(),
+ map_secure_rtc_producers(),
+ list_secure_producers() {
+ /*
+ * Setup SSL context (identity and parameter to use TLS 1.3)
+ */
+ der_cert_ = parcKeyStore_GetDEREncodedCertificate(
+ (identity->getSigner()->getKeyStore()));
+ der_prk_ = parcKeyStore_GetDEREncodedPrivateKey(
+ (identity->getSigner()->getKeyStore()));
+
+ int cert_size = parcBuffer_Limit(der_cert_);
+ int prk_size = parcBuffer_Limit(der_prk_);
+ const uint8_t *cert =
+ reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_cert_, cert_size));
+ const uint8_t *prk =
+ reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_prk_, prk_size));
+ cert_509_ = d2i_X509(NULL, &cert, cert_size);
+ pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size);
+
+ /*
+ * Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application.
+ */
+ ProducerSocket::setSocketOption(
+ ProducerCallbacksOptions::INTEREST_INPUT,
+ (ProducerInterestCallback)std::bind(
+ &P2PSecureProducerSocket::onInterestCallback, this,
+ std::placeholders::_1, std::placeholders::_2));
+}
+
+P2PSecureProducerSocket::~P2PSecureProducerSocket() {
+ if (der_cert_) parcBuffer_Release(&der_cert_);
+ if (der_prk_) parcBuffer_Release(&der_prk_);
+}
+
+void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p,
+ Interest &interest) {
+ std::unique_lock<std::mutex> lck(mtx_);
+
+ TRANSPORT_LOGD("Start handshake at %s",
+ interest.getName().toString().c_str());
+ if (!rtc_) {
+ auto it = map_secure_producers.find(interest.getName());
+ if (it != map_secure_producers.end()) return;
+ TLSProducerSocket *tls_producer =
+ new TLSProducerSocket(nullptr, this, interest.getName());
+ tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer));
+
+ tls_producer->on_content_produced_application_ =
+ this->on_content_produced_application_;
+ tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
+ this->content_object_expiry_time_);
+ tls_producer->setSocketOption(SIGNER, this->signer_);
+ tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
+ tls_producer->setSocketOption(DATA_PACKET_SIZE,
+ (uint32_t)(this->data_packet_size_));
+ tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
+ map_secure_producers.insert(
+ {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)});
+ tls_producer->onInterest(*tls_producer, interest);
+ tls_producer->async_accept();
+ } else {
+ auto it = map_secure_rtc_producers.find(interest.getName());
+ if (it != map_secure_rtc_producers.end()) return;
+ TLSRTCProducerSocket *tls_producer =
+ new TLSRTCProducerSocket(nullptr, this, interest.getName());
+ tls_producer->setInterface(
+ new interface::TLSRTCProducerSocket(tls_producer));
+ tls_producer->on_content_produced_application_ =
+ this->on_content_produced_application_;
+ tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
+ this->content_object_expiry_time_);
+ tls_producer->setSocketOption(SIGNER, this->signer_);
+ tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
+ tls_producer->setSocketOption(DATA_PACKET_SIZE,
+ (uint32_t)(this->data_packet_size_));
+ tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
+ map_secure_rtc_producers.insert(
+ {interest.getName(),
+ std::unique_ptr<TLSRTCProducerSocket>(tls_producer)});
+ tls_producer->onInterest(*tls_producer, interest);
+ tls_producer->async_accept();
+ }
+}
+
+void P2PSecureProducerSocket::produce(const uint8_t *buffer,
+ size_t buffer_size) {
+ if (!rtc_) {
+ throw errors::RuntimeException(
+ "RTC must be the transport protocol to start the production of current "
+ "data. Aborting.");
+ }
+
+ std::unique_lock<std::mutex> lck(mtx_);
+ if (list_secure_rtc_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_secure_rtc_producers.cbegin();
+ it != list_secure_rtc_producers.cend(); it++) {
+ (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
+ }
+}
+
+uint32_t P2PSecureProducerSocket::produce(
+ Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
+ uint32_t start_offset) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+
+ std::unique_lock<std::mutex> lck(mtx_);
+ uint32_t segments = 0;
+ if (list_secure_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ segments +=
+ (*it)->produce(content_name, buffer->clone(), is_last, start_offset);
+ return segments;
+}
+
+uint32_t P2PSecureProducerSocket::produce(Name content_name,
+ const uint8_t *buffer,
+ size_t buffer_size, bool is_last,
+ uint32_t start_offset) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+
+ std::unique_lock<std::mutex> lck(mtx_);
+ uint32_t segments = 0;
+ if (list_secure_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ segments += (*it)->produce(content_name, buffer, buffer_size, is_last,
+ start_offset);
+ return segments;
+}
+
+void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
+ const uint8_t *buf,
+ size_t buffer_size, bool is_last,
+ uint32_t *start_offset) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+
+ std::unique_lock<std::mutex> lck(mtx_);
+ if (list_secure_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++) {
+ (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset);
+ }
+}
+
+void P2PSecureProducerSocket::asyncProduce(
+ Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
+ uint32_t offset, uint32_t **last_segment) {
+ if (rtc_) {
+ throw errors::RuntimeException(
+ "RTC transport protocol is not compatible with the production of "
+ "current data. Aborting.");
+ }
+
+ std::unique_lock<std::mutex> lck(mtx_);
+ if (list_secure_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++) {
+ (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset,
+ last_segment);
+ }
+}
+
+// Socket Option Redefinition to avoid name hiding
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ if (!list_secure_producers.empty()) {
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ }
+
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_decrypted_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_decrypted_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Signer> &socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ switch (socket_option_key) {
+ case GeneralTransportOptions::SIGNER: {
+ signer_.reset();
+ signer_ = socket_option_value;
+
+ return SOCKET_OPTION_SET;
+ }
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ if (!list_secure_producers.empty()) {
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+ }
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ content_object_expiry_time_ =
+ socket_option_value; // HICN_HANDSHAKE_CONTENT_EXPIRY_TIME;
+ return SOCKET_OPTION_SET;
+ }
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
+ bool socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
+ Name *socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, std::list<Prefix> socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentObjectCallback socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_application_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, HashAlgorithm socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, utils::CryptoSuite socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+int P2PSecureProducerSocket::setSocketOption(
+ int socket_option_key, const std::string &socket_option_value) {
+ if (!list_secure_producers.empty())
+ for (auto it = list_secure_producers.cbegin();
+ it != list_secure_producers.cend(); it++)
+ (*it)->setSocketOption(socket_option_key, socket_option_value);
+
+ return ProducerSocket::setSocketOption(socket_option_key,
+ socket_option_value);
+}
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h
new file mode 100644
index 000000000..c2cbf31ac
--- /dev/null
+++ b/libtransport/src/implementation/p2psecure_socket_producer.h
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/security/identity.h>
+#include <hicn/transport/security/signer.h>
+
+#include <implementation/socket_producer.h>
+#include <implementation/tls_rtc_socket_producer.h>
+#include <implementation/tls_socket_producer.h>
+#include <utils/content_store.h>
+
+#include <openssl/ssl.h>
+#include <condition_variable>
+#include <forward_list>
+#include <mutex>
+
+namespace transport {
+namespace implementation {
+
+class P2PSecureProducerSocket : public ProducerSocket {
+ friend class TLSProducerSocket;
+ friend class TLSRTCProducerSocket;
+
+ public:
+ explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket);
+ explicit P2PSecureProducerSocket(
+ interface::ProducerSocket *producer_socket, bool rtc,
+ const std::shared_ptr<utils::Identity> &identity);
+ ~P2PSecureProducerSocket();
+
+ void produce(const uint8_t *buffer, size_t buffer_size) override;
+
+ uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true, uint32_t start_offset = 0) override;
+
+ uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true, uint32_t start_offset = 0) override;
+
+ void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment = nullptr) override;
+
+ void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
+ bool is_last = true,
+ uint32_t *start_offset = nullptr) override;
+
+ int setSocketOption(int socket_option_key,
+ ProducerInterestCallback socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Signer> &socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) override;
+
+ int setSocketOption(int socket_option_key, bool socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ Name *socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ProducerContentCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) override;
+
+ using ProducerSocket::getSocketOption;
+ using ProducerSocket::onInterest;
+
+ protected:
+ bool rtc_;
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ProducerInterestCallback on_interest_input_decrypted_;
+ ProducerInterestCallback on_interest_process_decrypted_;
+ ProducerContentCallback on_content_produced_application_;
+
+ private:
+ std::mutex mtx_;
+
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+
+ PARCBuffer *der_cert_;
+ PARCBuffer *der_prk_;
+ X509 *cert_509_;
+ EVP_PKEY *pkey_rsa_;
+ std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>,
+ core::hash<core::Name>, core::compare2<core::Name>>
+ map_secure_producers;
+ std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>,
+ core::hash<core::Name>, core::compare2<core::Name>>
+ map_secure_rtc_producers;
+ std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers;
+ std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers;
+
+ void onInterestCallback(interface::ProducerSocket &p, Interest &interest);
+};
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/rtc_socket_producer.cc b/libtransport/src/implementation/rtc_socket_producer.cc
new file mode 100644
index 000000000..a5b2b4a0e
--- /dev/null
+++ b/libtransport/src/implementation/rtc_socket_producer.cc
@@ -0,0 +1,352 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/callbacks.h>
+
+#include <implementation/rtc_socket_producer.h>
+#include <stdlib.h>
+#include <time.h>
+
+#define NACK_HEADER_SIZE 8 // bytes
+#define TIMESTAMP_LEN 8 // bytes
+#define TCP_HEADER_SIZE 20
+#define IP6_HEADER_SIZE 40
+#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps)
+#define STATS_INTERVAL_DURATION 500 // ms
+#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
+#define INACTIVE_TIME \
+ 500 // ms without producing before the socket
+ // is considered inactive
+#define MILLI_IN_A_SEC 1000 // ms in a second
+
+#define HICN_MAX_DATA_SEQ 0xefffffff
+
+// slow production rate param
+#define MIN_PRODUCTION_RATE \
+ 10 // in pacekts per sec. this value is computed
+ // through experiments
+#define LIFETIME_FRACTION 0.5
+
+// NACK HEADER
+// +-----------------------------------------+
+// | 4 bytes: current segment in production |
+// +-----------------------------------------+
+// | 4 bytes: production rate (bytes x sec) |
+// +-----------------------------------------+
+//
+
+// PACKET HEADER
+// +-----------------------------------------+
+// | 8 bytes: TIMESTAMP |
+// +-----------------------------------------+
+// | packet |
+// +-----------------------------------------+
+
+namespace transport {
+namespace implementation {
+
+RTCProducerSocket::RTCProducerSocket(interface::ProducerSocket *producer_socket)
+ : ProducerSocket(producer_socket),
+ currentSeg_(1),
+ producedBytes_(0),
+ producedPackets_(0),
+ bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
+ packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
+ perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
+ timer_on_(false) {
+ srand((unsigned int)time(NULL));
+ prodLabel_ = ((rand() % 255) << 24UL);
+ interests_cache_timer_ =
+ std::make_unique<asio::steady_timer>(this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService());
+ setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
+ scheduleRoundTimer();
+}
+
+RTCProducerSocket::~RTCProducerSocket() {}
+
+void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
+ ProducerSocket::registerPrefix(producer_namespace);
+
+ flowName_ = producer_namespace.getName();
+ auto family = flowName_.getAddressFamily();
+
+ switch (family) {
+ case AF_INET6:
+ headerSize_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
+ break;
+ case AF_INET:
+ headerSize_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP);
+ break;
+ default:
+ throw errors::RuntimeException("Unknown name format.");
+ }
+}
+
+void RTCProducerSocket::scheduleRoundTimer() {
+ round_timer_->expires_from_now(
+ std::chrono::milliseconds(STATS_INTERVAL_DURATION));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats();
+ });
+}
+
+void RTCProducerSocket::updateStats() {
+ bytesProductionRate_ = producedBytes_.load() * perSecondFactor_;
+ packetsProductionRate_ = producedPackets_.load() * perSecondFactor_;
+ if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
+ producedBytes_ = 0;
+ producedPackets_ = 0;
+ scheduleRoundTimer();
+}
+
+void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
+ auto buffer_size = buffer->length();
+
+ if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) {
+ return;
+ }
+
+ if (TRANSPORT_EXPECT_FALSE((buffer_size + headerSize_ + TIMESTAMP_LEN) >
+ data_packet_size_)) {
+ return;
+ }
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
+ producedPackets_++;
+
+ Name n(flowName_);
+ auto content_object =
+ std::make_shared<ContentObject>(n.setSuffix(currentSeg_.load()));
+ auto payload = utils::MemBuf::create(TIMESTAMP_LEN);
+
+ memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
+ payload->append(TIMESTAMP_LEN);
+ payload->prependChain(std::move(buffer));
+ content_object->appendPayload(std::move(payload));
+
+ content_object->setLifetime(500); // XXX this should be set by the APP
+
+ content_object->setPathLabel(prodLabel_);
+
+ output_buffer_.insert(std::static_pointer_cast<ContentObject>(
+ content_object->shared_from_this()));
+
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*getInterface(), *content_object);
+ }
+
+ TRANSPORT_LOGD("Send content %u (produce)",
+ content_object->getName().getSuffix());
+ portal_->sendContentObject(*content_object);
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*getInterface(), *content_object);
+ }
+
+ uint32_t old_curr = currentSeg_.load();
+ currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ;
+
+ // remove interests from the interest cache if it exists
+ // this generates nacks that will tell to the consumer
+ // that a new data packet was produced
+ utils::SpinLock::Acquire locked(interests_cache_lock_);
+ if (!seqs_map_.empty()) {
+ for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
+ if (it->first != old_curr) sendNack(it->first);
+ }
+ seqs_map_.clear();
+ timers_map_.clear();
+ }
+}
+
+void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
+ uint32_t interestSeg = interest->getName().getSuffix();
+ uint32_t lifetime = interest->getLifetime();
+
+ if (on_interest_input_) {
+ on_interest_input_(*getInterface(), *interest);
+ }
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (interestSeg > HICN_MAX_DATA_SEQ) {
+ sendNack(interestSeg);
+ return;
+ }
+
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(*interest);
+
+ if (content_object) {
+ if (on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_(*getInterface(), *interest);
+ }
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*getInterface(), *content_object);
+ }
+
+ TRANSPORT_LOGD("Send content %u (onInterest)",
+ content_object->getName().getSuffix());
+ portal_->sendContentObject(*content_object);
+ return;
+ } else {
+ if (on_interest_process_) {
+ on_interest_process_(*getInterface(), *interest);
+ }
+ }
+
+ // if the production rate is less than MIN_PRODUCTION_RATE we put the
+ // interest in a queue, otherwise we handle it in the usual way
+ if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE &&
+ interestSeg >= currentSeg_.load()) {
+ utils::SpinLock::Acquire locked(interests_cache_lock_);
+
+ uint64_t next_timer = ~0;
+ if (!timers_map_.empty()) {
+ next_timer = timers_map_.begin()->first;
+ }
+
+ uint64_t expiration = now + (lifetime * LIFETIME_FRACTION);
+ // check if the seq number exists already
+ auto it_seqs = seqs_map_.find(interestSeg);
+ if (it_seqs != seqs_map_.end()) {
+ // the seq already exists
+ if (expiration < it_seqs->second) {
+ // we need to update the timer becasue we got a smaller one
+ // 1) remove the entry from the multimap
+ // 2) update this entry
+ auto range = timers_map_.equal_range(it_seqs->second);
+ for (auto it_timers = range.first; it_timers != range.second;
+ it_timers++) {
+ if (it_timers->second == it_seqs->first) {
+ timers_map_.erase(it_timers);
+ break;
+ }
+ }
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interestSeg));
+ it_seqs->second = expiration;
+ } else {
+ // nothing to do here
+ return;
+ }
+ } else {
+ // add the new seq
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interestSeg));
+ seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration));
+ }
+
+ // here we have at least one interest in the queue, we need to start or
+ // update the timer
+ if (!timer_on_) {
+ // set timeout
+ timer_on_ = true;
+ scheduleCacheTimer(timers_map_.begin()->first - now);
+ } else {
+ // re-schedule the timer because a new interest will expires sooner
+ if (next_timer > timers_map_.begin()->first) {
+ interests_cache_timer_->cancel();
+ scheduleCacheTimer(timers_map_.begin()->first - now);
+ }
+ }
+ return;
+ }
+
+ uint32_t max_gap = (uint32_t)floor(
+ (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
+ 1000.0) *
+ (double)packetsProductionRate_.load()));
+
+ if (interestSeg < currentSeg_.load() ||
+ interestSeg > (max_gap + currentSeg_.load())) {
+ sendNack(interestSeg);
+ }
+ // else drop packet
+}
+
+void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) {
+ interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ interests_cache_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ interestCacheTimer();
+ });
+}
+
+void RTCProducerSocket::interestCacheTimer() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ utils::SpinLock::Acquire locked(interests_cache_lock_);
+
+ for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
+ uint64_t expire = it_timers->first;
+ if (expire <= now) {
+ uint32_t seq = it_timers->second;
+ sendNack(seq);
+ // remove the interest from the other map
+ seqs_map_.erase(seq);
+ it_timers = timers_map_.erase(it_timers);
+ } else {
+ // stop, we are done!
+ break;
+ }
+ }
+ if (timers_map_.empty()) {
+ timer_on_ = false;
+ } else {
+ timer_on_ = true;
+ scheduleCacheTimer(timers_map_.begin()->first - now);
+ }
+}
+
+void RTCProducerSocket::sendNack(uint32_t sequence) {
+ auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
+ nack_payload->append(NACK_HEADER_SIZE);
+ ContentObject nack;
+
+ Name n(flowName_);
+ nack.appendPayload(std::move(nack_payload));
+ nack.setName(n.setSuffix(sequence));
+
+ uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
+ *payload_ptr = currentSeg_.load();
+
+ *(++payload_ptr) = bytesProductionRate_.load();
+
+ nack.setLifetime(0);
+ nack.setPathLabel(prodLabel_);
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*getInterface(), nack);
+ }
+
+ TRANSPORT_LOGD("Send nack %u", sequence);
+ portal_->sendContentObject(nack);
+}
+
+} // namespace implementation
+
+} // end namespace transport
diff --git a/libtransport/src/implementation/rtc_socket_producer.h b/libtransport/src/implementation/rtc_socket_producer.h
new file mode 100644
index 000000000..87db2121d
--- /dev/null
+++ b/libtransport/src/implementation/rtc_socket_producer.h
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <implementation/socket_producer.h>
+#include <utils/content_store.h>
+
+#include <atomic>
+#include <map>
+#include <mutex>
+
+namespace transport {
+namespace implementation {
+
+class RTCProducerSocket : virtual public ProducerSocket {
+ public:
+ RTCProducerSocket(interface::ProducerSocket *producer_socket);
+
+ ~RTCProducerSocket();
+
+ void registerPrefix(const Prefix &producer_namespace) override;
+ void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
+
+ private:
+ void onInterest(Interest::Ptr &&interest) override;
+ void sendNack(uint32_t sequence);
+ void updateStats();
+ void scheduleCacheTimer(uint64_t wait);
+ void scheduleRoundTimer();
+ void interestCacheTimer();
+
+ std::atomic<uint32_t> currentSeg_;
+ uint32_t prodLabel_;
+ uint16_t headerSize_;
+ Name flowName_;
+ std::atomic<uint32_t> producedBytes_;
+ std::atomic<uint32_t> producedPackets_;
+ std::atomic<uint32_t> bytesProductionRate_;
+ std::atomic<uint32_t> packetsProductionRate_;
+ uint32_t perSecondFactor_;
+
+ std::unique_ptr<asio::steady_timer> round_timer_;
+
+ // cache for the received interests
+ // this map maps the expiration time of an interest to
+ // its sequence number. the map is sorted by timeouts
+ // the same timeout may be used for multiple sequence numbers
+ // but for each sequence number we store only the smallest
+ // expiry time. In this way the mapping from seqs_map_ to
+ // timers_map_ is unique
+ std::multimap<uint64_t, uint32_t> timers_map_;
+ // this map does the opposite, this map is not ordered
+ std::unordered_map<uint32_t, uint64_t> seqs_map_;
+ bool timer_on_;
+ std::unique_ptr<asio::steady_timer> interests_cache_timer_;
+ utils::SpinLock interests_cache_lock_;
+};
+
+} // namespace implementation
+
+} // end namespace transport
diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h
new file mode 100644
index 000000000..2e51f3027
--- /dev/null
+++ b/libtransport/src/implementation/socket.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_options_default_values.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
+
+#include <core/facade.h>
+
+#define SOCKET_OPTION_GET 0
+#define SOCKET_OPTION_NOT_GET 1
+#define SOCKET_OPTION_SET 2
+#define SOCKET_OPTION_NOT_SET 3
+#define SOCKET_OPTION_DEFAULT 12345
+
+namespace transport {
+namespace implementation {
+
+// Forward Declarations
+template <typename PortalType>
+class Socket;
+
+// Define the portal and its connector, depending on the compilation options
+// passed by the build tool.
+using HicnForwarderPortal = core::HicnForwarderPortal;
+
+#ifdef __linux__
+#ifndef __ANDROID__
+using RawSocketPortal = core::RawSocketPortal;
+#endif
+#endif
+
+#ifdef __vpp__
+using VPPForwarderPortal = core::VPPForwarderPortal;
+using BaseSocket = Socket<VPPForwarderPortal>;
+using BasePortal = VPPForwarderPortal;
+#else
+using BaseSocket = Socket<HicnForwarderPortal>;
+using BasePortal = HicnForwarderPortal;
+#endif
+
+template <typename PortalType>
+class Socket {
+ static_assert(std::is_same<PortalType, HicnForwarderPortal>::value
+#ifdef __linux__
+#ifndef __ANDROID__
+ || std::is_same<PortalType, RawSocketPortal>::value
+#ifdef __vpp__
+ || std::is_same<PortalType, VPPForwarderPortal>::value
+#endif
+#endif
+ ,
+#else
+ ,
+
+#endif
+ "This class is not allowed as Portal");
+
+ public:
+ using Portal = PortalType;
+
+ virtual asio::io_service &getIoService() = 0;
+
+ virtual void connect() = 0;
+
+ virtual bool isRunning() = 0;
+
+ protected:
+ virtual ~Socket(){};
+};
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
new file mode 100644
index 000000000..2fc8d2b48
--- /dev/null
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -0,0 +1,950 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/interfaces/socket_options_default_values.h>
+#include <hicn/transport/interfaces/statistics.h>
+#include <hicn/transport/security/verifier.h>
+
+#include <protocols/cbr.h>
+#include <protocols/protocol.h>
+#include <protocols/raaqm.h>
+#include <protocols/rtc.h>
+#include <utils/event_thread.h>
+
+namespace transport {
+namespace implementation {
+
+using namespace core;
+using namespace interface;
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
+class ConsumerSocket : public Socket<BasePortal> {
+ public:
+ ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
+ : consumer_interface_(consumer),
+ portal_(std::make_shared<Portal>(io_service_)),
+ async_downloader_(),
+ interest_lifetime_(default_values::interest_lifetime),
+ min_window_size_(default_values::min_window_size),
+ max_window_size_(default_values::max_window_size),
+ current_window_size_(-1),
+ max_retransmissions_(
+ default_values::transport_protocol_max_retransmissions),
+ /****** RAAQM Parameters ******/
+ minimum_drop_probability_(default_values::minimum_drop_probability),
+ sample_number_(default_values::sample_number),
+ gamma_(default_values::gamma_value),
+ beta_(default_values::beta_value),
+ drop_factor_(default_values::drop_factor),
+ /****** END RAAQM Parameters ******/
+ rate_estimation_alpha_(default_values::rate_alpha),
+ rate_estimation_observer_(nullptr),
+ rate_estimation_batching_parameter_(default_values::batch),
+ rate_estimation_choice_(0),
+ verifier_(std::make_shared<utils::Verifier>()),
+ verify_signature_(false),
+ key_content_(false),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ on_content_object_verification_(VOID_HANDLER),
+ on_content_object_(VOID_HANDLER),
+ stats_summary_(VOID_HANDLER),
+ read_callback_(nullptr),
+ virtual_download_(false),
+ timer_interval_milliseconds_(0),
+ guard_raaqm_params_() {
+ switch (protocol) {
+ case TransportProtocolAlgorithms::CBR:
+ transport_protocol_ =
+ std::make_unique<protocol::CbrTransportProtocol>(this);
+ break;
+ case TransportProtocolAlgorithms::RTC:
+ transport_protocol_ =
+ std::make_unique<protocol::RTCTransportProtocol>(this);
+ break;
+ case TransportProtocolAlgorithms::RAAQM:
+ default:
+ transport_protocol_ =
+ std::make_unique<protocol::RaaqmTransportProtocol>(this);
+ break;
+ }
+ }
+
+ ~ConsumerSocket() {
+ stop();
+ async_downloader_.stop();
+ }
+
+ interface::ConsumerSocket *getInterface() {
+ return consumer_interface_;
+ }
+
+ void setInterface(interface::ConsumerSocket *consumer_socket) {
+ consumer_interface_ = consumer_socket;
+ }
+
+ void connect() { portal_->connect(); }
+
+ bool isRunning() { return transport_protocol_->isRunning(); }
+
+ virtual int consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ network_name_ = name;
+ network_name_.setSuffix(0);
+
+ transport_protocol_->start();
+
+ return CONSUMER_FINISHED;
+ }
+
+ virtual int asyncConsume(const Name &name) {
+ if (!async_downloader_.stopped()) {
+ async_downloader_.add([this, name]() {
+ network_name_ = std::move(name);
+ network_name_.setSuffix(0);
+ transport_protocol_->start();
+ });
+ }
+
+ return CONSUMER_RUNNING;
+ }
+
+ bool verifyKeyPackets() { return transport_protocol_->verifyKeyPackets(); }
+
+ void stop() {
+ if (transport_protocol_->isRunning()) {
+ transport_protocol_->stop();
+ }
+ }
+
+ void resume() {
+ if (!transport_protocol_->isRunning()) {
+ transport_protocol_->resume();
+ }
+ }
+
+ asio::io_service &getIoService() { return portal_->getIoService(); }
+
+ virtual int setSocketOption(int socket_option_key,
+ ReadCallback *socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ReadCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ int getSocketOption(int socket_option_key,
+ ReadCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ReadCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ *socket_option_value = read_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ int setSocketOption(int socket_option_key, double socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case MIN_WINDOW_SIZE:
+ min_window_size_ = socket_option_value;
+ break;
+
+ case MAX_WINDOW_SIZE:
+ max_window_size_ = socket_option_value;
+ break;
+
+ case CURRENT_WINDOW_SIZE:
+ current_window_size_ = socket_option_value;
+ break;
+
+ case GAMMA_VALUE:
+ gamma_ = socket_option_value;
+ break;
+
+ case BETA_VALUE:
+ beta_ = socket_option_value;
+ break;
+
+ case DROP_FACTOR:
+ drop_factor_ = socket_option_value;
+ break;
+
+ case MINIMUM_DROP_PROBABILITY:
+ minimum_drop_probability_ = socket_option_value;
+ break;
+
+ case RATE_ESTIMATION_ALPHA:
+ if (socket_option_value >= 0 && socket_option_value < 1) {
+ rate_estimation_alpha_ = socket_option_value;
+ } else {
+ rate_estimation_alpha_ = default_values::alpha;
+ }
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ int setSocketOption(int socket_option_key, uint32_t socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ max_retransmissions_ = socket_option_value;
+ break;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ interest_lifetime_ = socket_option_value;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ if (socket_option_value > 0) {
+ rate_estimation_batching_parameter_ = socket_option_value;
+ } else {
+ rate_estimation_batching_parameter_ = default_values::batch;
+ }
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ if (socket_option_value > 0) {
+ rate_estimation_choice_ = socket_option_value;
+ } else {
+ rate_estimation_choice_ = default_values::rate_choice;
+ }
+ break;
+
+ case GeneralTransportOptions::STATS_INTERVAL:
+ timer_interval_milliseconds_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ int setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ std::nullptr_t socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_retransmission_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_timeout_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_output_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_input_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_verification_ = VOID_HANDLER;
+ break;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ int setSocketOption(int socket_option_key, bool socket_option_value) {
+ int result = SOCKET_OPTION_NOT_SET;
+ if (!transport_protocol_->isRunning()) {
+ switch (socket_option_key) {
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ virtual_download_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ case GeneralTransportOptions::VERIFY_SIGNATURE:
+ verify_signature_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ case GeneralTransportOptions::KEY_CONTENT:
+ key_content_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ default:
+ return result;
+ }
+ }
+ return result;
+ }
+
+ int setSocketOption(int socket_option_key,
+ ConsumerContentObjectCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ on_content_object_input_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ on_content_object_verification_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ int setSocketOption(int socket_option_key,
+ ConsumerInterestCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerInterestCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ on_interest_retransmission_ = socket_option_value;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ on_interest_output_ = socket_option_value;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ on_interest_timeout_ = socket_option_value;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ on_interest_satisfied_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::VERIFICATION_FAILED:
+ verification_failed_callback_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ // int setSocketOption(
+ // int socket_option_key,
+ // ConsumerContentObjectVerificationFailedCallback socket_option_value) {
+ // return rescheduleOnIOService(
+ // socket_option_key, socket_option_value,
+ // [this](
+ // int socket_option_key,
+ // ConsumerContentObjectVerificationFailedCallback
+ // socket_option_value)
+ // -> int {
+ // switch (socket_option_key) {
+ // case ConsumerCallbacksOptions::VERIFICATION_FAILED:
+ // verification_failed_callback_ = socket_option_value;
+ // break;
+
+ // default:
+ // return SOCKET_OPTION_NOT_SET;
+ // }
+
+ // return SOCKET_OPTION_SET;
+ // });
+ // }
+
+ int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
+ rate_estimation_observer_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ int setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Verifier> &socket_option_value) {
+ int result = SOCKET_OPTION_NOT_SET;
+ if (!transport_protocol_->isRunning()) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::VERIFIER:
+ verifier_.reset();
+ verifier_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+ default:
+ return result;
+ }
+ }
+
+ return result;
+ }
+
+ int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) {
+ int result = SOCKET_OPTION_NOT_SET;
+ if (!transport_protocol_->isRunning()) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CERTIFICATE:
+ key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
+
+ if (key_id_ != nullptr) {
+ result = SOCKET_OPTION_SET;
+ }
+ break;
+
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ output_interface_ = socket_option_value;
+ portal_->setOutputInterface(output_interface_);
+ result = SOCKET_OPTION_SET;
+ break;
+
+ default:
+ return result;
+ }
+ }
+ return result;
+ }
+
+ int setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerTimerCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::STATS_SUMMARY:
+ stats_summary_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ int getSocketOption(int socket_option_key, double &socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MIN_WINDOW_SIZE:
+ socket_option_value = min_window_size_;
+ break;
+
+ case GeneralTransportOptions::MAX_WINDOW_SIZE:
+ socket_option_value = max_window_size_;
+ break;
+
+ case GeneralTransportOptions::CURRENT_WINDOW_SIZE:
+ socket_option_value = current_window_size_;
+ break;
+
+ // RAAQM parameters
+
+ case RaaqmTransportOptions::GAMMA_VALUE:
+ socket_option_value = gamma_;
+ break;
+
+ case RaaqmTransportOptions::BETA_VALUE:
+ socket_option_value = beta_;
+ break;
+
+ case RaaqmTransportOptions::DROP_FACTOR:
+ socket_option_value = drop_factor_;
+ break;
+
+ case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY:
+ socket_option_value = minimum_drop_probability_;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_ALPHA:
+ socket_option_value = rate_estimation_alpha_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key, uint32_t &socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ socket_option_value = max_retransmissions_;
+ break;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ socket_option_value = interest_lifetime_;
+ break;
+
+ case RaaqmTransportOptions::SAMPLE_NUMBER:
+ socket_option_value = sample_number_;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ socket_option_value = rate_estimation_batching_parameter_;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ socket_option_value = rate_estimation_choice_;
+ break;
+
+ case GeneralTransportOptions::STATS_INTERVAL:
+ socket_option_value = timer_interval_milliseconds_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key, bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::RUNNING:
+ socket_option_value = transport_protocol_->isRunning();
+ break;
+
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ socket_option_value = virtual_download_;
+ break;
+
+ case GeneralTransportOptions::VERIFY_SIGNATURE:
+ socket_option_value = verify_signature_;
+ break;
+
+ case GeneralTransportOptions::KEY_CONTENT:
+ socket_option_value = key_content_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key, Name **socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ *socket_option_value = &network_name_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key,
+ ConsumerContentObjectCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ *socket_option_value = &on_content_object_input_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectVerificationCallback **socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ *socket_option_value = &on_content_object_verification_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ int getSocketOption(int socket_option_key,
+ ConsumerInterestCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerInterestCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ *socket_option_value = &on_interest_retransmission_;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ *socket_option_value = &on_interest_output_;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ *socket_option_value = &on_interest_timeout_;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ *socket_option_value = &on_interest_satisfied_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback *
+ *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::VERIFICATION_FAILED:
+ *socket_option_value = &verification_failed_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
+ *socket_option_value = (rate_estimation_observer_);
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key,
+ std::shared_ptr<utils::Verifier> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::VERIFIER:
+ socket_option_value = verifier_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key, std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ socket_option_value = output_interface_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key,
+ interface::TransportStatistics **socket_option_value) {
+ switch (socket_option_key) {
+ case OtherOptions::STATISTICS:
+ *socket_option_value = &stats_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ int getSocketOption(int socket_option_key,
+ ConsumerTimerCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerTimerCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::STATS_SUMMARY:
+ *socket_option_value = &stats_summary_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ protected:
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (transport_protocol_->isRunning()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ cv.notify_all();
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+ }
+
+ protected:
+ interface::ConsumerSocket *consumer_interface_;
+ asio::io_service io_service_;
+
+ std::shared_ptr<Portal> portal_;
+ utils::EventThread async_downloader_;
+
+ // No need to protect from multiple accesses in the async consumer
+ // The parameter is accessible only with a getSocketOption and
+ // set from the consume
+ Name network_name_;
+
+ int interest_lifetime_;
+
+ double min_window_size_;
+ double max_window_size_;
+ double current_window_size_;
+ uint32_t max_retransmissions_;
+
+ // RAAQM Parameters
+ double minimum_drop_probability_;
+ unsigned int sample_number_;
+ double gamma_;
+ double beta_;
+ double drop_factor_;
+
+ // Rate estimation parameters
+ double rate_estimation_alpha_;
+ IcnObserver *rate_estimation_observer_;
+ int rate_estimation_batching_parameter_;
+ int rate_estimation_choice_;
+
+ bool is_async_;
+
+ // Verification parameters
+ std::shared_ptr<utils::Verifier> verifier_;
+ PARCKeyId *key_id_;
+ std::atomic_bool verify_signature_;
+ bool key_content_;
+
+ ConsumerInterestCallback on_interest_retransmission_;
+ ConsumerInterestCallback on_interest_output_;
+ ConsumerInterestCallback on_interest_timeout_;
+ ConsumerInterestCallback on_interest_satisfied_;
+ ConsumerContentObjectCallback on_content_object_input_;
+ ConsumerContentObjectVerificationCallback on_content_object_verification_;
+ ConsumerContentObjectCallback on_content_object_;
+ ConsumerTimerCallback stats_summary_;
+ ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
+
+ ReadCallback *read_callback_;
+
+ // Virtual download for traffic generator
+ bool virtual_download_;
+
+ uint32_t timer_interval_milliseconds_;
+
+ // Transport protocol
+ std::unique_ptr<protocol::TransportProtocol> transport_protocol_;
+
+ // Statistic
+ TransportStatistics stats_;
+
+ utils::SpinLock guard_raaqm_params_;
+ std::string output_interface_;
+};
+
+} // namespace implementation
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
new file mode 100644
index 000000000..1f03fe53d
--- /dev/null
+++ b/libtransport/src/implementation/socket_producer.h
@@ -0,0 +1,1061 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/security/signer.h>
+
+#include <implementation/socket.h>
+
+#include <utils/content_store.h>
+#include <utils/event_thread.h>
+#include <utils/suffix_strategy.h>
+
+#include <atomic>
+#include <cmath>
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+#define REGISTRATION_NOT_ATTEMPTED 0
+#define REGISTRATION_SUCCESS 1
+#define REGISTRATION_FAILURE 2
+#define REGISTRATION_IN_PROGRESS 3
+
+namespace transport {
+namespace implementation {
+
+using namespace core;
+using namespace interface;
+
+class ProducerSocket : public Socket<BasePortal>,
+ public BasePortal::ProducerCallback {
+ public:
+ explicit ProducerSocket(interface::ProducerSocket *producer_socket)
+ : producer_interface_(producer_socket),
+ portal_(std::make_shared<Portal>(io_service_)),
+ data_packet_size_(default_values::content_object_packet_size),
+ content_object_expiry_time_(default_values::content_object_expiry_time),
+ output_buffer_(default_values::producer_socket_output_buffer_size),
+ async_thread_(),
+ registration_status_(REGISTRATION_NOT_ATTEMPTED),
+ making_manifest_(false),
+ hash_algorithm_(HashAlgorithm::SHA_256),
+ suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
+ on_interest_input_(VOID_HANDLER),
+ on_interest_dropped_input_buffer_(VOID_HANDLER),
+ on_interest_inserted_input_buffer_(VOID_HANDLER),
+ on_interest_satisfied_output_buffer_(VOID_HANDLER),
+ on_interest_process_(VOID_HANDLER),
+ on_new_segment_(VOID_HANDLER),
+ on_content_object_to_sign_(VOID_HANDLER),
+ on_content_object_in_output_buffer_(VOID_HANDLER),
+ on_content_object_output_(VOID_HANDLER),
+ on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
+ on_content_produced_(VOID_HANDLER) {}
+
+ virtual ~ProducerSocket() {
+ stop();
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+ }
+
+ interface::ProducerSocket *getInterface() {
+ return producer_interface_;
+ }
+
+ void setInterface(interface::ProducerSocket *producer_socket) {
+ producer_interface_ = producer_socket;
+ }
+
+ void connect() override {
+ portal_->connect(false);
+ listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
+ }
+
+ bool isRunning() override { return !io_service_.stopped(); };
+
+ virtual uint32_t produce(Name content_name, const uint8_t *buffer,
+ size_t buffer_size, bool is_last = true,
+ uint32_t start_offset = 0) {
+ return ProducerSocket::produce(
+ content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last,
+ start_offset);
+ }
+
+ virtual uint32_t produce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true, uint32_t start_offset = 0) {
+ if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) {
+ return 0;
+ }
+
+ // Copy the atomic variables to ensure they keep the same value
+ // during the production
+ std::size_t data_packet_size = data_packet_size_;
+ uint32_t content_object_expiry_time = content_object_expiry_time_;
+ HashAlgorithm hash_algo = hash_algorithm_;
+ bool making_manifest = making_manifest_;
+ auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
+ suffix_strategy_, start_offset);
+ std::shared_ptr<utils::Signer> signer;
+ getSocketOption(GeneralTransportOptions::SIGNER, signer);
+
+ auto buffer_size = buffer->length();
+ int bytes_segmented = 0;
+ std::size_t header_size;
+ std::size_t manifest_header_size = 0;
+ std::size_t signature_length = 0;
+ std::uint32_t final_block_number = start_offset;
+ uint64_t free_space_for_content = 0;
+
+ core::Packet::Format format;
+ std::shared_ptr<ContentObjectManifest> manifest;
+ bool is_last_manifest = false;
+
+ // TODO Manifest may still be used for indexing
+ if (making_manifest && !signer) {
+ TRANSPORT_LOGD("Making manifests without setting producer identity.");
+ }
+
+ core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
+ core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
+ if (content_name.getType() == HNT_CONTIGUOUS_V4 ||
+ content_name.getType() == HNT_IOV_V4) {
+ hf_format = core::Packet::Format::HF_INET_TCP;
+ hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
+ } else if (content_name.getType() == HNT_CONTIGUOUS_V6 ||
+ content_name.getType() == HNT_IOV_V6) {
+ hf_format = core::Packet::Format::HF_INET6_TCP;
+ hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
+ } else {
+ throw errors::RuntimeException("Unknown name format.");
+ }
+
+ format = hf_format;
+ if (making_manifest) {
+ manifest_header_size = core::Packet::getHeaderSizeFromFormat(
+ signer ? hf_format_ah : hf_format,
+ signer ? signer->getSignatureLength() : 0);
+ } else if (signer) {
+ format = hf_format_ah;
+ signature_length = signer->getSignatureLength();
+ }
+
+ header_size =
+ core::Packet::getHeaderSizeFromFormat(format, signature_length);
+ free_space_for_content = data_packet_size - header_size;
+ uint32_t number_of_segments = uint32_t(
+ std::ceil(double(buffer_size) / double(free_space_for_content)));
+ if (free_space_for_content * number_of_segments < buffer_size) {
+ number_of_segments++;
+ }
+
+ // TODO allocate space for all the headers
+ if (making_manifest) {
+ uint32_t segment_in_manifest = static_cast<uint32_t>(
+ std::floor(double(data_packet_size - manifest_header_size -
+ ContentObjectManifest::getManifestHeaderSize()) /
+ ContentObjectManifest::getManifestEntrySize()) -
+ 1.0);
+ uint32_t number_of_manifests = static_cast<uint32_t>(
+ std::ceil(float(number_of_segments) / segment_in_manifest));
+ final_block_number += number_of_segments + number_of_manifests - 1;
+
+ manifest.reset(ContentObjectManifest::createManifest(
+ content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
+ core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
+ hash_algo, is_last_manifest, content_name, suffix_strategy_,
+ signer ? signer->getSignatureLength() : 0));
+ manifest->setLifetime(content_object_expiry_time);
+
+ if (is_last) {
+ manifest->setFinalBlockNumber(final_block_number);
+ } else {
+ manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
+ }
+ }
+
+ TRANSPORT_LOGD("--------- START PRODUCE ----------");
+ for (unsigned int packaged_segments = 0;
+ packaged_segments < number_of_segments; packaged_segments++) {
+ if (making_manifest) {
+ if (manifest->estimateManifestSize(2) >
+ data_packet_size - manifest_header_size) {
+ // Send the current manifest
+ manifest->encode();
+
+ // If identity set, sign manifest
+ if (signer) {
+ signer->sign(*manifest);
+ }
+
+ passContentObjectToCallbacks(manifest);
+ TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
+
+ // Send content objects stored in the queue
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ TRANSPORT_LOGD("Send content %u",
+ content_queue_.front()->getName().getSuffix());
+ content_queue_.pop();
+ }
+
+ // Create new manifest. The reference to the last manifest has been
+ // acquired in the passContentObjectToCallbacks function, so we can
+ // safely release this reference
+ manifest.reset(ContentObjectManifest::createManifest(
+ content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
+ core::ManifestVersion::VERSION_1,
+ core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
+ content_name, suffix_strategy_,
+ signer ? signer->getSignatureLength() : 0));
+
+ manifest->setLifetime(content_object_expiry_time);
+ manifest->setFinalBlockNumber(
+ is_last ? final_block_number
+ : utils::SuffixStrategy::INVALID_SUFFIX);
+ }
+ }
+
+ auto content_suffix = suffix_strategy->getNextContentSuffix();
+ auto content_object = std::make_shared<ContentObject>(
+ content_name.setSuffix(content_suffix), format);
+ content_object->setLifetime(content_object_expiry_time);
+
+ auto b = buffer->cloneOne();
+ b->trimStart(free_space_for_content * packaged_segments);
+ b->trimEnd(b->length());
+
+ if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) {
+ b->append(buffer_size - bytes_segmented);
+ bytes_segmented += (int)(buffer_size - bytes_segmented);
+
+ if (is_last && making_manifest) {
+ is_last_manifest = true;
+ } else if (is_last) {
+ content_object->setRst();
+ }
+
+ } else {
+ b->append(free_space_for_content);
+ bytes_segmented += (int)(free_space_for_content);
+ }
+
+ content_object->appendPayload(std::move(b));
+
+ if (making_manifest) {
+ using namespace std::chrono_literals;
+ utils::CryptoHash hash = content_object->computeDigest(hash_algo);
+ manifest->addSuffixHash(content_suffix, hash);
+ content_queue_.push(content_object);
+ } else {
+ if (signer) {
+ signer->sign(*content_object);
+ }
+ passContentObjectToCallbacks(content_object);
+ TRANSPORT_LOGD("Send content %u",
+ content_object->getName().getSuffix());
+ }
+ }
+
+ if (making_manifest) {
+ if (is_last_manifest) {
+ manifest->setFinalManifest(is_last_manifest);
+ }
+
+ manifest->encode();
+ if (signer) {
+ signer->sign(*manifest);
+ }
+
+ passContentObjectToCallbacks(manifest);
+ TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ TRANSPORT_LOGD("Send content %u",
+ content_queue_.front()->getName().getSuffix());
+ content_queue_.pop();
+ }
+ }
+
+ io_service_.dispatch([this, buffer_size]() {
+ if (on_content_produced_) {
+ on_content_produced_(*producer_interface_,
+ std::make_error_code(std::errc(0)), buffer_size);
+ }
+ });
+
+ TRANSPORT_LOGD("--------- END PRODUCE ------------");
+ return suffix_strategy->getTotalCount();
+ }
+
+ virtual void produce(ContentObject &content_object) {
+ io_service_.dispatch([this, &content_object]() {
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_,
+ content_object);
+ }
+ });
+
+ output_buffer_.insert(std::static_pointer_cast<ContentObject>(
+ content_object.shared_from_this()));
+
+ io_service_.dispatch([this, &content_object]() {
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, content_object);
+ }
+ });
+
+ portal_->sendContentObject(content_object);
+ }
+
+ virtual void produce(const uint8_t *buffer, size_t buffer_size) {
+ produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
+ }
+
+ virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) {
+ // This API is meant to be used just with the RTC producer.
+ // Here it cannot be used since no name for the content is specified.
+ throw errors::NotImplementedException();
+ }
+
+ virtual void asyncProduce(const Name &suffix, const uint8_t *buf,
+ size_t buffer_size, bool is_last = true,
+ uint32_t *start_offset = nullptr) {
+ if (!async_thread_.stopped()) {
+ async_thread_.add([this, suffix, buffer = buf, size = buffer_size,
+ is_last, start_offset]() {
+ if (start_offset != nullptr) {
+ *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last,
+ *start_offset);
+ } else {
+ ProducerSocket::produce(suffix, buffer, size, is_last, 0);
+ }
+ });
+ }
+ }
+
+ void asyncProduce(const Name &suffix);
+
+ virtual void asyncProduce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment = nullptr) {
+ if (!async_thread_.stopped()) {
+ auto a = buffer.release();
+ async_thread_.add(
+ [this, content_name, a, is_last, offset, last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ **last_segment =
+ offset + ProducerSocket::produce(content_name, std::move(buf),
+ is_last, offset);
+ } else {
+ ProducerSocket::produce(content_name, std::move(buf), is_last,
+ offset);
+ }
+ });
+ }
+ }
+
+ virtual void asyncProduce(ContentObject &content_object) {
+ if (!async_thread_.stopped()) {
+ auto co_ptr = std::static_pointer_cast<ContentObject>(
+ content_object.shared_from_this());
+ async_thread_.add([this, content_object = std::move(co_ptr)]() {
+ ProducerSocket::produce(*content_object);
+ });
+ }
+ }
+
+ virtual void registerPrefix(const Prefix &producer_namespace) {
+ served_namespaces_.push_back(producer_namespace);
+ }
+
+ void serveForever() {
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+ }
+
+ void stop() { portal_->stopEventsLoop(); }
+
+ asio::io_service &getIoService() override { return portal_->getIoService(); };
+
+ virtual void onInterest(Interest &interest) {
+ if (on_interest_input_) {
+ on_interest_input_(*producer_interface_, interest);
+ }
+
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(interest);
+
+ if (content_object) {
+ if (on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_(*producer_interface_, interest);
+ }
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *content_object);
+ }
+
+ portal_->sendContentObject(*content_object);
+ } else {
+ if (on_interest_process_) {
+ on_interest_process_(*producer_interface_, interest);
+ }
+ }
+ }
+
+ virtual void onInterest(Interest::Ptr &&interest) override {
+ onInterest(*interest);
+ };
+
+ virtual int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ if (socket_option_value < default_values::max_content_object_size &&
+ socket_option_value > 0) {
+ data_packet_size_ = socket_option_value;
+ }
+ break;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ output_buffer_.setLimit(socket_option_value);
+ break;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ content_object_expiry_time_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ virtual int setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_input_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_dropped_input_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_inserted_input_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_process_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_new_segment_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_in_output_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_output_ = VOID_HANDLER;
+ break;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ virtual int setSocketOption(int socket_option_key, bool socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ making_manifest_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ virtual int setSocketOption(int socket_option_key,
+ Name *socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ virtual int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ served_namespaces_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ interface::ProducerContentObjectCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ on_new_segment_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ on_content_object_in_output_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ on_content_object_output_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ interface::ProducerInterestCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ interface::ProducerContentCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
+ virtual int setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ hash_algorithm_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ virtual int setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CRYPTO_SUITE:
+ crypto_suite_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Signer> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::SIGNER: {
+ utils::SpinLock::Acquire locked(signer_lock_);
+ signer_.reset();
+ signer_ = socket_option_value;
+ } break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ virtual int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = (uint32_t)output_buffer_.getLimit();
+ break;
+
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ socket_option_value = (uint32_t)data_packet_size_;
+ break;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ socket_option_value = content_object_expiry_time_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ virtual int getSocketOption(int socket_option_key,
+ bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ socket_option_value = making_manifest_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ virtual int getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ socket_option_value = served_namespaces_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ interface::ProducerContentObjectCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentObjectCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ *socket_option_value = &on_new_segment_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ *socket_option_value = &on_content_object_to_sign_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ *socket_option_value = &on_content_object_in_output_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ *socket_option_value = &on_content_object_output_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ interface::ProducerContentCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ *socket_option_value = &on_content_produced_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ interface::ProducerInterestCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ *socket_option_value = &on_interest_input_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ *socket_option_value = &on_interest_dropped_input_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ *socket_option_value = &on_interest_inserted_input_buffer_;
+ break;
+
+ case CACHE_HIT:
+ *socket_option_value = &on_interest_satisfied_output_buffer_;
+ break;
+
+ case CACHE_MISS:
+ *socket_option_value = &on_interest_process_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
+ virtual int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ ;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ virtual int getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = hash_algorithm_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ virtual int getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = crypto_suite_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<utils::Signer> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::SIGNER: {
+ utils::SpinLock::Acquire locked(signer_lock_);
+ socket_option_value = signer_;
+ } break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ virtual int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ // If the thread calling lambda_func is not the same of io_service, this
+ // function reschedule the function on it
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (listening_thread_.joinable() &&
+ std::this_thread::get_id() != listening_thread_.get_id()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ cv.notify_all();
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+ }
+
+ // If the thread calling lambda_func is not the same of io_service, this
+ // function reschedule the function on it
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOServiceWithReference(int socket_option_key,
+ arg2 &socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2 &)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (listening_thread_.joinable() &&
+ std::this_thread::get_id() != this->listening_thread_.get_id()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ std::unique_lock<std::mutex> lck(mtx);
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+
+ if (!done) {
+ cv.wait(lck);
+ }
+ });
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+ }
+
+ // Threads
+ protected:
+ interface::ProducerSocket *producer_interface_;
+ std::thread listening_thread_;
+ asio::io_service io_service_;
+ std::shared_ptr<Portal> portal_;
+ std::atomic<size_t> data_packet_size_;
+ std::list<Prefix>
+ served_namespaces_; // No need to be threadsafe, this is always modified
+ // by the application thread
+ std::atomic<uint32_t> content_object_expiry_time_;
+
+ // buffers
+ // ContentStore is thread-safe
+ utils::ContentStore output_buffer_;
+
+ utils::EventThread async_thread_;
+ int registration_status_;
+
+ std::atomic<bool> making_manifest_;
+
+ // map for storing sequence numbers for several calls of the publish
+ // function
+ std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
+
+ std::atomic<HashAlgorithm> hash_algorithm_;
+ std::atomic<utils::CryptoSuite> crypto_suite_;
+ utils::SpinLock signer_lock_;
+ std::shared_ptr<utils::Signer> signer_;
+ core::NextSegmentCalculationStrategy suffix_strategy_;
+
+ // While manifests are being built, contents are stored in a queue
+ std::queue<std::shared_ptr<ContentObject>> content_queue_;
+
+ // callbacks
+ ProducerInterestCallback on_interest_input_;
+ ProducerInterestCallback on_interest_dropped_input_buffer_;
+ ProducerInterestCallback on_interest_inserted_input_buffer_;
+ ProducerInterestCallback on_interest_satisfied_output_buffer_;
+ ProducerInterestCallback on_interest_process_;
+
+ ProducerContentObjectCallback on_new_segment_;
+ ProducerContentObjectCallback on_content_object_to_sign_;
+ ProducerContentObjectCallback on_content_object_in_output_buffer_;
+ ProducerContentObjectCallback on_content_object_output_;
+ ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
+
+ ProducerContentCallback on_content_produced_;
+
+ private:
+ void listen() {
+ bool first = true;
+
+ for (core::Prefix &producer_namespace : served_namespaces_) {
+ if (first) {
+ core::BindConfig bind_config(producer_namespace, 1000);
+ portal_->bind(bind_config);
+ portal_->setProducerCallback(this);
+ first = !first;
+ } else {
+ portal_->registerRoute(producer_namespace);
+ }
+ }
+
+ portal_->runEventsLoop();
+ }
+
+ void passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object) {
+ if (content_object) {
+ io_service_.dispatch([this, content_object]() {
+ if (on_new_segment_) {
+ on_new_segment_(*producer_interface_, *content_object);
+ }
+
+ if (on_content_object_to_sign_) {
+ on_content_object_to_sign_(*producer_interface_, *content_object);
+ }
+
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_,
+ *content_object);
+ }
+ });
+
+ output_buffer_.insert(content_object);
+
+ io_service_.dispatch([this, content_object]() {
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *content_object);
+ }
+ });
+
+ portal_->sendContentObject(*content_object);
+ }
+ }
+}; // namespace implementation
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc
new file mode 100644
index 000000000..3b3152993
--- /dev/null
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc
@@ -0,0 +1,201 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
+
+#include <implementation/p2psecure_socket_producer.h>
+#include <implementation/tls_rtc_socket_producer.h>
+
+#include <openssl/bio.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+
+namespace transport {
+namespace implementation {
+
+int TLSRTCProducerSocket::read(BIO *b, char *buf, size_t size,
+ size_t *readbytes) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = TLSRTCProducerSocket::readOld(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+
+ *readbytes = (size_t)ret;
+
+ return 1;
+}
+
+int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) {
+ TLSRTCProducerSocket *socket;
+ socket = (TLSRTCProducerSocket *)BIO_get_data(b);
+
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+ if (!socket->something_to_read_) {
+ (socket->cv_).wait(lck);
+ }
+
+ utils::MemBuf *membuf = socket->packet_->next();
+ int size_to_read;
+
+ if ((int)membuf->length() > size) {
+ size_to_read = size;
+ } else {
+ size_to_read = membuf->length();
+ socket->something_to_read_ = false;
+ }
+
+ std::memcpy(buf, membuf->data(), size_to_read);
+ membuf->trimStart(size_to_read);
+
+ return size_to_read;
+}
+
+int TLSRTCProducerSocket::write(BIO *b, const char *buf, size_t size,
+ size_t *written) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = TLSRTCProducerSocket::writeOld(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+
+ *written = (size_t)ret;
+
+ return 1;
+}
+
+int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
+ TLSRTCProducerSocket *socket;
+ socket = (TLSRTCProducerSocket *)BIO_get_data(b);
+
+ if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
+ socket->first_) {
+ socket->tls_chunks_--;
+ bool making_manifest = socket->parent_->making_manifest_;
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ false);
+ socket->parent_->ProducerSocket::produce(
+ socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0);
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ making_manifest);
+ socket->first_ = false;
+
+ } else {
+ std::unique_ptr<utils::MemBuf> mbuf =
+ utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
+ auto a = mbuf.release();
+ socket->async_thread_.add([socket = socket, a]() {
+ socket->to_call_oncontentproduced_--;
+ auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+ socket->RTCProducerSocket::produce(std::move(mbuf));
+ ProducerContentCallback on_content_produced_application;
+ socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
+ on_content_produced_application);
+ if (socket->to_call_oncontentproduced_ == 0 &&
+ on_content_produced_application) {
+ on_content_produced_application(
+ (transport::interface::ProducerSocket &)(*socket->getInterface()),
+ std::error_code(), 0);
+ }
+ });
+ }
+
+ return num;
+}
+
+TLSRTCProducerSocket::TLSRTCProducerSocket(
+ interface::ProducerSocket *producer_socket, P2PSecureProducerSocket *parent,
+ const Name &handshake_name)
+ : ProducerSocket(producer_socket),
+ RTCProducerSocket(producer_socket),
+ TLSProducerSocket(producer_socket, parent, handshake_name) {
+ BIO_METHOD *bio_meth =
+ BIO_meth_new(BIO_TYPE_ACCEPT, "secure rtc producer socket");
+ BIO_meth_set_read(bio_meth, TLSRTCProducerSocket::readOld);
+ BIO_meth_set_write(bio_meth, TLSRTCProducerSocket::writeOld);
+ BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl);
+ BIO *bio = BIO_new(bio_meth);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+}
+
+void TLSRTCProducerSocket::accept() {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ tls_chunks_ = 1;
+ int result = SSL_accept(ssl_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ }
+
+ TRANSPORT_LOGD("Handshake performed!");
+ parent_->list_secure_rtc_producers.push_front(
+ std::move(parent_->map_secure_rtc_producers[handshake_name_]));
+ parent_->map_secure_rtc_producers.erase(handshake_name_);
+
+ ProducerInterestCallback on_interest_process_decrypted;
+ getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ on_interest_process_decrypted);
+
+ if (on_interest_process_decrypted) {
+ Interest inter(std::move(packet_));
+ on_interest_process_decrypted(
+ (transport::interface::ProducerSocket &)(*getInterface()), inter);
+ }
+
+ parent_->cv_.notify_one();
+}
+
+int TLSRTCProducerSocket::async_accept() {
+ if (!async_thread_.stopped()) {
+ async_thread_.add([this]() { this->TLSRTCProducerSocket::accept(); });
+ } else {
+ throw errors::RuntimeException(
+ "Async thread not running, impossible to perform handshake");
+ }
+
+ return 1;
+}
+
+void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ throw errors::RuntimeException(
+ "New handshake on the same P2P secure producer socket not supported");
+ }
+
+ size_t buf_size = buffer->length();
+ tls_chunks_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
+ to_call_oncontentproduced_ = tls_chunks_;
+
+ SSL_write(ssl_, buffer->data(), buf_size);
+ BIO *wbio = SSL_get_wbio(ssl_);
+ int i = BIO_flush(wbio);
+ (void)i; // To shut up gcc 5
+}
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h
new file mode 100644
index 000000000..685c91244
--- /dev/null
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <implementation/rtc_socket_producer.h>
+#include <implementation/tls_socket_producer.h>
+
+namespace transport {
+namespace implementation {
+
+class P2PSecureProducerSocket;
+
+class TLSRTCProducerSocket : public RTCProducerSocket,
+ public TLSProducerSocket {
+ friend class P2PSecureProducerSocket;
+
+ public:
+ explicit TLSRTCProducerSocket(interface::ProducerSocket *producer_socket,
+ P2PSecureProducerSocket *parent,
+ const Name &handshake_name);
+
+ ~TLSRTCProducerSocket() = default;
+
+ void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
+
+ void accept() override;
+
+ int async_accept() override;
+
+ using TLSProducerSocket::onInterest;
+ using TLSProducerSocket::produce;
+
+ protected:
+ static int read(BIO *b, char *buf, size_t size, size_t *readbytes);
+
+ static int readOld(BIO *h, char *buf, int size);
+
+ static int write(BIO *b, const char *buf, size_t size, size_t *written);
+
+ static int writeOld(BIO *h, const char *buf, int num);
+};
+
+} // namespace implementation
+
+} // end namespace transport
diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc
new file mode 100644
index 000000000..95b287aa6
--- /dev/null
+++ b/libtransport/src/implementation/tls_socket_consumer.cc
@@ -0,0 +1,384 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <implementation/tls_socket_consumer.h>
+
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/tls1.h>
+
+#include <random>
+
+namespace transport {
+namespace implementation {
+
+void TLSConsumerSocket::setInterestPayload(interface::ConsumerSocket &c,
+ const core::Interest &interest) {
+ Interest &int2 = const_cast<Interest &>(interest);
+ random_suffix_ = int2.getName().getSuffix();
+
+ if (payload_ != NULL) int2.appendPayload(std::move(payload_));
+}
+
+/* Return the number of read bytes in the return param */
+int readOldTLS(BIO *b, char *buf, int size) {
+ if (size < 0) return size;
+
+ TLSConsumerSocket *socket;
+ socket = (TLSConsumerSocket *)BIO_get_data(b);
+
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+
+ if (!socket->something_to_read_) {
+ if (!socket->transport_protocol_->isRunning()) {
+ socket->network_name_.setSuffix(socket->random_suffix_);
+ socket->ConsumerSocket::asyncConsume(socket->network_name_);
+ }
+ if (!socket->something_to_read_) socket->cv_.wait(lck);
+ }
+
+ size_t size_to_read, read;
+ size_t chain_size = socket->head_->length();
+ if (socket->head_->isChained())
+ chain_size = socket->head_->computeChainDataLength();
+
+ if (chain_size > (size_t)size) {
+ read = size_to_read = (size_t)size;
+ } else {
+ read = size_to_read = chain_size;
+ socket->something_to_read_ = false;
+ }
+
+ while (size_to_read) {
+ if (socket->head_->length() < size_to_read) {
+ std::memcpy(buf, socket->head_->data(), socket->head_->length());
+ size_to_read -= socket->head_->length();
+ buf += socket->head_->length();
+ socket->head_ = socket->head_->pop();
+ } else {
+ std::memcpy(buf, socket->head_->data(), size_to_read);
+ socket->head_->trimStart(size_to_read);
+ size_to_read = 0;
+ }
+ }
+
+ return read;
+}
+
+/* Return the number of read bytes in readbytes */
+int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = readOldTLS(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+
+ *readbytes = (size_t)ret;
+
+ return 1;
+}
+
+/* Return the number of written bytes in the return param */
+int writeOldTLS(BIO *b, const char *buf, int num) {
+ TLSConsumerSocket *socket;
+ socket = (TLSConsumerSocket *)BIO_get_data(b);
+
+ socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+ socket->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &TLSConsumerSocket::setInterestPayload, socket, std::placeholders::_1,
+ std::placeholders::_2));
+
+ return num;
+}
+
+/* Return the number of written bytes in written */
+int writeTLS(BIO *b, const char *buf, size_t size, size_t *written) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = writeOldTLS(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+
+ *written = (size_t)ret;
+
+ return 1;
+}
+
+long ctrlTLS(BIO *b, int cmd, long num, void *ptr) { return 1; }
+
+TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket,
+ int protocol, SSL *ssl)
+ : ConsumerSocket(consumer_socket, protocol),
+ name_(),
+ buf_pool_(),
+ decrypted_content_(),
+ payload_(),
+ head_(),
+ something_to_read_(false),
+ content_downloaded_(false),
+ random_suffix_(),
+ producer_namespace_(),
+ read_callback_decrypted_(),
+ mtx_(),
+ cv_(),
+ async_downloader_tls_() {
+ /* Create the (d)TLS state */
+ const SSL_METHOD *meth = TLS_client_method();
+ ctx_ = SSL_CTX_new(meth);
+
+ int result =
+ SSL_CTX_set_ciphersuites(ctx_,
+ "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
+ "SHA256:TLS_AES_128_GCM_SHA256");
+ if (result != 1) {
+ throw errors::RuntimeException(
+ "Unable to set cipher list on TLS subsystem. Aborting.");
+ }
+
+ SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
+ SSL_CTX_set_ssl_version(ctx_, meth);
+
+ ssl_ = ssl;
+
+ BIO_METHOD *bio_meth =
+ BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket");
+ BIO_meth_set_read(bio_meth, readOldTLS);
+ BIO_meth_set_write(bio_meth, writeOldTLS);
+ BIO_meth_set_ctrl(bio_meth, ctrlTLS);
+ BIO *bio = BIO_new(bio_meth);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+
+ ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+
+ ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
+
+ std::default_random_engine generator;
+ std::uniform_int_distribution<int> distribution(
+ 1, std::numeric_limits<uint32_t>::max());
+ random_suffix_ = 0;
+
+ this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ this);
+};
+
+/*
+ * The producer interface is not owned by the application, so is TLSSocket task
+ * to deallocate the memory
+ */
+TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; }
+
+int TLSConsumerSocket::consume(const Name &name,
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ this->payload_ = std::move(buffer);
+
+ this->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1,
+ std::placeholders::_2));
+
+ return consume(name);
+}
+
+int TLSConsumerSocket::consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ throw errors::RuntimeException("Handshake not performed");
+ }
+
+ return download_content(name);
+}
+
+int TLSConsumerSocket::download_content(const Name &name) {
+ network_name_ = name;
+ network_name_.setSuffix(0);
+ something_to_read_ = false;
+ content_downloaded_ = false;
+
+ decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH);
+ uint8_t *buf = decrypted_content_->writableData();
+ size_t size = 0;
+ int result = -1;
+
+ while (!content_downloaded_ || something_to_read_) {
+ if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) {
+ decrypted_content_->appendChain(
+ utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH));
+ // decrypted_content_->computeChainDataLength();
+ buf = decrypted_content_->prev()->writableData();
+ } else {
+ buf = decrypted_content_->writableTail();
+ }
+
+ result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH);
+
+ /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of
+ * the data has been fully downloaded */
+
+ /* ASSERT((result < SSL3_RT_MAX_PLAIN_LENGTH && content_downloaded_) || */
+ /* result == SSL3_RT_MAX_PLAIN_LENGTH); */
+
+ if (result >= 0) {
+ size += result;
+ decrypted_content_->prepend(result);
+ } else
+ throw errors::RuntimeException("Unable to download content");
+
+ if (size >= read_callback_decrypted_->maxBufferSize()) {
+ if (read_callback_decrypted_->isBufferMovable()) {
+ // No need to perform an additional copy. The whole buffer will be
+ // tranferred to the application.
+
+ read_callback_decrypted_->readBufferAvailable(
+ std::move(decrypted_content_));
+ decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH);
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = decrypted_content_->length();
+
+ while (decrypted_content_->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback_decrypted_->getReadBuffer(&buffer, &length);
+
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+
+ auto to_copy = std::min(decrypted_content_->length(), length);
+ std::memcpy(buffer, decrypted_content_->data(), to_copy);
+ decrypted_content_->trimStart(to_copy);
+ }
+
+ read_callback_decrypted_->readDataAvailable(total_length);
+ decrypted_content_->clear();
+ }
+ }
+ }
+
+ read_callback_decrypted_->readSuccess(size);
+
+ return CONSUMER_FINISHED;
+}
+
+int TLSConsumerSocket::asyncConsume(const Name &name,
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ this->payload_ = std::move(buffer);
+
+ this->ConsumerSocket::setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1,
+ std::placeholders::_2));
+
+ return asyncConsume(name);
+}
+
+int TLSConsumerSocket::asyncConsume(const Name &name) {
+ if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ throw errors::RuntimeException("Handshake not performed");
+ }
+
+ if (!async_downloader_tls_.stopped()) {
+ async_downloader_tls_.add([this, name]() {
+ is_async_ = true;
+ download_content(name);
+ });
+ }
+
+ return CONSUMER_RUNNING;
+}
+
+void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
+ producer_namespace_ = producer_namespace;
+}
+
+int TLSConsumerSocket::setSocketOption(int socket_option_key,
+ ReadCallback *socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key, ReadCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_decrypted_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+void TLSConsumerSocket::getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) {}
+
+void TLSConsumerSocket::readDataAvailable(size_t length) noexcept {}
+
+size_t TLSConsumerSocket::maxBufferSize() const {
+ return SSL3_RT_MAX_PLAIN_LENGTH;
+}
+
+void TLSConsumerSocket::readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ if (head_) {
+ head_->prependChain(std::move(buffer));
+ } else {
+ head_ = std::move(buffer);
+ }
+
+ something_to_read_ = true;
+ cv_.notify_one();
+}
+
+void TLSConsumerSocket::readError(const std::error_code ec) noexcept {}
+
+void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ content_downloaded_ = true;
+ something_to_read_ = true;
+ cv_.notify_one();
+}
+
+bool TLSConsumerSocket::isBufferMovable() noexcept { return true; }
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h
new file mode 100644
index 000000000..2e88dc47e
--- /dev/null
+++ b/libtransport/src/implementation/tls_socket_consumer.h
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+#include <implementation/socket_consumer.h>
+
+#include <openssl/ssl.h>
+
+namespace transport {
+namespace implementation {
+
+class TLSConsumerSocket : public ConsumerSocket,
+ public interface::ConsumerSocket::ReadCallback {
+ /* Return the number of read bytes in readbytes */
+ friend int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes);
+
+ /* Return the number of read bytes in the return param */
+ friend int readOldTLS(BIO *h, char *buf, int size);
+
+ /* Return the number of written bytes in written */
+ friend int writeTLS(BIO *b, const char *buf, size_t size, size_t *written);
+
+ /* Return the number of written bytes in the return param */
+ friend int writeOldTLS(BIO *h, const char *buf, int num);
+
+ friend long ctrlTLS(BIO *b, int cmd, long num, void *ptr);
+
+ public:
+ explicit TLSConsumerSocket(interface::ConsumerSocket *consumer_socket,
+ int protocol, SSL *ssl_);
+
+ ~TLSConsumerSocket();
+
+ int consume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
+ int consume(const Name &name) override;
+
+ int asyncConsume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
+ int asyncConsume(const Name &name) override;
+
+ void registerPrefix(const Prefix &producer_namespace);
+
+ int setSocketOption(
+ int socket_option_key,
+ interface::ConsumerSocket::ReadCallback *socket_option_value) override;
+
+ using ConsumerSocket::getSocketOption;
+ using ConsumerSocket::setSocketOption;
+
+ protected:
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ConsumerInterestCallback on_interest_input_decrypted_;
+ ConsumerInterestCallback on_interest_process_decrypted_;
+
+ private:
+ Name name_;
+
+ /* SSL handle */
+ SSL *ssl_;
+ SSL_CTX *ctx_;
+
+ /* Chain of MemBuf to be used as a temporary buffer to pass descypted data
+ * from the underlying layer to the application */
+ utils::ObjectPool<utils::MemBuf> buf_pool_;
+ std::unique_ptr<utils::MemBuf> decrypted_content_;
+
+ /* Chain of MemBuf holding the payload to be written into interest or data
+ */
+ std::unique_ptr<utils::MemBuf> payload_;
+
+ /* Chain of MemBuf holding the data retrieved from the underlying layer */
+ std::unique_ptr<utils::MemBuf> head_;
+
+ bool something_to_read_;
+
+ bool content_downloaded_;
+
+ double old_max_win_;
+
+ double old_current_win_;
+
+ uint32_t random_suffix_;
+
+ Prefix producer_namespace_;
+
+ interface::ConsumerSocket::ReadCallback *read_callback_decrypted_;
+
+ std::mutex mtx_;
+
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+
+ utils::EventThread async_downloader_tls_;
+
+ void setInterestPayload(interface::ConsumerSocket &c,
+ const core::Interest &interest);
+
+ virtual void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override;
+
+ virtual void readDataAvailable(size_t length) noexcept override;
+
+ virtual size_t maxBufferSize() const override;
+
+ virtual void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
+
+ virtual void readError(const std::error_code ec) noexcept override;
+
+ virtual void readSuccess(std::size_t total_size) noexcept override;
+ virtual bool isBufferMovable() noexcept override;
+
+ int download_content(const Name &name);
+};
+
+} // namespace implementation
+
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc
new file mode 100644
index 000000000..9a5b94a1c
--- /dev/null
+++ b/libtransport/src/implementation/tls_socket_producer.cc
@@ -0,0 +1,611 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_producer.h>
+
+#include <implementation/p2psecure_socket_producer.h>
+#include <implementation/tls_socket_producer.h>
+
+#include <openssl/bio.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+
+namespace transport {
+namespace implementation {
+
+/* Return the number of read bytes in readbytes */
+int TLSProducerSocket::read(BIO *b, char *buf, size_t size, size_t *readbytes) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = TLSProducerSocket::readOld(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *readbytes = 0;
+ return ret;
+ }
+
+ *readbytes = (size_t)ret;
+
+ return 1;
+}
+
+/* Return the number of read bytes in the return param */
+int TLSProducerSocket::readOld(BIO *b, char *buf, int size) {
+ TLSProducerSocket *socket;
+ socket = (TLSProducerSocket *)BIO_get_data(b);
+
+ /* take a lock on the mutex. It will be unlocked by */
+ std::unique_lock<std::mutex> lck(socket->mtx_);
+ if (!socket->something_to_read_) {
+ (socket->cv_).wait(lck);
+ }
+
+ /* Either there already is something to read, or the thread has been waken up
+ */
+ /* must return the payload in the interest */
+
+ utils::MemBuf *membuf = socket->packet_->next();
+ int size_to_read;
+ if ((int)membuf->length() > size) {
+ size_to_read = size;
+ } else {
+ size_to_read = membuf->length();
+ socket->something_to_read_ = false;
+ }
+
+ std::memcpy(buf, membuf->data(), size_to_read);
+ membuf->trimStart(size_to_read);
+
+ return size_to_read;
+}
+
+/* Return the number of written bytes in written */
+int TLSProducerSocket::write(BIO *b, const char *buf, size_t size,
+ size_t *written) {
+ int ret;
+
+ if (size > INT_MAX) size = INT_MAX;
+
+ ret = TLSProducerSocket::writeOld(b, buf, (int)size);
+
+ if (ret <= 0) {
+ *written = 0;
+ return ret;
+ }
+
+ *written = (size_t)ret;
+
+ return 1;
+}
+
+/* Return the number of written bytes in the return param */
+int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
+ TLSProducerSocket *socket;
+ socket = (TLSProducerSocket *)BIO_get_data(b);
+
+ if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
+ socket->first_) {
+ //! socket->tls_chunks_ corresponds to is_last
+ socket->tls_chunks_--;
+ bool making_manifest = socket->parent_->making_manifest_;
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ false);
+ socket->parent_->ProducerSocket::produce(
+ socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0,
+ socket->last_segment_);
+ socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ making_manifest);
+ socket->first_ = false;
+ } else {
+ socket->still_writing_ = true;
+
+ std::unique_ptr<utils::MemBuf> mbuf =
+ utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
+ auto a = mbuf.release();
+ socket->async_thread_.add([socket = socket, a]() {
+ socket->tls_chunks_--;
+ socket->to_call_oncontentproduced_--;
+ auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+ socket->last_segment_ += socket->ProducerSocket::produce(
+ socket->name_, std::move(mbuf), socket->tls_chunks_ == 0,
+ socket->last_segment_);
+ ProducerContentCallback on_content_produced_application;
+ socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
+ on_content_produced_application);
+ if (socket->to_call_oncontentproduced_ == 0 &&
+ on_content_produced_application) {
+ on_content_produced_application(*socket->getInterface(),
+ std::error_code(), 0);
+ }
+ });
+ }
+
+ return num;
+}
+
+TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
+ P2PSecureProducerSocket *parent,
+ const Name &handshake_name)
+ : ProducerSocket(producer_socket),
+ on_content_produced_application_(),
+ mtx_(),
+ cv_(),
+ something_to_read_(),
+ name_(),
+ last_segment_(0),
+ parent_(parent),
+ first_(true),
+ handshake_name_(handshake_name),
+ tls_chunks_(0),
+ to_call_oncontentproduced_(0),
+ still_writing_(false),
+ encryption_thread_() {
+ const SSL_METHOD *meth = TLS_server_method();
+ ctx_ = SSL_CTX_new(meth);
+
+ /*
+ * Setup SSL context (identity and parameter to use TLS 1.3)
+ */
+ SSL_CTX_use_certificate(ctx_, parent->cert_509_);
+ SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_);
+
+ int result =
+ SSL_CTX_set_ciphersuites(ctx_,
+ "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
+ "SHA256:TLS_AES_128_GCM_SHA256");
+ if (result != 1) {
+ throw errors::RuntimeException(
+ "Unable to set cipher list on TLS subsystem. Aborting.");
+ }
+
+ // We force it to be TLS 1.3
+ SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
+ SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
+ SSL_CTX_set_num_tickets(ctx_, 0);
+
+ result = SSL_CTX_add_custom_ext(
+ ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS,
+ TLSProducerSocket::addHicnKeyIdCb, TLSProducerSocket::freeHicnKeyIdCb,
+ this, TLSProducerSocket::parseHicnKeyIdCb, NULL);
+
+ ssl_ = SSL_new(ctx_);
+ /*
+ * Setup this producer socker as the bio that TLS will use to write and read
+ * data (in stream mode)
+ */
+ BIO_METHOD *bio_meth =
+ BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket");
+ BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld);
+ BIO_meth_set_write(bio_meth, TLSProducerSocket::writeOld);
+ BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl);
+ BIO *bio = BIO_new(bio_meth);
+ BIO_set_init(bio, 1);
+ BIO_set_data(bio, this);
+ SSL_set_bio(ssl_, bio, bio);
+ /*
+ * Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application.
+ */
+ this->ProducerSocket::setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+ this->ProducerSocket::setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (ProducerContentCallback)bind(
+ &TLSProducerSocket::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+}
+
+/*
+ * The producer interface is not owned by the application, so is TLSSocket task
+ * to deallocate the memory
+ */
+TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; }
+
+void TLSProducerSocket::accept() {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ tls_chunks_ = 1;
+ int result = SSL_accept(ssl_);
+ if (result != 1)
+ throw errors::RuntimeException("Unable to perform client handshake");
+ }
+ TRANSPORT_LOGD("Handshake performed!");
+ parent_->list_secure_producers.push_front(
+ std::move(parent_->map_secure_producers[handshake_name_]));
+ parent_->map_secure_producers.erase(handshake_name_);
+
+ ProducerInterestCallback on_interest_process_decrypted;
+ getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ on_interest_process_decrypted);
+
+ if (on_interest_process_decrypted) {
+ Interest inter(std::move(packet_));
+ on_interest_process_decrypted(*getInterface(), inter);
+ } else {
+ throw errors::RuntimeException(
+ "On interest process unset. Unable to perform handshake");
+ }
+}
+
+int TLSProducerSocket::async_accept() {
+ if (!async_thread_.stopped()) {
+ async_thread_.add([this]() { this->accept(); });
+ } else {
+ throw errors::RuntimeException(
+ "Async thread not running, impossible to perform handshake");
+ }
+
+ return 1;
+}
+
+void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) {
+ /* Based on the state machine of (D)TLS, we know what action to do */
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ std::unique_lock<std::mutex> lck(mtx_);
+ name_ = interest.getName();
+ something_to_read_ = true;
+ packet_ = interest.acquireMemBufReference();
+ if (head_) {
+ payload_->prependChain(interest.getPayload());
+ } else {
+ payload_ = interest.getPayload(); // std::move(interest.getPayload());
+ }
+ cv_.notify_one();
+ } else {
+ name_ = interest.getName();
+ packet_ = interest.acquireMemBufReference();
+ payload_ = interest.getPayload();
+ something_to_read_ = true;
+
+ if (interest.getPayload()->length() > 0)
+ SSL_read(
+ ssl_,
+ const_cast<unsigned char *>(interest.getPayload()->writableData()),
+ interest.getPayload()->length());
+ }
+
+ ProducerInterestCallback on_interest_input_decrypted;
+ getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
+ on_interest_input_decrypted);
+ if (on_interest_input_decrypted)
+ (on_interest_input_decrypted)(*getInterface(), interest);
+}
+
+void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p,
+ Interest &interest) {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ std::unique_lock<std::mutex> lck(mtx_);
+ name_ = interest.getName();
+ something_to_read_ = true;
+ packet_ = interest.acquireMemBufReference();
+ payload_ = interest.getPayload();
+ cv_.notify_one();
+ } else {
+ name_ = interest.getName();
+ packet_ = interest.acquireMemBufReference();
+ payload_ = interest.getPayload();
+ something_to_read_ = true;
+
+ if (interest.getPayload()->length() > 0)
+ SSL_read(
+ ssl_,
+ const_cast<unsigned char *>(interest.getPayload()->writableData()),
+ interest.getPayload()->length());
+
+ if (on_interest_process_decrypted_ != VOID_HANDLER)
+ on_interest_process_decrypted_(*getInterface(), interest);
+ }
+}
+
+void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p,
+ const std::error_code &err,
+ uint64_t bytes_written) {}
+
+uint32_t TLSProducerSocket::produce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t start_offset) {
+ if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ throw errors::RuntimeException(
+ "New handshake on the same P2P secure producer socket not supported");
+ }
+ size_t buf_size = buffer->length();
+ name_ = served_namespaces_.front().mapName(content_name);
+
+ tls_chunks_ = to_call_oncontentproduced_ =
+ ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
+
+ if (!is_last) {
+ tls_chunks_++;
+ }
+
+ last_segment_ = start_offset;
+
+ SSL_write(ssl_, buffer->data(), buf_size);
+ BIO *wbio = SSL_get_wbio(ssl_);
+ int i = BIO_flush(wbio);
+ (void)i; // To shut up gcc 5
+
+ return 0;
+}
+
+void TLSProducerSocket::asyncProduce(const Name &content_name,
+ const uint8_t *buf, size_t buffer_size,
+ bool is_last, uint32_t *start_offset) {
+ if (!encryption_thread_.stopped()) {
+ encryption_thread_.add([this, content_name, buffer = buf,
+ size = buffer_size, is_last, start_offset]() {
+ if (start_offset != NULL) {
+ produce(content_name, buffer, size, is_last, *start_offset);
+ } else {
+ produce(content_name, buffer, size, is_last, 0);
+ }
+ });
+ }
+}
+
+void TLSProducerSocket::asyncProduce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment) {
+ if (!encryption_thread_.stopped()) {
+ auto a = buffer.release();
+ encryption_thread_.add(
+ [this, content_name, a, is_last, offset, last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ *last_segment = &last_segment_;
+ }
+ produce(content_name, std::move(buf), is_last, offset);
+ });
+ }
+}
+
+void TLSProducerSocket::asyncProduce(ContentObject &content_object) {
+ throw errors::RuntimeException("API not supported");
+}
+
+void TLSProducerSocket::produce(ContentObject &content_object) {
+ throw errors::RuntimeException("API not supported");
+}
+
+long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) {
+ if (cmd == BIO_CTRL_FLUSH) {
+ }
+ return 1;
+}
+
+int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char **out, size_t *outlen,
+ X509 *x, size_t chainidx, int *al,
+ void *add_arg) {
+ TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg);
+ if (ext_type == 100) {
+ ip_prefix_t ip_prefix =
+ socket->parent_->served_namespaces_.front().toIpPrefixStruct();
+ int inet_family =
+ socket->parent_->served_namespaces_.front().getAddressFamily();
+ uint16_t prefix_len_bits =
+ socket->parent_->served_namespaces_.front().getPrefixLength();
+ uint8_t prefix_len_bytes = prefix_len_bits / 8;
+ uint8_t prefix_len_u32 = prefix_len_bits / 32;
+
+ ip_prefix_t *out_ip = (ip_prefix_t *)malloc(sizeof(ip_prefix_t));
+ out_ip->family = inet_family;
+ out_ip->len = prefix_len_bits + 32;
+ u8 *out_ip_buf = const_cast<u8 *>(
+ ip_address_get_buffer(&(out_ip->address), inet_family));
+ *out = reinterpret_cast<unsigned char *>(out_ip);
+
+ RAND_bytes((unsigned char *)&socket->key_id_, 4);
+
+ memcpy(out_ip_buf, ip_address_get_buffer(&(ip_prefix.address), inet_family),
+ prefix_len_bytes);
+ memcpy((out_ip_buf + prefix_len_bytes), &socket->key_id_, 4);
+ *outlen = sizeof(ip_prefix_t);
+
+ ip_address_t mask = {};
+ ip_address_t keyId_component = {};
+ u32 *mask_buf;
+ u32 *keyId_component_buf;
+ switch (inet_family) {
+ case AF_INET:
+ mask_buf = &(mask.v4.as_u32);
+ keyId_component_buf = &(keyId_component.v4.as_u32);
+ break;
+ case AF_INET6:
+ mask_buf = mask.v6.as_u32;
+ keyId_component_buf = keyId_component.v6.as_u32;
+ break;
+ default:
+ throw errors::RuntimeException("Unknown protocol");
+ }
+
+ if (prefix_len_bits > (inet_family == AF_INET6 ? IPV6_ADDR_LEN_BITS - 32
+ : IPV4_ADDR_LEN_BITS - 32))
+ throw errors::RuntimeException(
+ "Not enough space in the content name to add key_id");
+
+ mask_buf[prefix_len_u32] = 0xffffffff;
+ keyId_component_buf[prefix_len_u32] = socket->key_id_;
+ socket->last_segment_ = 0;
+
+ socket->on_interest_process_decrypted_ =
+ socket->parent_->on_interest_process_decrypted_;
+
+ socket->registerPrefix(
+ Prefix(socket->parent_->served_namespaces_.front().getName(
+ Name(inet_family, (uint8_t *)&mask),
+ Name(inet_family, (uint8_t *)&keyId_component),
+ socket->parent_->served_namespaces_.front().getName()),
+ out_ip->len));
+ socket->connect();
+ }
+ return 1;
+}
+
+void TLSProducerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *out,
+ void *add_arg) {
+ free(const_cast<unsigned char *>(out));
+}
+
+int TLSProducerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context,
+ const unsigned char *in, size_t inlen,
+ X509 *x, size_t chainidx, int *al,
+ void *add_arg) {
+ return 1;
+}
+
+int TLSProducerSocket::setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback socket_option_value) -> int {
+ int result = SOCKET_OPTION_SET;
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_decrypted_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_decrypted_ = socket_option_value;
+ break;
+
+ default:
+ result = SOCKET_OPTION_NOT_SET;
+ break;
+ }
+ return result;
+ });
+}
+
+int TLSProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_application_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int TLSProducerSocket::getSocketOption(
+ int socket_option_key, ProducerContentCallback **socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ *socket_option_value = &on_content_produced_application_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int TLSProducerSocket::getSocketOption(
+ int socket_option_key, ProducerContentCallback &socket_option_value) {
+ return rescheduleOnIOServiceWithReference(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback &socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ socket_option_value = on_content_produced_application_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int TLSProducerSocket::getSocketOption(
+ int socket_option_key, ProducerInterestCallback &socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOServiceWithReference(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback &socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ socket_option_value = on_interest_input_decrypted_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ socket_option_value = on_interest_dropped_input_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ socket_option_value = on_interest_inserted_input_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ socket_option_value = on_interest_satisfied_output_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ socket_option_value = on_interest_process_decrypted_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+} // namespace implementation
+
+} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h
new file mode 100644
index 000000000..e910c8259
--- /dev/null
+++ b/libtransport/src/implementation/tls_socket_producer.h
@@ -0,0 +1,163 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <implementation/socket_producer.h>
+
+#include <openssl/ssl.h>
+#include <condition_variable>
+#include <mutex>
+
+namespace transport {
+namespace implementation {
+
+class P2PSecureProducerSocket;
+
+class TLSProducerSocket : virtual public ProducerSocket {
+ friend class P2PSecureProducerSocket;
+
+ public:
+ explicit TLSProducerSocket(interface::ProducerSocket *producer_socket,
+ P2PSecureProducerSocket *parent,
+ const Name &handshake_name);
+
+ ~TLSProducerSocket();
+
+ uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true, uint32_t start_offset = 0) override {
+ return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size),
+ is_last, start_offset);
+ }
+
+ uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true, uint32_t start_offset = 0) override;
+
+ void produce(ContentObject &content_object) override;
+
+ void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
+ bool is_last = true,
+ uint32_t *start_offset = nullptr) override;
+
+ void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment = nullptr) override;
+
+ void asyncProduce(ContentObject &content_object) override;
+
+ virtual void accept();
+
+ virtual int async_accept();
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ ProducerInterestCallback socket_option_value) override;
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ ProducerContentCallback socket_option_value) override;
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ ProducerContentCallback **socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ProducerContentCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key,
+ ProducerInterestCallback &socket_option_value);
+
+ using ProducerSocket::getSocketOption;
+ using ProducerSocket::onInterest;
+ using ProducerSocket::setSocketOption;
+
+ protected:
+ /* Callback invoked once an interest has been received and its payload
+ * decrypted */
+ ProducerInterestCallback on_interest_input_decrypted_;
+ ProducerInterestCallback on_interest_process_decrypted_;
+ ProducerContentCallback on_content_produced_application_;
+
+ std::mutex mtx_;
+
+ /* Condition variable for the wait */
+ std::condition_variable cv_;
+
+ /* Bool variable, true if there is something to read (an interest arrived) */
+ bool something_to_read_;
+
+ /* First interest that open a secure connection */
+ transport::core::Name name_;
+
+ /* SSL handle */
+ SSL *ssl_;
+ SSL_CTX *ctx_;
+
+ Packet::MemBufPtr packet_;
+
+ std::unique_ptr<utils::MemBuf> head_;
+ std::uint32_t last_segment_;
+ std::shared_ptr<utils::MemBuf> payload_;
+ std::uint32_t key_id_;
+
+ std::thread *handshake;
+ P2PSecureProducerSocket *parent_;
+
+ bool first_;
+ Name handshake_name_;
+ int tls_chunks_;
+ int to_call_oncontentproduced_;
+
+ bool still_writing_;
+
+ utils::EventThread encryption_thread_;
+
+ void onInterest(ProducerSocket &p, Interest &interest);
+ void cacheMiss(interface::ProducerSocket &p, Interest &interest);
+
+ /* Return the number of read bytes in readbytes */
+ static int read(BIO *b, char *buf, size_t size, size_t *readbytes);
+
+ /* Return the number of read bytes in the return param */
+ static int readOld(BIO *h, char *buf, int size);
+
+ /* Return the number of written bytes in written */
+ static int write(BIO *b, const char *buf, size_t size, size_t *written);
+
+ /* Return the number of written bytes in the return param */
+ static int writeOld(BIO *h, const char *buf, int num);
+
+ static long ctrl(BIO *b, int cmd, long num, void *ptr);
+
+ static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context,
+ const unsigned char **out, size_t *outlen, X509 *x,
+ size_t chainidx, int *al, void *add_arg);
+
+ static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *out,
+ void *add_arg);
+
+ static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
+ unsigned int context, const unsigned char *in,
+ size_t inlen, X509 *x, size_t chainidx, int *al,
+ void *add_arg);
+
+ void onContentProduced(interface::ProducerSocket &p,
+ const std::error_code &err, uint64_t bytes_written);
+};
+
+} // namespace implementation
+
+} // end namespace transport