aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-02-21 11:52:28 +0100
committerMauro Sardara <msardara@cisco.com>2020-02-26 13:19:16 +0100
commitf4433f28b509a9f67ca85d79000ccf9c2f4b7a24 (patch)
tree0f754bc9d8222f3ace11849165753acd85be3b38 /libtransport/src/hicn/transport/interfaces
parent0e7669445b6be1163189521eabed7dd0124043c8 (diff)
[HICN-534] Major rework on libtransport organization
Change-Id: I361b83a18b4fd59be136d5f0817fc28e17e89884 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces')
-rw-r--r--libtransport/src/hicn/transport/interfaces/CMakeLists.txt59
-rw-r--r--libtransport/src/hicn/transport/interfaces/callbacks.cc26
-rw-r--r--libtransport/src/hicn/transport/interfaces/callbacks.h125
-rw-r--r--libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc382
-rw-r--r--libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h147
-rw-r--r--libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc380
-rw-r--r--libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h129
-rw-r--r--libtransport/src/hicn/transport/interfaces/publication_options.h43
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc368
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h79
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket.h92
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc862
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h412
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_options_default_values.h69
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_options_keys.h113
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc909
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h295
-rw-r--r--libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc178
-rw-r--r--libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h58
-rw-r--r--libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc364
-rw-r--r--libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h132
-rw-r--r--libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc587
-rw-r--r--libtransport/src/hicn/transport/interfaces/tls_socket_producer.h163
-rw-r--r--libtransport/src/hicn/transport/interfaces/verification_policy.h33
24 files changed, 0 insertions, 6005 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
deleted file mode 100644
index 88b83e9d4..000000000
--- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
-
-list(APPEND HEADER_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/socket.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/publication_options.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h
- ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h
- ${CMAKE_CURRENT_SOURCE_DIR}/verification_policy.h
-)
-
-list(APPEND SOURCE_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.cc
-)
-
-if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a")
- list(APPEND SOURCE_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc
- )
-
- list(APPEND HEADER_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h
- )
-endif()
-
-set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
-set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.cc b/libtransport/src/hicn/transport/interfaces/callbacks.cc
deleted file mode 100644
index 2574e7720..000000000
--- a/libtransport/src/hicn/transport/interfaces/callbacks.cc
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "callbacks.h"
-
-namespace transport {
-
-namespace interface {
-
-std::nullptr_t VOID_HANDLER = nullptr;
-
-} // namespace interface
-
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h
deleted file mode 100644
index 9d3d57992..000000000
--- a/libtransport/src/hicn/transport/interfaces/callbacks.h
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/core/facade.h>
-#include <hicn/transport/interfaces/verification_policy.h>
-
-#include <functional>
-#include <system_error>
-
-namespace utils {
-class MemBuf;
-}
-
-namespace transport {
-
-namespace protocol {
-
-class IcnObserver;
-class TransportStatistics;
-
-} // namespace protocol
-
-namespace core {
-
-class ContentObject;
-class Interest;
-} // namespace core
-
-namespace interface {
-
-// Forward declarations
-class ConsumerSocket;
-class ProducerSocket;
-
-/**
- * The ConsumerInterestCallback will be called in different parts of the
- * consumer socket processing pipeline, with a ConsumerSocket and an Interest as
- * parameters.
- */
-using ConsumerInterestCallback =
- std::function<void(ConsumerSocket &, const core::Interest &)>;
-
-/**
- * The ConsumerTimerCallback is called periodically for exposing to applications
- * a summary of the statistics of the transport protocol in use.
- */
-using ConsumerTimerCallback = std::function<void(
- ConsumerSocket &, const protocol::TransportStatistics &stats)>;
-
-/**
- * The ProducerContentCallback will be called by the producer socket right after
- * a content has been segmented and published.
- */
-using ProducerContentCallback = std::function<void(
- ProducerSocket &, const std::error_code &, uint64_t bytes_written)>;
-
-/**
- * The ConsumerContentObjectCallback will be called in different parts of the
- * consumer socket processing pipeline, with a ConsumerSocket and an
- * ContentObject as parameters.
- */
-using ConsumerContentObjectCallback =
- std::function<void(ConsumerSocket &, const core::ContentObject &)>;
-
-/**
- * The ConsumerContentObjectVerificationCallback will be called by the transport
- * if an application is willing to verify each content object. Note that a
- * better alternative is to instrument the transport to perform the verification
- * autonomously, without requiring the intervention of the application.
- */
-using ConsumerContentObjectVerificationCallback =
- std::function<bool(ConsumerSocket &, const core::ContentObject &)>;
-
-/**
- * The ConsumerContentObjectVerificationFailedCallback will be caled by the
- * transport if a data packet (either manifest or content object) cannot be
- * verified. The application here decides what to do by returning a
- * VerificationFailedPolicy object.
- */
-using ConsumerContentObjectVerificationFailedCallback =
- std::function<VerificationPolicy(
- ConsumerSocket &, const core::ContentObject &, std::error_code ec)>;
-
-/**
- * The ConsumerManifestCallback will be called by the consumer socket when a
- * manifest is received.
- */
-using ConsumerManifestCallback =
- std::function<void(ConsumerSocket &, const core::ContentObjectManifest &)>;
-
-/**
- * The ProducerContentObjectCallback will be called in different parts of the
- * consumer socket processing pipeline, with a ProducerSocket and an
- * ContentObject as parameters.
- */
-using ProducerContentObjectCallback =
- std::function<void(ProducerSocket &, core::ContentObject &)>;
-
-/**
- * The ProducerContentObjectCallback will be called in different parts of the
- * consumer socket processing pipeline, with a ProducerSocket and an
- * Interest as parameters.
- */
-using ProducerInterestCallback =
- std::function<void(ProducerSocket &, core::Interest &)>;
-
-extern std::nullptr_t VOID_HANDLER;
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc
deleted file mode 100644
index ec966e509..000000000
--- a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.cc
+++ /dev/null
@@ -1,382 +0,0 @@
-#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
-#include <openssl/bio.h>
-#include <openssl/ssl.h>
-#include <openssl/tls1.h>
-
-#include <random>
-
-namespace transport {
-
-namespace interface {
-
-void P2PSecureConsumerSocket::setInterestPayload(
- ConsumerSocket &c, const core::Interest &interest) {
- Interest &int2 = const_cast<Interest &>(interest);
- random_suffix_ = int2.getName().getSuffix();
-
- if (payload_ != NULL) int2.appendPayload(std::move(payload_));
-}
-
-// implement void readBufferAvailable(), size_t maxBufferSize() const override,
-// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable()
-// must be implemented even if empty.
-
-/* Return the number of read bytes in the return param */
-int readOld(BIO *b, char *buf, int size) {
- if (size < 0) return size;
-
- P2PSecureConsumerSocket *socket;
- socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
-
- std::unique_lock<std::mutex> lck(socket->mtx_);
-
- if (!socket->something_to_read_) {
- if (!socket->transport_protocol_->isRunning()) {
- socket->network_name_.setSuffix(socket->random_suffix_);
- socket->ConsumerSocket::asyncConsume(socket->network_name_);
- }
- if (!socket->something_to_read_) socket->cv_.wait(lck);
- }
-
- size_t size_to_read, read;
- size_t chain_size = socket->head_->length();
- if (socket->head_->isChained())
- chain_size = socket->head_->computeChainDataLength();
-
- if (chain_size > (size_t)size) {
- read = size_to_read = (size_t)size;
- } else {
- read = size_to_read = chain_size;
- socket->something_to_read_ = false;
- }
-
- while (size_to_read) {
- if (socket->head_->length() < size_to_read) {
- std::memcpy(buf, socket->head_->data(), socket->head_->length());
- size_to_read -= socket->head_->length();
- buf += socket->head_->length();
- socket->head_ = socket->head_->pop();
- } else {
- std::memcpy(buf, socket->head_->data(), size_to_read);
- socket->head_->trimStart(size_to_read);
- size_to_read = 0;
- }
- }
-
- return read;
-}
-
-/* Return the number of read bytes in readbytes */
-int read(BIO *b, char *buf, size_t size, size_t *readbytes) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = transport::interface::readOld(b, buf, (int)size);
-
- if (ret <= 0) {
- *readbytes = 0;
- return ret;
- }
-
- *readbytes = (size_t)ret;
-
- return 1;
-}
-
-/* Return the number of written bytes in the return param */
-int writeOld(BIO *b, const char *buf, int num) {
- P2PSecureConsumerSocket *socket;
- socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
-
- socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
- socket->ConsumerSocket::setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(
- &P2PSecureConsumerSocket::setInterestPayload, socket,
- std::placeholders::_1, std::placeholders::_2));
-
- return num;
-}
-
-/* Return the number of written bytes in written */
-int write(BIO *b, const char *buf, size_t size, size_t *written) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = transport::interface::writeOld(b, buf, (int)size);
-
- if (ret <= 0) {
- *written = 0;
- return ret;
- }
-
- *written = (size_t)ret;
-
- return 1;
-}
-
-long ctrl(BIO *b, int cmd, long num, void *ptr) { return 1; }
-
-int P2PSecureConsumerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context,
- const unsigned char **out,
- size_t *outlen, X509 *x,
- size_t chainidx, int *al,
- void *add_arg) {
- if (ext_type == 100) {
- *out = (unsigned char *)malloc(4);
- *(uint32_t *)*out = 10;
- *outlen = 4;
- }
- return 1;
-}
-
-void P2PSecureConsumerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context,
- const unsigned char *out,
- void *add_arg) {
- free(const_cast<unsigned char *>(out));
-}
-
-int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context,
- const unsigned char *in,
- size_t inlen, X509 *x,
- size_t chainidx, int *al,
- void *add_arg) {
- P2PSecureConsumerSocket *socket =
- reinterpret_cast<P2PSecureConsumerSocket *>(add_arg);
- if (ext_type == 100) {
- memcpy(&socket->secure_prefix_, in, sizeof(ip_prefix_t));
- }
- return 1;
-}
-
-P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol)
- : ConsumerSocket(handshake_protocol),
- name_(),
- tls_consumer_(),
- buf_pool_(),
- decrypted_content_(),
- payload_(),
- head_(),
- something_to_read_(false),
- content_downloaded_(false),
- random_suffix_(),
- secure_prefix_(),
- producer_namespace_(),
- read_callback_decrypted_(),
- mtx_(),
- cv_(),
- protocol_(transport_protocol) {
- /* Create the (d)TLS state */
- const SSL_METHOD *meth = TLS_client_method();
- ctx_ = SSL_CTX_new(meth);
-
- int result =
- SSL_CTX_set_ciphersuites(ctx_,
- "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
- "SHA256:TLS_AES_128_GCM_SHA256");
- if (result != 1) {
- throw errors::RuntimeException(
- "Unable to set cipher list on TLS subsystem. Aborting.");
- }
-
- SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
- SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
- SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
- SSL_CTX_set_ssl_version(ctx_, meth);
-
- result = SSL_CTX_add_custom_ext(
- ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS,
- P2PSecureConsumerSocket::addHicnKeyIdCb,
- P2PSecureConsumerSocket::freeHicnKeyIdCb, NULL,
- P2PSecureConsumerSocket::parseHicnKeyIdCb, this);
-
- ssl_ = SSL_new(ctx_);
-
- bio_meth_ = BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket");
- BIO_meth_set_read(bio_meth_, transport::interface::readOld);
- BIO_meth_set_write(bio_meth_, transport::interface::writeOld);
- BIO_meth_set_ctrl(bio_meth_, transport::interface::ctrl);
- BIO *bio = BIO_new(bio_meth_);
- BIO_set_init(bio, 1);
- BIO_set_data(bio, this);
- SSL_set_bio(ssl_, bio, bio);
-
- ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
-
- ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
-
- std::default_random_engine generator;
- std::uniform_int_distribution<int> distribution(
- 1, std::numeric_limits<uint32_t>::max());
- random_suffix_ = 0;
-
- this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- this);
-};
-
-P2PSecureConsumerSocket::~P2PSecureConsumerSocket() {
- BIO_meth_free(bio_meth_);
- SSL_shutdown(ssl_);
-}
-
-int P2PSecureConsumerSocket::consume(const Name &name) {
- if (transport_protocol_->isRunning()) {
- return CONSUMER_BUSY;
- }
-
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
- }
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
- TLSConsumerSocket tls_consumer(this->protocol_, this->ssl_);
-
- ConsumerTimerCallback *stats_summary_callback = nullptr;
- this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_summary_callback);
-
- uint32_t lifetime;
- this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
- tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- read_callback_decrypted_);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- *stats_summary_callback);
- tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
- this->timer_interval_milliseconds_);
- tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- tls_consumer.connect();
-
- if (payload_ != NULL)
- return tls_consumer.consume((prefix->mapName(name)), std::move(payload_));
- else
- return tls_consumer.consume((prefix->mapName(name)));
-}
-
-int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
- TRANSPORT_LOGD("Handshake performed!");
- }
-
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
- tls_consumer_ =
- std::make_shared<TLSConsumerSocket>(this->protocol_, this->ssl_);
-
- ConsumerTimerCallback *stats_summary_callback = nullptr;
- this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_summary_callback);
-
- uint32_t lifetime;
- this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
- tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- read_callback_decrypted_);
- tls_consumer_->setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- *stats_summary_callback);
- tls_consumer_->setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
- this->timer_interval_milliseconds_);
- tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- tls_consumer_->connect();
-
- if (payload_ != NULL)
- return tls_consumer_->asyncConsume((prefix->mapName(name)),
- std::move(payload_));
- else
- return tls_consumer_->asyncConsume((prefix->mapName(name)));
-}
-
-void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
- producer_namespace_ = producer_namespace;
-}
-
-int P2PSecureConsumerSocket::setSocketOption(
- int socket_option_key, ConsumerSocket::ReadCallback *socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerSocket::ReadCallback *socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- read_callback_decrypted_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-void P2PSecureConsumerSocket::getReadBuffer(uint8_t **application_buffer,
- size_t *max_length){};
-
-void P2PSecureConsumerSocket::readDataAvailable(size_t length) noexcept {};
-
-size_t P2PSecureConsumerSocket::maxBufferSize() const {
- return SSL3_RT_MAX_PLAIN_LENGTH;
-}
-
-void P2PSecureConsumerSocket::readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
- std::unique_lock<std::mutex> lck(this->mtx_);
- if (head_) {
- head_->prependChain(std::move(buffer));
- } else {
- head_ = std::move(buffer);
- }
-
- something_to_read_ = true;
- cv_.notify_one();
-}
-
-void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {};
-
-void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept {
- std::unique_lock<std::mutex> lck(this->mtx_);
- content_downloaded_ = true;
- something_to_read_ = true;
- cv_.notify_one();
-}
-
-bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; }
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h
deleted file mode 100644
index ff867f07b..000000000
--- a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_consumer.h
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/interfaces/tls_socket_consumer.h>
-#include <openssl/bio.h>
-#include <openssl/ssl.h>
-
-namespace transport {
-
-namespace interface {
-
-class P2PSecureConsumerSocket : public ConsumerSocket,
- public ConsumerSocket::ReadCallback {
- /* Return the number of read bytes in readbytes */
- friend int read(BIO *b, char *buf, size_t size, size_t *readbytes);
-
- /* Return the number of read bytes in the return param */
- friend int readOld(BIO *h, char *buf, int size);
-
- /* Return the number of written bytes in written */
- friend int write(BIO *b, const char *buf, size_t size, size_t *written);
-
- /* Return the number of written bytes in the return param */
- friend int writeOld(BIO *h, const char *buf, int num);
-
- friend long ctrl(BIO *b, int cmd, long num, void *ptr);
-
- public:
- explicit P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol);
-
- ~P2PSecureConsumerSocket();
-
- int consume(const Name &name) override;
-
- int asyncConsume(const Name &name) override;
-
- void registerPrefix(const Prefix &producer_namespace);
-
- int setSocketOption(
- int socket_option_key,
- ConsumerSocket::ReadCallback *socket_option_value) override;
-
- using ConsumerSocket::getSocketOption;
- using ConsumerSocket::setSocketOption;
-
- protected:
- /* Callback invoked once an interest has been received and its payload
- * decrypted */
- ConsumerInterestCallback on_interest_input_decrypted_;
- ConsumerInterestCallback on_interest_process_decrypted_;
-
- private:
- Name name_;
- std::shared_ptr<TLSConsumerSocket> tls_consumer_;
-
- /* SSL handle */
- SSL *ssl_;
- SSL_CTX *ctx_;
- BIO_METHOD *bio_meth_;
-
- /* Chain of MemBuf to be used as a temporary buffer to pass descypted data
- * from the underlying layer to the application */
- utils::ObjectPool<utils::MemBuf> buf_pool_;
- std::unique_ptr<utils::MemBuf> decrypted_content_;
-
- /* Chain of MemBuf holding the payload to be written into interest or data */
- std::unique_ptr<utils::MemBuf> payload_;
-
- /* Chain of MemBuf holding the data retrieved from the underlying layer */
- std::unique_ptr<utils::MemBuf> head_;
-
- bool something_to_read_;
-
- bool content_downloaded_;
-
- double old_max_win_;
-
- double old_current_win_;
-
- uint32_t random_suffix_;
-
- ip_prefix_t secure_prefix_;
-
- Prefix producer_namespace_;
-
- ConsumerSocket::ReadCallback *read_callback_decrypted_;
-
- std::mutex mtx_;
-
- /* Condition variable for the wait */
- std::condition_variable cv_;
-
- int protocol_;
-
- void setInterestPayload(ConsumerSocket &c, const core::Interest &interest);
- void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
- const std::error_code &ec);
-
- static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context,
- const unsigned char **out, size_t *outlen, X509 *x,
- size_t chainidx, int *al, void *add_arg);
-
- static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context, const unsigned char *out,
- void *add_arg);
-
- static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context, const unsigned char *in,
- size_t inlen, X509 *x, size_t chainidx, int *al,
- void *add_arg);
-
- virtual void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override;
-
- virtual void readDataAvailable(size_t length) noexcept override;
-
- virtual size_t maxBufferSize() const override;
-
- virtual void readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
-
- virtual void readError(const std::error_code ec) noexcept override;
-
- virtual void readSuccess(std::size_t total_size) noexcept override;
- virtual bool isBufferMovable() noexcept override;
-
- int download_content(const Name &name);
-};
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc
deleted file mode 100644
index 8850bde8a..000000000
--- a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.cc
+++ /dev/null
@@ -1,380 +0,0 @@
-#include <hicn/transport/core/interest.h>
-#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
-#include <hicn/transport/interfaces/tls_rtc_socket_producer.h>
-#include <hicn/transport/interfaces/tls_socket_producer.h>
-
-#include <openssl/bio.h>
-#include <openssl/rand.h>
-#include <openssl/ssl.h>
-
-namespace transport {
-
-namespace interface {
-
-/* Workaround to prevent content with expiry time equal to 0 to be lost when
- * pushed in the forwarder */
-#define HICN_HANDSHAKE_CONTENT_EXPIRY_TIME 100;
-
-P2PSecureProducerSocket::P2PSecureProducerSocket()
- : ProducerSocket(),
- mtx_(),
- cv_(),
- map_secure_producers(),
- map_secure_rtc_producers(),
- list_secure_producers() {}
-
-P2PSecureProducerSocket::P2PSecureProducerSocket(
- bool rtc, const std::shared_ptr<utils::Identity> &identity)
- : ProducerSocket(),
- rtc_(rtc),
- mtx_(),
- cv_(),
- map_secure_producers(),
- map_secure_rtc_producers(),
- list_secure_producers() {
- /*
- * Setup SSL context (identity and parameter to use TLS 1.3)
- */
- der_cert_ = parcKeyStore_GetDEREncodedCertificate(
- (identity->getSigner()->getKeyStore()));
- der_prk_ = parcKeyStore_GetDEREncodedPrivateKey(
- (identity->getSigner()->getKeyStore()));
-
- int cert_size = parcBuffer_Limit(der_cert_);
- int prk_size = parcBuffer_Limit(der_prk_);
- const uint8_t *cert =
- reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_cert_, cert_size));
- const uint8_t *prk =
- reinterpret_cast<uint8_t *>(parcBuffer_Overlay(der_prk_, prk_size));
- cert_509_ = d2i_X509(NULL, &cert, cert_size);
- pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size);
-
- /*
- * Set the callback so that when an interest is received we catch it and we
- * decrypt the payload before passing it to the application.
- */
- ProducerSocket::setSocketOption(
- ProducerCallbacksOptions::INTEREST_INPUT,
- (ProducerInterestCallback)std::bind(
- &P2PSecureProducerSocket::onInterestCallback, this,
- std::placeholders::_1, std::placeholders::_2));
-}
-
-P2PSecureProducerSocket::~P2PSecureProducerSocket() {
- if (der_cert_) parcBuffer_Release(&der_cert_);
- if (der_prk_) parcBuffer_Release(&der_prk_);
-}
-
-void P2PSecureProducerSocket::onInterestCallback(ProducerSocket &p,
- Interest &interest) {
- std::unique_lock<std::mutex> lck(mtx_);
-
- TRANSPORT_LOGD("Start handshake at %s", interest.getName().toString().c_str());
- if (!rtc_) {
- auto it = map_secure_producers.find(interest.getName());
- if (it != map_secure_producers.end()) return;
- TLSProducerSocket *tls_producer =
- new TLSProducerSocket(this, interest.getName());
- tls_producer->on_content_produced_application_ =
- this->on_content_produced_application_;
- tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
- this->content_object_expiry_time_);
- tls_producer->setSocketOption(SIGNER, this->signer_);
- tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
- tls_producer->setSocketOption(DATA_PACKET_SIZE,
- (uint32_t)(this->data_packet_size_));
- tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
- map_secure_producers.insert(
- {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)});
- tls_producer->onInterest(*tls_producer, interest);
- tls_producer->async_accept();
- } else {
- auto it = map_secure_rtc_producers.find(interest.getName());
- if (it != map_secure_rtc_producers.end()) return;
- TLSRTCProducerSocket *tls_producer =
- new TLSRTCProducerSocket(this, interest.getName());
- tls_producer->on_content_produced_application_ =
- this->on_content_produced_application_;
- tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
- this->content_object_expiry_time_);
- tls_producer->setSocketOption(SIGNER, this->signer_);
- tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
- tls_producer->setSocketOption(DATA_PACKET_SIZE,
- (uint32_t)(this->data_packet_size_));
- tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
- map_secure_rtc_producers.insert(
- {interest.getName(),
- std::unique_ptr<TLSRTCProducerSocket>(tls_producer)});
- tls_producer->onInterest(*tls_producer, interest);
- tls_producer->async_accept();
- }
-}
-
-void P2PSecureProducerSocket::produce(const uint8_t *buffer,
- size_t buffer_size) {
- if (!rtc_) {
- throw errors::RuntimeException(
- "RTC must be the transport protocol to start the production of current "
- "data. Aborting.");
- }
-
- std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_rtc_producers.empty()) cv_.wait(lck);
-
- for (auto it = list_secure_rtc_producers.cbegin();
- it != list_secure_rtc_producers.cend(); it++) {
- (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
- }
-}
-
-uint32_t P2PSecureProducerSocket::produce(
- Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
- uint32_t start_offset) {
- if (rtc_) {
- throw errors::RuntimeException(
- "RTC transport protocol is not compatible with the production of "
- "current data. Aborting.");
- }
-
- std::unique_lock<std::mutex> lck(mtx_);
- uint32_t segments = 0;
- if (list_secure_producers.empty()) cv_.wait(lck);
-
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- segments +=
- (*it)->produce(content_name, buffer->clone(), is_last, start_offset);
- return segments;
-}
-
-uint32_t P2PSecureProducerSocket::produce(Name content_name,
- const uint8_t *buffer,
- size_t buffer_size, bool is_last,
- uint32_t start_offset) {
- if (rtc_) {
- throw errors::RuntimeException(
- "RTC transport protocol is not compatible with the production of "
- "current data. Aborting.");
- }
-
- std::unique_lock<std::mutex> lck(mtx_);
- uint32_t segments = 0;
- if (list_secure_producers.empty()) cv_.wait(lck);
-
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- segments += (*it)->produce(content_name, buffer, buffer_size, is_last,
- start_offset);
- return segments;
-}
-
-void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
- const uint8_t *buf,
- size_t buffer_size, bool is_last,
- uint32_t *start_offset) {
- if (rtc_) {
- throw errors::RuntimeException(
- "RTC transport protocol is not compatible with the production of "
- "current data. Aborting.");
- }
-
- std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_producers.empty()) cv_.wait(lck);
-
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++) {
- (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset);
- }
-}
-
-void P2PSecureProducerSocket::asyncProduce(
- Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
- uint32_t offset, uint32_t **last_segment) {
- if (rtc_) {
- throw errors::RuntimeException(
- "RTC transport protocol is not compatible with the production of "
- "current data. Aborting.");
- }
-
- std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_producers.empty()) cv_.wait(lck);
-
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++) {
- (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset,
- last_segment);
- }
-}
-
-// Socket Option Redefinition to avoid name hiding
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, ProducerInterestCallback socket_option_value) {
- if (!list_secure_producers.empty()) {
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
- }
-
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- on_interest_input_decrypted_ = socket_option_value;
- return SOCKET_OPTION_SET;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- on_interest_dropped_input_buffer_ = socket_option_value;
- return SOCKET_OPTION_SET;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- on_interest_inserted_input_buffer_ = socket_option_value;
- return SOCKET_OPTION_SET;
-
- case ProducerCallbacksOptions::CACHE_HIT:
- on_interest_satisfied_output_buffer_ = socket_option_value;
- return SOCKET_OPTION_SET;
-
- case ProducerCallbacksOptions::CACHE_MISS:
- on_interest_process_decrypted_ = socket_option_value;
- return SOCKET_OPTION_SET;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- switch (socket_option_key) {
- case GeneralTransportOptions::SIGNER: {
- signer_.reset();
- signer_ = socket_option_value;
-
- return SOCKET_OPTION_SET;
- }
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-}
-
-int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
- uint32_t socket_option_value) {
- if (!list_secure_producers.empty()) {
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
- }
- switch (socket_option_key) {
- case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
- content_object_expiry_time_ =
- socket_option_value; // HICN_HANDSHAKE_CONTENT_EXPIRY_TIME;
- return SOCKET_OPTION_SET;
- }
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
- bool socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
- Name *socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, std::list<Prefix> socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, ProducerContentObjectCallback socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, ProducerContentCallback socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- on_content_produced_application_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, HashAlgorithm socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, utils::CryptoSuite socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, const std::string &socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h b/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h
deleted file mode 100644
index ba3fa0189..000000000
--- a/libtransport/src/hicn/transport/interfaces/p2psecure_socket_producer.h
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/interfaces/tls_socket_producer.h>
-#include <hicn/transport/interfaces/tls_rtc_socket_producer.h>
-#include <hicn/transport/utils/content_store.h>
-#include <hicn/transport/utils/identity.h>
-#include <hicn/transport/utils/signer.h>
-
-#include <openssl/ssl.h>
-#include <condition_variable>
-#include <forward_list>
-#include <mutex>
-
-namespace transport {
-
-namespace interface {
-
-class P2PSecureProducerSocket : public ProducerSocket {
- friend class TLSProducerSocket;
- friend class TLSRTCProducerSocket;
-
- public:
- explicit P2PSecureProducerSocket();
- explicit P2PSecureProducerSocket(
- bool rtc, const std::shared_ptr<utils::Identity> &identity);
- ~P2PSecureProducerSocket();
-
- void produce(const uint8_t *buffer, size_t buffer_size) override;
-
- uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
- bool is_last = true, uint32_t start_offset = 0) override;
-
- uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0) override;
-
- void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
- bool is_last = true,
- uint32_t *start_offset = nullptr) override;
-
- void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment = nullptr) override;
-
- int setSocketOption(int socket_option_key,
- ProducerInterestCallback socket_option_value) override;
-
- int setSocketOption(
- int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- uint32_t socket_option_value) override;
-
- int setSocketOption(int socket_option_key, bool socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- Name *socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value) override;
-
- int setSocketOption(
- int socket_option_key,
- ProducerContentObjectCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- ProducerContentCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- HashAlgorithm socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- const std::string &socket_option_value) override;
-
- using ProducerSocket::getSocketOption;
- using ProducerSocket::onInterest;
-
- protected:
- bool rtc_;
- /* Callback invoked once an interest has been received and its payload
- * decrypted */
- ProducerInterestCallback on_interest_input_decrypted_;
- ProducerInterestCallback on_interest_process_decrypted_;
- ProducerContentCallback on_content_produced_application_;
-
- private:
- std::mutex mtx_;
-
- /* Condition variable for the wait */
- std::condition_variable cv_;
-
- PARCBuffer *der_cert_;
- PARCBuffer *der_prk_;
- X509 *cert_509_;
- EVP_PKEY *pkey_rsa_;
- std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>,
- core::hash<core::Name>, core::compare2<core::Name>>
- map_secure_producers;
- std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>,
- core::hash<core::Name>, core::compare2<core::Name>>
- map_secure_rtc_producers;
- std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers;
- std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers;
-
- void onInterestCallback(ProducerSocket &p, Interest &interest);
-};
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/publication_options.h b/libtransport/src/hicn/transport/interfaces/publication_options.h
deleted file mode 100644
index 6910e5371..000000000
--- a/libtransport/src/hicn/transport/interfaces/publication_options.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/core/name.h>
-
-namespace transport {
-
-namespace interface {
-
-class PublicationOptions {
- public:
- template <typename T>
- PublicationOptions(T&& name, uint32_t lifetime)
- : name_(std::forward<T&&>(name)),
- content_lifetime_milliseconds_(lifetime) {}
-
- TRANSPORT_ALWAYS_INLINE const core::Name& getName() const { return name_; }
- TRANSPORT_ALWAYS_INLINE uint32_t getLifetime() const {
- return content_lifetime_milliseconds_;
- }
-
- private:
- core::Name name_;
- uint32_t content_lifetime_milliseconds_;
- // TODO Signature
-};
-} // namespace interface
-
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
deleted file mode 100644
index fefa419a9..000000000
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/callbacks.h>
-#include <hicn/transport/interfaces/rtc_socket_producer.h>
-#include <stdlib.h>
-#include <time.h>
-
-#define NACK_HEADER_SIZE 8 // bytes
-#define TIMESTAMP_LEN 8 // bytes
-#define TCP_HEADER_SIZE 20
-#define IP6_HEADER_SIZE 40
-#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps)
-#define STATS_INTERVAL_DURATION 500 // ms
-#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
-#define INACTIVE_TIME \
- 500 // ms without producing before the socket
- // is considered inactive
-#define MILLI_IN_A_SEC 1000 // ms in a second
-
-#define HICN_MAX_DATA_SEQ 0xefffffff
-
-// slow production rate param
-#define MIN_PRODUCTION_RATE \
- 10 // in pacekts per sec. this value is computed
- // through experiments
-#define LIFETIME_FRACTION 0.5
-
-// NACK HEADER
-// +-----------------------------------------+
-// | 4 bytes: current segment in production |
-// +-----------------------------------------+
-// | 4 bytes: production rate (bytes x sec) |
-// +-----------------------------------------+
-//
-
-// PACKET HEADER
-// +-----------------------------------------+
-// | 8 bytes: TIMESTAMP |
-// +-----------------------------------------+
-// | packet |
-// +-----------------------------------------+
-
-namespace transport {
-
-namespace interface {
-
-RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
- : ProducerSocket(io_service),
- currentSeg_(1),
- producedBytes_(0),
- producedPackets_(0),
- bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
- packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
- perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false) {
- srand((unsigned int)time(NULL));
- prodLabel_ = ((rand() % 255) << 24UL);
- interests_cache_timer_ =
- std::make_unique<asio::steady_timer>(this->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService());
- setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
- scheduleRoundTimer();
-}
-
-RTCProducerSocket::RTCProducerSocket()
- : ProducerSocket(),
- currentSeg_(1),
- producedBytes_(0),
- producedPackets_(0),
- bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
- packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
- perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false) {
- srand((unsigned int)time(NULL));
- prodLabel_ = ((rand() % 255) << 24UL);
- interests_cache_timer_ =
- std::make_unique<asio::steady_timer>(this->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService());
- setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
- scheduleRoundTimer();
-}
-
-RTCProducerSocket::~RTCProducerSocket() {}
-
-void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
- ProducerSocket::registerPrefix(producer_namespace);
-
- flowName_ = producer_namespace.getName();
- auto family = flowName_.getAddressFamily();
-
- switch (family) {
- case AF_INET6:
- headerSize_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
- break;
- case AF_INET:
- headerSize_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP);
- break;
- default:
- throw errors::RuntimeException("Unknown name format.");
- }
-}
-
-void RTCProducerSocket::scheduleRoundTimer() {
- round_timer_->expires_from_now(
- std::chrono::milliseconds(STATS_INTERVAL_DURATION));
- round_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- updateStats();
- });
-}
-
-void RTCProducerSocket::updateStats() {
- bytesProductionRate_ = producedBytes_.load() * perSecondFactor_;
- packetsProductionRate_ = producedPackets_.load() * perSecondFactor_;
- if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
- producedBytes_ = 0;
- producedPackets_ = 0;
- scheduleRoundTimer();
-}
-
-void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- auto buffer_size = buffer->length();
-
- if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) {
- return;
- }
-
- if (TRANSPORT_EXPECT_FALSE((buffer_size + headerSize_ + TIMESTAMP_LEN) >
- data_packet_size_)) {
- return;
- }
-
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
- producedPackets_++;
-
- Name n(flowName_);
- auto content_object =
- std::make_shared<ContentObject>(n.setSuffix(currentSeg_.load()));
- auto payload = utils::MemBuf::create(TIMESTAMP_LEN);
-
- memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
- payload->append(TIMESTAMP_LEN);
- payload->prependChain(std::move(buffer));
- content_object->appendPayload(std::move(payload));
-
- content_object->setLifetime(500); // XXX this should be set by the APP
-
- content_object->setPathLabel(prodLabel_);
-
- output_buffer_.insert(std::static_pointer_cast<ContentObject>(
- content_object->shared_from_this()));
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*this, *content_object);
- }
-
- TRANSPORT_LOGD("Send content %u (produce)", content_object->getName().getSuffix());
- portal_->sendContentObject(*content_object);
-
- if (on_content_object_output_) {
- on_content_object_output_(*this, *content_object);
- }
-
- uint32_t old_curr = currentSeg_.load();
- currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ;
-
- // remove interests from the interest cache if it exists
- // this generates nacks that will tell to the consumer
- // that a new data packet was produced
- utils::SpinLock::Acquire locked(interests_cache_lock_);
- if (!seqs_map_.empty()) {
- for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
- if (it->first != old_curr) sendNack(it->first);
- }
- seqs_map_.clear();
- timers_map_.clear();
- }
-}
-
-void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
- uint32_t interestSeg = interest->getName().getSuffix();
- uint32_t lifetime = interest->getLifetime();
-
- if (on_interest_input_) {
- on_interest_input_(*this, *interest);
- }
-
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if (interestSeg > HICN_MAX_DATA_SEQ) {
- sendNack(interestSeg);
- return;
- }
-
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(*interest);
-
- if (content_object) {
- if (on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_(*this, *interest);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*this, *content_object);
- }
-
- TRANSPORT_LOGD("Send content %u (onInterest)", content_object->getName().getSuffix());
- portal_->sendContentObject(*content_object);
- return;
- } else {
- if (on_interest_process_) {
- on_interest_process_(*this, *interest);
- }
- }
-
- // if the production rate is less than MIN_PRODUCTION_RATE we put the
- // interest in a queue, otherwise we handle it in the usual way
- if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE &&
- interestSeg >= currentSeg_.load()) {
- utils::SpinLock::Acquire locked(interests_cache_lock_);
-
- uint64_t next_timer = ~0;
- if (!timers_map_.empty()) {
- next_timer = timers_map_.begin()->first;
- }
-
- uint64_t expiration = now + (lifetime * LIFETIME_FRACTION);
- // check if the seq number exists already
- auto it_seqs = seqs_map_.find(interestSeg);
- if (it_seqs != seqs_map_.end()) {
- // the seq already exists
- if (expiration < it_seqs->second) {
- // we need to update the timer becasue we got a smaller one
- // 1) remove the entry from the multimap
- // 2) update this entry
- auto range = timers_map_.equal_range(it_seqs->second);
- for (auto it_timers = range.first; it_timers != range.second;
- it_timers++) {
- if (it_timers->second == it_seqs->first) {
- timers_map_.erase(it_timers);
- break;
- }
- }
- timers_map_.insert(
- std::pair<uint64_t, uint32_t>(expiration, interestSeg));
- it_seqs->second = expiration;
- } else {
- // nothing to do here
- return;
- }
- } else {
- // add the new seq
- timers_map_.insert(
- std::pair<uint64_t, uint32_t>(expiration, interestSeg));
- seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration));
- }
-
- // here we have at least one interest in the queue, we need to start or
- // update the timer
- if (!timer_on_) {
- // set timeout
- timer_on_ = true;
- scheduleCacheTimer(timers_map_.begin()->first - now);
- } else {
- // re-schedule the timer because a new interest will expires sooner
- if (next_timer > timers_map_.begin()->first) {
- interests_cache_timer_->cancel();
- scheduleCacheTimer(timers_map_.begin()->first - now);
- }
- }
- return;
- }
-
- uint32_t max_gap = (uint32_t)floor(
- (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
- 1000.0) *
- (double)packetsProductionRate_.load()));
-
- if (interestSeg < currentSeg_.load() ||
- interestSeg > (max_gap + currentSeg_.load())) {
- sendNack(interestSeg);
- }
- // else drop packet
-}
-
-void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) {
- interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait));
- interests_cache_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- interestCacheTimer();
- });
-}
-
-void RTCProducerSocket::interestCacheTimer() {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- utils::SpinLock::Acquire locked(interests_cache_lock_);
-
- for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
- uint64_t expire = it_timers->first;
- if (expire <= now) {
- uint32_t seq = it_timers->second;
- sendNack(seq);
- // remove the interest from the other map
- seqs_map_.erase(seq);
- it_timers = timers_map_.erase(it_timers);
- } else {
- // stop, we are done!
- break;
- }
- }
- if (timers_map_.empty()) {
- timer_on_ = false;
- } else {
- timer_on_ = true;
- scheduleCacheTimer(timers_map_.begin()->first - now);
- }
-}
-
-void RTCProducerSocket::sendNack(uint32_t sequence) {
- auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
- nack_payload->append(NACK_HEADER_SIZE);
- ContentObject nack;
-
- Name n(flowName_);
- nack.appendPayload(std::move(nack_payload));
- nack.setName(n.setSuffix(sequence));
-
- uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
- *payload_ptr = currentSeg_.load();
-
- *(++payload_ptr) = bytesProductionRate_.load();
-
- nack.setLifetime(0);
- nack.setPathLabel(prodLabel_);
-
- if (on_content_object_output_) {
- on_content_object_output_(*this, nack);
- }
-
- TRANSPORT_LOGD("Send nack %u", sequence);
- portal_->sendContentObject(nack);
-}
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
deleted file mode 100644
index d7917a8c0..000000000
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/utils/content_store.h>
-
-#include <atomic>
-#include <map>
-#include <mutex>
-
-namespace transport {
-
-namespace interface {
-
-class RTCProducerSocket : virtual public ProducerSocket {
- public:
- RTCProducerSocket(asio::io_service &io_service);
-
- RTCProducerSocket();
-
- ~RTCProducerSocket();
-
- void registerPrefix(const Prefix &producer_namespace) override;
-
- virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
-
- void onInterest(Interest::Ptr &&interest) override;
-
- private:
- void sendNack(uint32_t sequence);
- void updateStats();
- void scheduleCacheTimer(uint64_t wait);
- void scheduleRoundTimer();
- void interestCacheTimer();
-
- std::atomic<uint32_t> currentSeg_;
- uint32_t prodLabel_;
- uint16_t headerSize_;
- Name flowName_;
- std::atomic<uint32_t> producedBytes_;
- std::atomic<uint32_t> producedPackets_;
- std::atomic<uint32_t> bytesProductionRate_;
- std::atomic<uint32_t> packetsProductionRate_;
- uint32_t perSecondFactor_;
-
- std::unique_ptr<asio::steady_timer> round_timer_;
-
- // cache for the received interests
- // this map maps the expiration time of an interest to
- // its sequence number. the map is sorted by timeouts
- // the same timeout may be used for multiple sequence numbers
- // but for each sequence number we store only the smallest
- // expiry time. In this way the mapping from seqs_map_ to
- // timers_map_ is unique
- std::multimap<uint64_t,uint32_t> timers_map_;
- // this map does the opposite, this map is not ordered
- std::unordered_map<uint32_t,uint64_t> seqs_map_;
- bool timer_on_;
- std::unique_ptr<asio::steady_timer> interests_cache_timer_;
- utils::SpinLock interests_cache_lock_;
-};
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h
deleted file mode 100644
index 4c9bda7df..000000000
--- a/libtransport/src/hicn/transport/interfaces/socket.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/config.h>
-#include <hicn/transport/core/facade.h>
-#include <hicn/transport/interfaces/callbacks.h>
-#include <hicn/transport/interfaces/socket_options_default_values.h>
-#include <hicn/transport/interfaces/socket_options_keys.h>
-
-#define SOCKET_OPTION_GET 0
-#define SOCKET_OPTION_NOT_GET 1
-#define SOCKET_OPTION_SET 2
-#define SOCKET_OPTION_NOT_SET 3
-#define SOCKET_OPTION_DEFAULT 12345
-
-namespace transport {
-
-namespace interface {
-
-// Forward Declarations
-template <typename PortalType>
-class Socket;
-
-// Define the portal and its connector, depending on the compilation options
-// passed by the build tool.
-using HicnForwarderPortal = core::HicnForwarderPortal;
-
-#ifdef __linux__
-#ifndef __ANDROID__
-using RawSocketPortal = core::RawSocketPortal;
-#endif
-#endif
-
-#ifdef __vpp__
-using VPPForwarderPortal = core::VPPForwarderPortal;
-using BaseSocket = Socket<VPPForwarderPortal>;
-using BasePortal = VPPForwarderPortal;
-#else
-using BaseSocket = Socket<HicnForwarderPortal>;
-using BasePortal = HicnForwarderPortal;
-#endif
-
-template <typename PortalType>
-class Socket {
- static_assert(std::is_same<PortalType, HicnForwarderPortal>::value
-#ifdef __linux__
-#ifndef __ANDROID__
- || std::is_same<PortalType, RawSocketPortal>::value
-#ifdef __vpp__
- || std::is_same<PortalType, VPPForwarderPortal>::value
-#endif
-#endif
- ,
-#else
- ,
-
-#endif
- "This class is not allowed as Portal");
-
- public:
- using Portal = PortalType;
-
- virtual asio::io_service &getIoService() = 0;
-
- virtual void connect() = 0;
-
- virtual bool isRunning() = 0;
-
- protected:
- virtual ~Socket(){};
-
- protected:
- std::string output_interface_;
-};
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
deleted file mode 100644
index b2c054947..000000000
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ /dev/null
@@ -1,862 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/cbr.h>
-#include <hicn/transport/protocols/raaqm.h>
-#include <hicn/transport/protocols/rtc.h>
-
-namespace transport {
-
-namespace interface {
-
-ConsumerSocket::ConsumerSocket(int protocol)
- : ConsumerSocket(protocol, internal_io_service_) {}
-
-ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
- : io_service_(io_service),
- portal_(std::make_shared<Portal>(io_service_)),
- async_downloader_(),
- interest_lifetime_(default_values::interest_lifetime),
- min_window_size_(default_values::min_window_size),
- max_window_size_(default_values::max_window_size),
- current_window_size_(-1),
- max_retransmissions_(
- default_values::transport_protocol_max_retransmissions),
- /****** RAAQM Parameters ******/
- minimum_drop_probability_(default_values::minimum_drop_probability),
- sample_number_(default_values::sample_number),
- gamma_(default_values::gamma_value),
- beta_(default_values::beta_value),
- drop_factor_(default_values::drop_factor),
- /****** END RAAQM Parameters ******/
- rate_estimation_alpha_(default_values::rate_alpha),
- rate_estimation_observer_(nullptr),
- rate_estimation_batching_parameter_(default_values::batch),
- rate_estimation_choice_(0),
- verifier_(std::make_shared<utils::Verifier>()),
- verify_signature_(false),
- key_content_(false),
- on_interest_output_(VOID_HANDLER),
- on_interest_timeout_(VOID_HANDLER),
- on_interest_satisfied_(VOID_HANDLER),
- on_content_object_input_(VOID_HANDLER),
- on_content_object_verification_(VOID_HANDLER),
- on_content_object_(VOID_HANDLER),
- on_manifest_(VOID_HANDLER),
- stats_summary_(VOID_HANDLER),
- read_callback_(nullptr),
- virtual_download_(false),
- timer_interval_milliseconds_(0),
- guard_raaqm_params_() {
- switch (protocol) {
- case TransportProtocolAlgorithms::CBR:
- transport_protocol_ = std::make_unique<CbrTransportProtocol>(this);
- break;
- case TransportProtocolAlgorithms::RTC:
- transport_protocol_ = std::make_unique<RTCTransportProtocol>(this);
- break;
- case TransportProtocolAlgorithms::RAAQM:
- default:
- transport_protocol_ = std::make_unique<RaaqmTransportProtocol>(this);
- break;
- }
-}
-
-ConsumerSocket::~ConsumerSocket() {
- stop();
- async_downloader_.stop();
-}
-
-void ConsumerSocket::connect() { portal_->connect(); }
-
-int ConsumerSocket::consume(const Name &name) {
- if (transport_protocol_->isRunning()) {
- return CONSUMER_BUSY;
- }
-
- network_name_ = name;
- network_name_.setSuffix(0);
-
- transport_protocol_->start();
-
- return CONSUMER_FINISHED;
-}
-
-int ConsumerSocket::asyncConsume(const Name &name) {
- if (!async_downloader_.stopped()) {
- async_downloader_.add([this, name]() {
- network_name_ = std::move(name);
- network_name_.setSuffix(0);
- transport_protocol_->start();
- });
- }
-
- return CONSUMER_RUNNING;
-}
-
-bool ConsumerSocket::verifyKeyPackets() {
- return transport_protocol_->verifyKeyPackets();
-}
-
-void ConsumerSocket::stop() {
- if (transport_protocol_) {
- if (transport_protocol_->isRunning()) transport_protocol_->stop();
- }
-}
-
-void ConsumerSocket::resume() {
- if (!transport_protocol_->isRunning()) {
- transport_protocol_->resume();
- }
-}
-
-asio::io_service &ConsumerSocket::getIoService() {
- return portal_->getIoService();
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- ReadCallback *socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key, ReadCallback *socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- read_callback_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- ReadCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key, ReadCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- *socket_option_value = read_callback_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- double socket_option_value) {
- utils::SpinLock::Acquire locked(guard_raaqm_params_);
- switch (socket_option_key) {
- case MIN_WINDOW_SIZE:
- min_window_size_ = socket_option_value;
- break;
-
- case MAX_WINDOW_SIZE:
- max_window_size_ = socket_option_value;
- break;
-
- case CURRENT_WINDOW_SIZE:
- current_window_size_ = socket_option_value;
- break;
-
- case GAMMA_VALUE:
- gamma_ = socket_option_value;
- break;
-
- case BETA_VALUE:
- beta_ = socket_option_value;
- break;
-
- case DROP_FACTOR:
- drop_factor_ = socket_option_value;
- break;
-
- case MINIMUM_DROP_PROBABILITY:
- minimum_drop_probability_ = socket_option_value;
- break;
-
- case RATE_ESTIMATION_ALPHA:
- if (socket_option_value >= 0 && socket_option_value < 1) {
- rate_estimation_alpha_ = socket_option_value;
- } else {
- rate_estimation_alpha_ = default_values::alpha;
- }
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- uint32_t socket_option_value) {
- utils::SpinLock::Acquire locked(guard_raaqm_params_);
- switch (socket_option_key) {
- case GeneralTransportOptions::MAX_INTEREST_RETX:
- max_retransmissions_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::INTEREST_LIFETIME:
- interest_lifetime_ = socket_option_value;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
- if (socket_option_value > 0) {
- rate_estimation_batching_parameter_ = socket_option_value;
- } else {
- rate_estimation_batching_parameter_ = default_values::batch;
- }
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
- if (socket_option_value > 0) {
- rate_estimation_choice_ = socket_option_value;
- } else {
- rate_estimation_choice_ = default_values::rate_choice;
- }
- break;
-
- case GeneralTransportOptions::STATS_INTERVAL:
- timer_interval_milliseconds_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- std::nullptr_t socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key, std::nullptr_t socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_retransmission_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_timeout_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_satisfied_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_output_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_input_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_verification_ = VOID_HANDLER;
- break;
- }
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- bool socket_option_value) {
- int result = SOCKET_OPTION_NOT_SET;
- if (!transport_protocol_->isRunning()) {
- switch (socket_option_key) {
- case OtherOptions::VIRTUAL_DOWNLOAD:
- virtual_download_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- verify_signature_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
- case GeneralTransportOptions::KEY_CONTENT:
- key_content_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
- default:
- return result;
- }
- }
- return result;
-}
-
-int ConsumerSocket::setSocketOption(
- int socket_option_key, ConsumerContentObjectCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- on_content_object_input_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- on_content_object_verification_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::setSocketOption(
- int socket_option_key, ConsumerInterestCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerInterestCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- on_interest_retransmission_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- on_interest_output_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- on_interest_timeout_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- on_interest_satisfied_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::setSocketOption(
- int socket_option_key, ConsumerManifestCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerManifestCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::MANIFEST_INPUT:
- on_manifest_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- verification_failed_callback_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- IcnObserver *socket_option_value) {
- utils::SpinLock::Acquire locked(guard_raaqm_params_);
- switch (socket_option_key) {
- case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
- rate_estimation_observer_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ConsumerSocket::setSocketOption(
- int socket_option_key,
- const std::shared_ptr<utils::Verifier> &socket_option_value) {
- int result = SOCKET_OPTION_NOT_SET;
- if (!transport_protocol_->isRunning()) {
- switch (socket_option_key) {
- case GeneralTransportOptions::VERIFIER:
- verifier_.reset();
- verifier_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
- default:
- return result;
- }
- }
-
- return result;
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- const std::string &socket_option_value) {
- int result = SOCKET_OPTION_NOT_SET;
- if (!transport_protocol_->isRunning()) {
- switch (socket_option_key) {
- case GeneralTransportOptions::CERTIFICATE:
- key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
- if (key_id_ != nullptr) result = SOCKET_OPTION_SET;
- break;
-
- case DataLinkOptions::OUTPUT_INTERFACE:
- output_interface_ = socket_option_value;
- portal_->setOutputInterface(output_interface_);
- result = SOCKET_OPTION_SET;
- break;
-
- default:
- return result;
- }
- }
- return result;
-}
-
-int ConsumerSocket::setSocketOption(int socket_option_key,
- ConsumerTimerCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerTimerCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::STATS_SUMMARY:
- stats_summary_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- double &socket_option_value) {
- utils::SpinLock::Acquire locked(guard_raaqm_params_);
- switch (socket_option_key) {
- case GeneralTransportOptions::MIN_WINDOW_SIZE:
- socket_option_value = min_window_size_;
- break;
-
- case GeneralTransportOptions::MAX_WINDOW_SIZE:
- socket_option_value = max_window_size_;
- break;
-
- case GeneralTransportOptions::CURRENT_WINDOW_SIZE:
- socket_option_value = current_window_size_;
- break;
-
- // RAAQM parameters
-
- case RaaqmTransportOptions::GAMMA_VALUE:
- socket_option_value = gamma_;
- break;
-
- case RaaqmTransportOptions::BETA_VALUE:
- socket_option_value = beta_;
- break;
-
- case RaaqmTransportOptions::DROP_FACTOR:
- socket_option_value = drop_factor_;
- break;
-
- case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY:
- socket_option_value = minimum_drop_probability_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_ALPHA:
- socket_option_value = rate_estimation_alpha_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- uint32_t &socket_option_value) {
- utils::SpinLock::Acquire locked(guard_raaqm_params_);
- switch (socket_option_key) {
- case GeneralTransportOptions::MAX_INTEREST_RETX:
- socket_option_value = max_retransmissions_;
- break;
-
- case GeneralTransportOptions::INTEREST_LIFETIME:
- socket_option_value = interest_lifetime_;
- break;
-
- case RaaqmTransportOptions::SAMPLE_NUMBER:
- socket_option_value = sample_number_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
- socket_option_value = rate_estimation_batching_parameter_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
- socket_option_value = rate_estimation_choice_;
- break;
-
- case GeneralTransportOptions::STATS_INTERVAL:
- socket_option_value = timer_interval_milliseconds_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- bool &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::RUNNING:
- socket_option_value = transport_protocol_->isRunning();
- break;
-
- case OtherOptions::VIRTUAL_DOWNLOAD:
- socket_option_value = virtual_download_;
- break;
-
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- socket_option_value = verify_signature_;
- break;
-
- case GeneralTransportOptions::KEY_CONTENT:
- socket_option_value = key_content_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- Name **socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- *socket_option_value = &network_name_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key,
- ConsumerContentObjectCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- *socket_option_value = &on_content_object_input_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- *socket_option_value = &on_content_object_verification_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key, ConsumerInterestCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerInterestCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- *socket_option_value = &on_interest_retransmission_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- *socket_option_value = &on_interest_output_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- *socket_option_value = &on_interest_timeout_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- *socket_option_value = &on_interest_satisfied_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key, ConsumerManifestCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerManifestCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::MANIFEST_INPUT:
- *socket_option_value = &on_manifest_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback **socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- *socket_option_value = &verification_failed_callback_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- IcnObserver **socket_option_value) {
- utils::SpinLock::Acquire locked(guard_raaqm_params_);
- switch (socket_option_key) {
- case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
- *socket_option_value = (rate_estimation_observer_);
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key,
- std::shared_ptr<utils::Verifier> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::VERIFIER:
- socket_option_value = verifier_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- socket_option_value = output_interface_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(int socket_option_key,
- TransportStatistics **socket_option_value) {
- switch (socket_option_key) {
- case OtherOptions::STATISTICS:
- *socket_option_value = &stats_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ConsumerSocket::getSocketOption(
- int socket_option_key, ConsumerTimerCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerTimerCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::STATS_SUMMARY:
- *socket_option_value = &stats_summary_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
deleted file mode 100644
index 48a594adf..000000000
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket.h>
-#include <hicn/transport/interfaces/socket_options_default_values.h>
-#include <hicn/transport/protocols/protocol.h>
-#include <hicn/transport/protocols/statistics.h>
-#include <hicn/transport/utils/event_thread.h>
-#include <hicn/transport/utils/verifier.h>
-
-extern "C" {
-#include <parc/security/parc_KeyId.h>
-}
-
-#define CONSUMER_FINISHED 0
-#define CONSUMER_BUSY 1
-#define CONSUMER_RUNNING 2
-
-namespace transport {
-
-namespace interface {
-
-using namespace core;
-using namespace protocol;
-
-/**
- * @brief Main interface for consumer applications.
- *
- * The consumer socket is the main interface for a consumer application.
- * It allows to retrieve an application data from one/many producers, by
- * hiding all the complexity of the transport protocol used underneath.
- */
-class ConsumerSocket : public BaseSocket {
- public:
- /**
- * The ReadCallback is a class which can be used by the transport for both
- * querying the application needs and notifying events.
- *
- * Beware that the methods of this class will be called synchronously while
- * the transport is working, so the operations the application is performing
- * on the data retrieved should be executed in another thread in an
- * asynchronous manner. Blocking one of these callbacks means blocking the
- * transport.
- */
- class ReadCallback {
- public:
- virtual ~ReadCallback() = default;
-
- /**
- * This API will specify to the transport whether the buffer should be
- * allocated by the application (and then the retrieved content will be
- * copied there) or the transport should allocate the buffer and "move" it
- * to the application. In other words, if isBufferMovable return true, the
- * transport will transfer the ownership of the read buffer to the
- * application, without performing an additional copy, while if it returns
- * false the transport will use the getReadBuffer API.
- *
- * By default this method returns true.
- *
- */
- virtual bool isBufferMovable() noexcept { return true; }
-
- /**
- * This method will be called by the transport when the content is
- * available. The application can then allocate its own buffer and provide
- * the address to the transport, which will use it for writing the data.
- * Note that if the application won't allocate enough memory this method
- * will be called several times, until the internal read buffer will be
- * emptied. For ensuring this method will be called once, applications
- * should allocate at least maxBufferSize() bytes.
- *
- * @param application_buffer - Pointer to the application's buffer.
- * @param max_length - The length of the application buffer.
- */
- virtual void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) = 0;
-
- /**
- * This method will be called by the transport after calling getReadBuffer,
- * in order to notify the application that length bytes are available in the
- * buffer. The max_length size of the buffer could be larger than the actual
- * amount of bytes written.
- *
- * @param length - The number of bytes placed in the buffer.
- */
- virtual void readDataAvailable(size_t length) noexcept = 0;
-
- /**
- * This method will be called by the transport for understanding how many
- * bytes it should read (at most) before notifying the application.
- *
- * By default it reads 64 KB.
- */
- virtual size_t maxBufferSize() const { return 64 * 1024; }
-
- /**
- * This method will be called by the transport iff (isBufferMovable ==
- * true). The unique_ptr underlines the fact that the ownership of the
- * buffer is being transferred to the application.
- *
- * @param buffer - The buffer
- */
- virtual void readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept {}
-
- /**
- * readError() will be invoked if an error occurs reading from the
- * transport.
- *
- * @param ec - An error code describing the error.
- */
- virtual void readError(const std::error_code ec) noexcept = 0;
-
- /**
- * This callback will be invoked when the whole content is retrieved. The
- * transport itself knows when a content is retrieved (since it is not an
- * opaque bytestream like TCP), and the transport itself is able to tell
- * the application when the transfer is done.
- */
- virtual void readSuccess(std::size_t total_size) noexcept = 0;
-
- virtual void afterRead() {}
- };
-
- /**
- * @brief Create a new consumer socket.
- *
- * @param protocol - The transport protocol to use. So far the following
- * transport are supported:
- * - CBR: Constant bitrate
- * - Raaqm: Based on paper: Optimal multipath congestion control and request
- * forwarding in information-centric networks: Protocol design and
- * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks
- * 110, 104-117
- * - RTC: Real time communication
- */
- explicit ConsumerSocket(int protocol);
- explicit ConsumerSocket(int protocol, asio::io_service &io_service);
-
- /**
- * @brief Destroy the consumer socket.
- */
- ~ConsumerSocket();
-
- /**
- * @brief Connect the consumer socket to the underlying hICN forwarder.
- */
- void connect() override;
-
- /**
- * @brief Check whether consumer socket is active or not.
- */
- bool isRunning() override { return transport_protocol_->isRunning(); }
-
- /**
- * Retrieve a content using the protocol specified in the constructor.
- * This function blocks until the whole content is downloaded.
- * For monitoring the status of the download, the application MUST set the
- * ConsumerRead callback. This callback will be called periodically (depending
- * on the needs of the application), allowing the application to save the
- * retrieved data.
- *
- * @param name - The name of the content to retrieve.
- *
- * @return CONSUMER_BUSY if a pending download exists
- * @return CONSUMER_FINISHED when the download finishes
- *
- * Notice that the fact consume() returns CONSUMER_FINISHED does not imply the
- * content retrieval succeeded. This information can be obtained from the
- * error code in CONTENT_RETRIEVED callback.
- */
- virtual int consume(const Name &name);
- virtual int asyncConsume(const Name &name);
-
- /**
- * Verify the packets containing a key after the origin of the key has been
- * validated by the client.
- *
- * @return true if all packets are valid, false otherwise
- */
- virtual bool verifyKeyPackets();
-
- /**
- * Stops the consumer socket. If several downloads are queued (using
- * asyncConsume), this call stops just the current one.
- */
- void stop();
-
- /**
- * Resume the download from the same exact point it stopped.
- */
- void resume();
-
- /**
- * Get the io_service which is running the transport protocol event loop.
- *
- * @return A reference to the internal io_service where the transport protocol
- * is running.
- */
- asio::io_service &getIoService() override;
-
- virtual int setSocketOption(int socket_option_key,
- ReadCallback *socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- ReadCallback **socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- double socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- uint32_t socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- std::nullptr_t socket_option_value);
-
- virtual int setSocketOption(int socket_option_key, bool socket_option_value);
-
- virtual int setSocketOption(
- int socket_option_key, ConsumerContentObjectCallback socket_option_value);
-
- virtual int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback socket_option_value);
-
- virtual int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- ConsumerInterestCallback socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- ConsumerManifestCallback socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- IcnObserver *socket_option_value);
-
- virtual int setSocketOption(
- int socket_option_key,
- const std::shared_ptr<utils::Verifier> &socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- const std::string &socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- ConsumerTimerCallback socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- double &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- uint32_t &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key, bool &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- Name **socket_option_value);
-
- virtual int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectCallback **socket_option_value);
-
- virtual int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback **socket_option_value);
-
- virtual int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- ConsumerInterestCallback **socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- ConsumerManifestCallback **socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- IcnObserver **socket_option_value);
-
- virtual int getSocketOption(
- int socket_option_key,
- std::shared_ptr<utils::Verifier> &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- std::string &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- ConsumerTimerCallback **socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- TransportStatistics **socket_option_value);
-
- protected:
- // If the thread calling lambda_func is not the same of io_service, this
- // function reschedule the function on it
- template <typename Lambda, typename arg2>
- int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func) {
- // To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
- int result = SOCKET_OPTION_SET;
- if (transport_protocol_->isRunning()) {
- std::mutex mtx;
- /* Condition variable for the wait */
- std::condition_variable cv;
- bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
- std::unique_lock<std::mutex> lck(mtx);
- done = true;
- result = func(socket_option_key, socket_option_value);
- cv.notify_all();
- });
- std::unique_lock<std::mutex> lck(mtx);
- if (!done) {
- cv.wait(lck);
- }
- } else {
- result = func(socket_option_key, socket_option_value);
- }
-
- return result;
- }
-
- // context inner state variables
- asio::io_service internal_io_service_;
- asio::io_service &io_service_;
-
- std::shared_ptr<Portal> portal_;
-
- utils::EventThread async_downloader_;
-
- Name network_name_;
-
- int interest_lifetime_;
-
- double min_window_size_;
- double max_window_size_;
- double current_window_size_;
- uint32_t max_retransmissions_;
-
- // RAAQM Parameters
- double minimum_drop_probability_;
- unsigned int sample_number_;
- double gamma_;
- double beta_;
- double drop_factor_;
-
- // Rate estimation parameters
- double rate_estimation_alpha_;
- IcnObserver *rate_estimation_observer_;
- int rate_estimation_batching_parameter_;
- int rate_estimation_choice_;
-
- bool is_async_;
-
- // Verification parameters
- std::shared_ptr<utils::Verifier> verifier_;
- PARCKeyId *key_id_;
- std::atomic_bool verify_signature_;
- std::atomic_bool key_content_;
-
- ConsumerInterestCallback on_interest_retransmission_;
- ConsumerInterestCallback on_interest_output_;
- ConsumerInterestCallback on_interest_timeout_;
- ConsumerInterestCallback on_interest_satisfied_;
-
- ConsumerContentObjectCallback on_content_object_input_;
- ConsumerContentObjectVerificationCallback on_content_object_verification_;
-
- ConsumerContentObjectCallback on_content_object_;
- ConsumerManifestCallback on_manifest_;
- ConsumerTimerCallback stats_summary_;
- ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
-
- ReadCallback *read_callback_;
-
- // Virtual download for traffic generator
- bool virtual_download_;
-
- uint32_t timer_interval_milliseconds_;
-
- // Transport protocol
- std::unique_ptr<TransportProtocol> transport_protocol_;
-
- // Statistic
- TransportStatistics stats_;
-
- utils::SpinLock guard_raaqm_params_;
-};
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h
deleted file mode 100644
index bcf103b8c..000000000
--- a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/base.h>
-#include <chrono>
-#include <cstdint>
-
-namespace transport {
-
-namespace interface {
-
-namespace default_values {
-
-static constexpr uint32_t interest_lifetime = 1001; // milliseconds
-static constexpr uint32_t never_expire_time = HICN_MAX_LIFETIME;
-static constexpr uint32_t content_object_expiry_time =
- never_expire_time; // milliseconds -> 50 seconds
-static constexpr uint32_t content_object_packet_size =
- 1500; // The ethernet MTU
-static constexpr uint32_t producer_socket_output_buffer_size =
- 150000; // Content Object
-static constexpr uint32_t log_2_default_buffer_size = 12;
-static constexpr uint32_t signature_size = 260; // bytes
-static constexpr uint32_t key_locator_size = 60; // bytes
-static constexpr uint32_t limit_guard = 80; // bytes
-static constexpr uint32_t digest_size = 34; // bytes
-static constexpr uint32_t max_out_of_order_segments = 3; // content object
-
-// RAAQM
-static constexpr int sample_number = 30;
-static constexpr double gamma_value = 1;
-static constexpr double beta_value = 0.8;
-static constexpr double drop_factor = 0.2;
-static constexpr double minimum_drop_probability = 0.00001;
-static constexpr int path_id = 0;
-static constexpr double rate_alpha = 0.8;
-
-// Rate estimation
-static constexpr uint32_t batch = 50;
-static constexpr uint32_t kv = 20;
-static constexpr double alpha = 0.8;
-static constexpr uint32_t rate_choice = 0;
-
-// maximum allowed values
-static constexpr uint32_t transport_protocol_min_retransmissions = 0;
-static constexpr uint32_t transport_protocol_max_retransmissions = 128;
-static constexpr uint32_t max_content_object_size = 8096;
-static constexpr uint32_t min_window_size = 1; // Interests
-static constexpr uint32_t max_window_size = 256 * 2; // Interests
-
-} // namespace default_values
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
deleted file mode 100644
index a38131271..000000000
--- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-namespace transport {
-
-namespace interface {
-
-typedef enum {
- RAAQM = 0,
- CBR = 1,
- RTC = 2,
-} TransportProtocolAlgorithms;
-
-typedef enum {
- INPUT_BUFFER_SIZE = 101,
- OUTPUT_BUFFER_SIZE = 102,
- NETWORK_NAME = 103,
- NAME_SUFFIX = 104,
- MAX_INTEREST_RETX = 105,
- DATA_PACKET_SIZE = 106,
- INTEREST_LIFETIME = 107,
- CONTENT_OBJECT_EXPIRY_TIME = 108,
- KEY_CONTENT = 110,
- MIN_WINDOW_SIZE = 111,
- MAX_WINDOW_SIZE = 112,
- CURRENT_WINDOW_SIZE = 113,
- ASYNC_MODE = 114,
- MAKE_MANIFEST = 115,
- PORTAL = 116,
- RUNNING = 117,
- APPLICATION_BUFFER = 118,
- HASH_ALGORITHM = 119,
- CRYPTO_SUITE = 120,
- SIGNER = 121,
- VERIFIER = 122,
- CERTIFICATE = 123,
- VERIFY_SIGNATURE = 124,
- STATS_INTERVAL = 125,
-} GeneralTransportOptions;
-
-typedef enum {
- SAMPLE_NUMBER = 201,
- GAMMA_VALUE = 202,
- BETA_VALUE = 203,
- DROP_FACTOR = 204,
- MINIMUM_DROP_PROBABILITY = 205,
- PATH_ID = 206,
- RTT_STATS = 207,
-} RaaqmTransportOptions;
-
-typedef enum {
- RATE_ESTIMATION_ALPHA = 301,
- RATE_ESTIMATION_OBSERVER = 302,
- RATE_ESTIMATION_BATCH_PARAMETER = 303,
- RATE_ESTIMATION_CHOICE = 304,
-} RateEstimationOptions;
-
-typedef enum {
- INTEREST_OUTPUT = 401,
- INTEREST_RETRANSMISSION = 402,
- INTEREST_EXPIRED = 403,
- INTEREST_SATISFIED = 404,
- CONTENT_OBJECT_INPUT = 411,
- MANIFEST_INPUT = 412,
- CONTENT_OBJECT_TO_VERIFY = 413,
- VERIFICATION_FAILED = 414,
- READ_CALLBACK = 415,
- STATS_SUMMARY = 416
-} ConsumerCallbacksOptions;
-
-typedef enum {
- INTEREST_INPUT = 501,
- INTEREST_DROP = 502,
- INTEREST_PASS = 503,
- CACHE_HIT = 506,
- CACHE_MISS = 508,
- NEW_CONTENT_OBJECT = 509,
- CONTENT_OBJECT_SIGN = 513,
- CONTENT_OBJECT_READY = 510,
- CONTENT_OBJECT_OUTPUT = 511,
- CONTENT_PRODUCED = 512,
-} ProducerCallbacksOptions;
-
-typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions;
-
-typedef enum {
- VIRTUAL_DOWNLOAD = 701,
- USE_CFG_FILE = 702,
- STATISTICS
-} OtherOptions;
-
-typedef enum {
- SHA_256 = 801,
- RSA_256 = 802,
-} SignatureType;
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
deleted file mode 100644
index 26a7208b6..000000000
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ /dev/null
@@ -1,909 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/utils/signer.h>
-
-#include <functional>
-
-namespace transport {
-
-namespace interface {
-
-namespace details {}
-
-typedef std::chrono::time_point<std::chrono::steady_clock> Time;
-typedef std::chrono::microseconds TimeDuration;
-
-ProducerSocket::ProducerSocket() : ProducerSocket(internal_io_service_) {}
-
-ProducerSocket::ProducerSocket(asio::io_service &io_service)
- : io_service_(io_service),
- portal_(std::make_shared<Portal>(io_service_)),
- data_packet_size_(default_values::content_object_packet_size),
- content_object_expiry_time_(default_values::content_object_expiry_time),
- output_buffer_(default_values::producer_socket_output_buffer_size),
- async_thread_(),
- registration_status_(REGISTRATION_NOT_ATTEMPTED),
- making_manifest_(false),
- hash_algorithm_(HashAlgorithm::SHA_256),
- suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
- on_interest_input_(VOID_HANDLER),
- on_interest_dropped_input_buffer_(VOID_HANDLER),
- on_interest_inserted_input_buffer_(VOID_HANDLER),
- on_interest_satisfied_output_buffer_(VOID_HANDLER),
- on_interest_process_(VOID_HANDLER),
- on_new_segment_(VOID_HANDLER),
- on_content_object_to_sign_(VOID_HANDLER),
- on_content_object_in_output_buffer_(VOID_HANDLER),
- on_content_object_output_(VOID_HANDLER),
- on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {}
-
-ProducerSocket::~ProducerSocket() {
- stop();
- if (listening_thread_.joinable()) {
- listening_thread_.join();
- }
-}
-
-void ProducerSocket::connect() {
- portal_->connect(false);
- listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
-}
-
-void ProducerSocket::serveForever() {
- if (listening_thread_.joinable()) {
- listening_thread_.join();
- }
-}
-
-void ProducerSocket::stop() { portal_->stopEventsLoop(); }
-
-void ProducerSocket::registerPrefix(const Prefix &producer_namespace) {
- served_namespaces_.push_back(producer_namespace);
-}
-
-void ProducerSocket::listen() {
- registration_status_ = REGISTRATION_IN_PROGRESS;
- bool first = true;
-
- for (core::Prefix &producer_namespace : served_namespaces_) {
- if (first) {
- core::BindConfig bind_config(producer_namespace, 1000);
- portal_->bind(bind_config);
- portal_->setProducerCallback(this);
- first = !first;
- } else {
- portal_->registerRoute(producer_namespace);
- }
- }
-
- portal_->runEventsLoop();
-}
-
-void ProducerSocket::passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object) {
- if (content_object) {
- io_service_.dispatch([this, content_object]() {
- if (on_new_segment_) {
- on_new_segment_(*this, *content_object);
- }
-
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*this, *content_object);
- }
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*this, *content_object);
- }
- });
-
- output_buffer_.insert(content_object);
-
- io_service_.dispatch([this, content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*this, *content_object);
- }
- });
-
- portal_->sendContentObject(*content_object);
- }
-}
-
-void ProducerSocket::produce(ContentObject &content_object) {
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*this, content_object);
- }
- });
-
- output_buffer_.insert(std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this()));
-
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*this, content_object);
- }
- });
-
- portal_->sendContentObject(content_object);
-}
-
-uint32_t ProducerSocket::produce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t start_offset) {
- if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) {
- return 0;
- }
-
- // Copy the atomic variables to ensure they keep the same value
- // during the production
- std::size_t data_packet_size = data_packet_size_;
- uint32_t content_object_expiry_time = content_object_expiry_time_;
- HashAlgorithm hash_algo = hash_algorithm_;
- bool making_manifest = making_manifest_;
- auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
- suffix_strategy_, start_offset);
- std::shared_ptr<utils::Signer> signer;
- getSocketOption(GeneralTransportOptions::SIGNER, signer);
-
- auto buffer_size = buffer->length();
- int bytes_segmented = 0;
- std::size_t header_size;
- std::size_t manifest_header_size = 0;
- std::size_t signature_length = 0;
- std::uint32_t final_block_number = start_offset;
- uint64_t free_space_for_content = 0;
-
- core::Packet::Format format;
- std::shared_ptr<ContentObjectManifest> manifest;
- bool is_last_manifest = false;
-
- // TODO Manifest may still be used for indexing
- if (making_manifest && !signer) {
- TRANSPORT_LOGD("Making manifests without setting producer identity.");
- }
-
- core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
- core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
- if (content_name.getType() == HNT_CONTIGUOUS_V4 ||
- content_name.getType() == HNT_IOV_V4) {
- hf_format = core::Packet::Format::HF_INET_TCP;
- hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
- } else if (content_name.getType() == HNT_CONTIGUOUS_V6 ||
- content_name.getType() == HNT_IOV_V6) {
- hf_format = core::Packet::Format::HF_INET6_TCP;
- hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
- } else {
- throw errors::RuntimeException("Unknown name format.");
- }
-
- format = hf_format;
- if (making_manifest) {
- manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- signer ? hf_format_ah : hf_format,
- signer ? signer->getSignatureLength() : 0);
- } else if (signer) {
- format = hf_format_ah;
- signature_length = signer->getSignatureLength();
- }
-
- header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
- free_space_for_content = data_packet_size - header_size;
- uint32_t number_of_segments =
- uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content)));
- if (free_space_for_content * number_of_segments < buffer_size) {
- number_of_segments++;
- }
-
- // TODO allocate space for all the headers
- if (making_manifest) {
- uint32_t segment_in_manifest = static_cast<uint32_t>(
- std::floor(double(data_packet_size - manifest_header_size -
- ContentObjectManifest::getManifestHeaderSize()) /
- ContentObjectManifest::getManifestEntrySize()) -
- 1.0);
- uint32_t number_of_manifests = static_cast<uint32_t>(
- std::ceil(float(number_of_segments) / segment_in_manifest));
- final_block_number += number_of_segments + number_of_manifests - 1;
-
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- hash_algo, is_last_manifest, content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
- manifest->setLifetime(content_object_expiry_time);
-
- if (is_last) {
- manifest->setFinalBlockNumber(final_block_number);
- } else {
- manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- TRANSPORT_LOGD("--------- START PRODUCE ----------");
- for (unsigned int packaged_segments = 0;
- packaged_segments < number_of_segments; packaged_segments++) {
- if (making_manifest) {
- if (manifest->estimateManifestSize(2) >
- data_packet_size - manifest_header_size) {
- // Send the current manifest
- manifest->encode();
-
- // If identity set, sign manifest
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
-
- // Send content objects stored in the queue
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %u",
- content_queue_.front()->getName().getSuffix());
- content_queue_.pop();
- }
-
- // Create new manifest. The reference to the last manifest has been
- // acquired in the passContentObjectToCallbacks function, so we can
- // safely release this reference
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
- content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
-
- manifest->setLifetime(content_object_expiry_time);
- manifest->setFinalBlockNumber(
- is_last ? final_block_number
- : utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- auto content_suffix = suffix_strategy->getNextContentSuffix();
- auto content_object = std::make_shared<ContentObject>(
- content_name.setSuffix(content_suffix), format);
- content_object->setLifetime(content_object_expiry_time);
-
- auto b = buffer->cloneOne();
- b->trimStart(free_space_for_content * packaged_segments);
- b->trimEnd(b->length());
-
- if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) {
- b->append(buffer_size - bytes_segmented);
- bytes_segmented += (int)(buffer_size - bytes_segmented);
-
- if (is_last && making_manifest) {
- is_last_manifest = true;
- } else if (is_last) {
- content_object->setRst();
- }
-
- } else {
- b->append(free_space_for_content);
- bytes_segmented += (int)(free_space_for_content);
- }
-
- content_object->appendPayload(std::move(b));
-
- if (making_manifest) {
- using namespace std::chrono_literals;
- utils::CryptoHash hash = content_object->computeDigest(hash_algo);
- manifest->addSuffixHash(content_suffix, hash);
- content_queue_.push(content_object);
- } else {
- if (signer) {
- signer->sign(*content_object);
- }
- passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %u", content_object->getName().getSuffix());
- }
- }
-
- if (making_manifest) {
- if (is_last_manifest) {
- manifest->setFinalManifest(is_last_manifest);
- }
-
- manifest->encode();
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %u",
- content_queue_.front()->getName().getSuffix());
- content_queue_.pop();
- }
- }
-
- io_service_.dispatch([this, buffer_size]() {
- if (on_content_produced_) {
- on_content_produced_(*this, std::make_error_code(std::errc(0)),
- buffer_size);
- }
- });
-
- TRANSPORT_LOGD("--------- END PRODUCE ------------");
- return suffix_strategy->getTotalCount();
-}
-
-void ProducerSocket::asyncProduce(ContentObject &content_object) {
- if (!async_thread_.stopped()) {
- auto co_ptr = std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this());
- async_thread_.add([this, content_object = std::move(co_ptr)]() {
- ProducerSocket::produce(*content_object);
- });
- }
-}
-
-void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
- size_t buffer_size, bool is_last,
- uint32_t *start_offset) {
- if (!async_thread_.stopped()) {
- async_thread_.add([this, suffix, buffer = buf, size = buffer_size, is_last,
- start_offset]() {
- if (start_offset != NULL) {
- *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last,
- *start_offset);
- } else {
- ProducerSocket::produce(suffix, buffer, size, is_last, 0);
- }
- });
- }
-}
-
-void ProducerSocket::asyncProduce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment) {
- if (!async_thread_.stopped()) {
- auto a = buffer.release();
- async_thread_.add([this, content_name, a, is_last, offset, last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- **last_segment =
- offset + ProducerSocket::produce(content_name, std::move(buf),
- is_last, offset);
- } else {
- ProducerSocket::produce(content_name, std::move(buf), is_last, offset);
- }
- });
- }
-}
-
-void ProducerSocket::onInterest(Interest &interest) {
- if (on_interest_input_) {
- on_interest_input_(*this, interest);
- }
-
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(interest);
-
- if (content_object) {
- if (on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_(*this, interest);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*this, *content_object);
- }
-
- portal_->sendContentObject(*content_object);
- } else {
- if (on_interest_process_) {
- on_interest_process_(*this, interest);
- }
- }
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- uint32_t socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::DATA_PACKET_SIZE:
- if (socket_option_value < default_values::max_content_object_size &&
- socket_option_value > 0) {
- data_packet_size_ = socket_option_value;
- }
- break;
-
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_.setLimit(socket_option_value);
- break;
-
- case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
- content_object_expiry_time_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- std::nullptr_t socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentObjectCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_input_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_dropped_input_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_inserted_input_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CACHE_HIT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_satisfied_output_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CACHE_MISS:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_process_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- if (socket_option_value == VOID_HANDLER) {
- on_new_segment_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_to_sign_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_in_output_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_output_ = VOID_HANDLER;
- break;
- }
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- bool socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- making_manifest_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- Name *socket_option_value) {
- return SOCKET_OPTION_NOT_SET;
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- served_namespaces_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ProducerSocket::setSocketOption(
- int socket_option_key, ProducerContentObjectCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentObjectCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- on_new_segment_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- on_content_object_to_sign_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- on_content_object_in_output_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- on_content_object_output_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ProducerSocket::setSocketOption(
- int socket_option_key, ProducerInterestCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerInterestCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- on_interest_input_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- on_interest_dropped_input_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- on_interest_inserted_input_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CACHE_HIT:
- on_interest_satisfied_output_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CACHE_MISS:
- on_interest_process_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ProducerSocket::setSocketOption(
- int socket_option_key, ProducerContentCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- on_content_produced_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- HashAlgorithm socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- hash_algorithm_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::CRYPTO_SUITE:
- crypto_suite_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ProducerSocket::setSocketOption(
- int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::SIGNER: {
- utils::SpinLock::Acquire locked(signer_lock_);
- signer_.reset();
- signer_ = socket_option_value;
- } break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ProducerSocket::setSocketOption(int socket_option_key,
- const std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- output_interface_ = socket_option_value;
- portal_->setOutputInterface(output_interface_);
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
-}
-
-int ProducerSocket::getSocketOption(int socket_option_key,
- uint32_t &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_.getLimit();
- break;
-
- case GeneralTransportOptions::DATA_PACKET_SIZE:
- socket_option_value = (uint32_t)data_packet_size_;
- break;
-
- case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
- socket_option_value = content_object_expiry_time_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ProducerSocket::getSocketOption(int socket_option_key,
- bool &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- socket_option_value = making_manifest_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ProducerSocket::getSocketOption(int socket_option_key,
- std::list<Prefix> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- socket_option_value = served_namespaces_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ProducerSocket::getSocketOption(
- int socket_option_key,
- ProducerContentObjectCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentObjectCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- *socket_option_value = &on_new_segment_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- *socket_option_value = &on_content_object_to_sign_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- *socket_option_value = &on_content_object_in_output_buffer_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- *socket_option_value = &on_content_object_output_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ProducerSocket::getSocketOption(
- int socket_option_key, ProducerContentCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- *socket_option_value = &on_content_produced_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ProducerSocket::getSocketOption(
- int socket_option_key, ProducerInterestCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerInterestCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- *socket_option_value = &on_interest_input_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- *socket_option_value = &on_interest_dropped_input_buffer_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- *socket_option_value = &on_interest_inserted_input_buffer_;
- break;
-
- case CACHE_HIT:
- *socket_option_value = &on_interest_satisfied_output_buffer_;
- break;
-
- case CACHE_MISS:
- *socket_option_value = &on_interest_process_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int ProducerSocket::getSocketOption(
- int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- ;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ProducerSocket::getSocketOption(int socket_option_key,
- HashAlgorithm &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = hash_algorithm_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ProducerSocket::getSocketOption(int socket_option_key,
- utils::CryptoSuite &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = crypto_suite_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ProducerSocket::getSocketOption(
- int socket_option_key,
- std::shared_ptr<utils::Signer> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::SIGNER: {
- utils::SpinLock::Acquire locked(signer_lock_);
- socket_option_value = signer_;
- } break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-int ProducerSocket::getSocketOption(int socket_option_key,
- std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- socket_option_value = output_interface_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
-}
-
-asio::io_service &ProducerSocket::getIoService() { return io_service_; }
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
deleted file mode 100644
index 5f360f2ce..000000000
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket.h>
-#include <hicn/transport/utils/content_store.h>
-#include <hicn/transport/utils/event_thread.h>
-#include <hicn/transport/utils/signer.h>
-#include <hicn/transport/utils/suffix_strategy.h>
-
-#include <atomic>
-#include <cmath>
-#include <condition_variable>
-#include <mutex>
-#include <queue>
-#include <thread>
-
-#define REGISTRATION_NOT_ATTEMPTED 0
-#define REGISTRATION_SUCCESS 1
-#define REGISTRATION_FAILURE 2
-#define REGISTRATION_IN_PROGRESS 3
-
-namespace transport {
-
-namespace interface {
-
-using namespace core;
-
-class ProducerSocket : public Socket<BasePortal>,
- public BasePortal::ProducerCallback {
- public:
- explicit ProducerSocket();
- explicit ProducerSocket(asio::io_service &io_service);
-
- ~ProducerSocket();
-
- void connect() override;
-
- bool isRunning() override { return !io_service_.stopped(); };
-
- virtual uint32_t produce(Name content_name, const uint8_t *buffer,
- size_t buffer_size, bool is_last = true,
- uint32_t start_offset = 0) {
- return ProducerSocket::produce(
- content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last,
- start_offset);
- }
-
- virtual uint32_t produce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0);
-
- virtual void produce(ContentObject &content_object);
-
- virtual void produce(const uint8_t *buffer, size_t buffer_size) {
- produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
- }
-
- virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- // This API is meant to be used just with the RTC producer.
- // Here it cannot be used since no name for the content is specified.
- throw errors::NotImplementedException();
- }
-
- virtual void asyncProduce(const Name &suffix, const uint8_t *buf,
- size_t buffer_size, bool is_last = true,
- uint32_t *start_offset = nullptr);
-
- void asyncProduce(const Name &suffix);
-
- virtual void asyncProduce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment = nullptr);
-
- virtual void asyncProduce(ContentObject &content_object);
-
- virtual void registerPrefix(const Prefix &producer_namespace);
-
- void serveForever();
-
- void stop();
-
- asio::io_service &getIoService() override;
-
- virtual void onInterest(Interest &interest);
-
- virtual void onInterest(Interest::Ptr &&interest) override {
- onInterest(*interest);
- };
-
- virtual int setSocketOption(int socket_option_key,
- uint32_t socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- std::nullptr_t socket_option_value);
-
- virtual int setSocketOption(int socket_option_key, bool socket_option_value);
-
- virtual int setSocketOption(int socket_option_key, Name *socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value);
-
- virtual int setSocketOption(
- int socket_option_key, ProducerContentObjectCallback socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- ProducerInterestCallback socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- ProducerContentCallback socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- HashAlgorithm socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value);
-
- virtual int setSocketOption(
- int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value);
-
- virtual int setSocketOption(int socket_option_key,
- const std::string &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- uint32_t &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key, bool &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- std::list<Prefix> &socket_option_value);
-
- virtual int getSocketOption(
- int socket_option_key,
- ProducerContentObjectCallback **socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- ProducerContentCallback **socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- ProducerInterestCallback **socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- HashAlgorithm &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- utils::CryptoSuite &socket_option_value);
-
- virtual int getSocketOption(
- int socket_option_key,
- std::shared_ptr<utils::Signer> &socket_option_value);
-
- virtual int getSocketOption(int socket_option_key,
- std::string &socket_option_value);
-
- // If the thread calling lambda_func is not the same of io_service, this
- // function reschedule the function on it
- template <typename Lambda, typename arg2>
- int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func) {
- // To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
- int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != listening_thread_.get_id()) {
- std::mutex mtx;
- /* Condition variable for the wait */
- std::condition_variable cv;
- bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
- std::unique_lock<std::mutex> lck(mtx);
- done = true;
- result = func(socket_option_key, socket_option_value);
- cv.notify_all();
- });
- std::unique_lock<std::mutex> lck(mtx);
- if (!done) {
- cv.wait(lck);
- }
- } else {
- result = func(socket_option_key, socket_option_value);
- }
-
- return result;
- }
-
- template <typename Lambda, typename arg2>
- int rescheduleOnIOServiceWithReference(int socket_option_key,
- arg2 &socket_option_value,
- Lambda lambda_func) {
- // To enforce type check
- std::function<int(int, arg2 &)> func = lambda_func;
- int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != this->listening_thread_.get_id()) {
- std::mutex mtx;
- /* Condition variable for the wait */
- std::condition_variable cv;
-
- bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
- std::unique_lock<std::mutex> lck(mtx);
- done = true;
- result = func(socket_option_key, socket_option_value);
- cv.notify_all();
- });
- std::unique_lock<std::mutex> lck(mtx);
- if (!done) {
- cv.wait(lck);
- }
- } else {
- result = func(socket_option_key, socket_option_value);
- }
-
- return result;
- }
- // Threads
- protected:
- std::thread listening_thread_;
- asio::io_service internal_io_service_;
- asio::io_service &io_service_;
- std::shared_ptr<Portal> portal_;
- std::atomic<size_t> data_packet_size_;
- std::list<Prefix>
- served_namespaces_; // No need to be threadsafe, this is always modified
- // by the application thread
- std::atomic<uint32_t> content_object_expiry_time_;
-
- // buffers
- // ContentStore is thread-safe
- utils::ContentStore output_buffer_;
-
- utils::EventThread async_thread_;
- int registration_status_;
-
- std::atomic<bool> making_manifest_;
-
- // map for storing sequence numbers for several calls of the publish
- // function
- std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
-
- std::atomic<HashAlgorithm> hash_algorithm_;
- std::atomic<utils::CryptoSuite> crypto_suite_;
- utils::SpinLock signer_lock_;
- std::shared_ptr<utils::Signer> signer_;
- core::NextSegmentCalculationStrategy suffix_strategy_;
-
- // While manifests are being built, contents are stored in a queue
- std::queue<std::shared_ptr<ContentObject>> content_queue_;
-
- // callbacks
- ProducerInterestCallback on_interest_input_;
- ProducerInterestCallback on_interest_dropped_input_buffer_;
- ProducerInterestCallback on_interest_inserted_input_buffer_;
- ProducerInterestCallback on_interest_satisfied_output_buffer_;
- ProducerInterestCallback on_interest_process_;
-
- ProducerContentObjectCallback on_new_segment_;
- ProducerContentObjectCallback on_content_object_to_sign_;
- ProducerContentObjectCallback on_content_object_in_output_buffer_;
- ProducerContentObjectCallback on_content_object_output_;
- ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
-
- ProducerContentCallback on_content_produced_;
-
- private:
- void listen();
-
- void passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object);
-};
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc
deleted file mode 100644
index 27c1e54bd..000000000
--- a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.cc
+++ /dev/null
@@ -1,178 +0,0 @@
-#include <hicn/transport/core/interest.h>
-#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
-#include <hicn/transport/interfaces/tls_rtc_socket_producer.h>
-
-#include <openssl/bio.h>
-#include <openssl/rand.h>
-#include <openssl/ssl.h>
-
-namespace transport {
-
-namespace interface {
-int TLSRTCProducerSocket::read(BIO *b, char *buf, size_t size,
- size_t *readbytes) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = TLSRTCProducerSocket::readOld(b, buf, (int)size);
-
- if (ret <= 0) {
- *readbytes = 0;
- return ret;
- }
-
- *readbytes = (size_t)ret;
-
- return 1;
-}
-
-int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) {
- TLSRTCProducerSocket *socket;
- socket = (TLSRTCProducerSocket *)BIO_get_data(b);
-
- std::unique_lock<std::mutex> lck(socket->mtx_);
- if (!socket->something_to_read_) {
- (socket->cv_).wait(lck);
- }
-
- utils::MemBuf *membuf = socket->packet_->next();
- int size_to_read;
-
- if ((int)membuf->length() > size) {
- size_to_read = size;
- } else {
- size_to_read = membuf->length();
- socket->something_to_read_ = false;
- }
-
- std::memcpy(buf, membuf->data(), size_to_read);
- membuf->trimStart(size_to_read);
-
- return size_to_read;
-}
-
-int TLSRTCProducerSocket::write(BIO *b, const char *buf, size_t size,
- size_t *written) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = TLSRTCProducerSocket::writeOld(b, buf, (int)size);
-
- if (ret <= 0) {
- *written = 0;
- return ret;
- }
-
- *written = (size_t)ret;
-
- return 1;
-}
-
-int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
- TLSRTCProducerSocket *socket;
- socket = (TLSRTCProducerSocket *)BIO_get_data(b);
-
- if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
- socket->first_) {
- socket->tls_chunks_--;
- bool making_manifest = socket->parent_->making_manifest_;
- socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- false);
- socket->parent_->ProducerSocket::produce(
- socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0);
- socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- making_manifest);
- socket->first_ = false;
-
- } else {
- std::unique_ptr<utils::MemBuf> mbuf =
- utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
- auto a = mbuf.release();
- socket->async_thread_.add([socket = socket, a]() {
- socket->to_call_oncontentproduced_--;
- auto mbuf = std::unique_ptr<utils::MemBuf>(a);
- socket->RTCProducerSocket::produce(std::move(mbuf));
- ProducerContentCallback on_content_produced_application;
- socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
- on_content_produced_application);
- if (socket->to_call_oncontentproduced_ == 0 &&
- on_content_produced_application) {
- on_content_produced_application(*socket, std::error_code(), 0);
- }
- });
- }
-
- return num;
-}
-
-TLSRTCProducerSocket::TLSRTCProducerSocket(P2PSecureProducerSocket *parent,
- const Name &handshake_name)
- : RTCProducerSocket(), TLSProducerSocket(parent, handshake_name) {
- BIO_METHOD *bio_meth =
- BIO_meth_new(BIO_TYPE_ACCEPT, "secure rtc producer socket");
- BIO_meth_set_read(bio_meth, TLSRTCProducerSocket::readOld);
- BIO_meth_set_write(bio_meth, TLSRTCProducerSocket::writeOld);
- BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl);
- BIO *bio = BIO_new(bio_meth);
- BIO_set_init(bio, 1);
- BIO_set_data(bio, this);
- SSL_set_bio(ssl_, bio, bio);
-}
-
-void TLSRTCProducerSocket::accept() {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
- tls_chunks_ = 1;
- int result = SSL_accept(ssl_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
- }
-
- TRANSPORT_LOGD("Handshake performed!");
- parent_->list_secure_rtc_producers.push_front(
- std::move(parent_->map_secure_rtc_producers[handshake_name_]));
- parent_->map_secure_rtc_producers.erase(handshake_name_);
-
- ProducerInterestCallback on_interest_process_decrypted;
- getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- on_interest_process_decrypted);
-
- if (on_interest_process_decrypted) {
- Interest inter(std::move(packet_));
- on_interest_process_decrypted(*this, inter);
- }
-
- parent_->cv_.notify_one();
-}
-
-int TLSRTCProducerSocket::async_accept() {
- if (!async_thread_.stopped()) {
- async_thread_.add([this]() { this->TLSRTCProducerSocket::accept(); });
- } else {
- throw errors::RuntimeException(
- "Async thread not running, impossible to perform handshake");
- }
-
- return 1;
-}
-
-void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
- throw errors::RuntimeException(
- "New handshake on the same P2P secure producer socket not supported");
- }
-
- size_t buf_size = buffer->length();
- tls_chunks_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
- to_call_oncontentproduced_ = tls_chunks_;
-
- SSL_write(ssl_, buffer->data(), buf_size);
- BIO *wbio = SSL_get_wbio(ssl_);
- int i = BIO_flush(wbio);
- (void)i; // To shut up gcc 5
-}
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h
deleted file mode 100644
index 16125f889..000000000
--- a/libtransport/src/hicn/transport/interfaces/tls_rtc_socket_producer.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/rtc_socket_producer.h>
-#include <hicn/transport/interfaces/tls_socket_producer.h>
-
-namespace transport {
-
-namespace interface {
-
-class P2PSecureProducerSocket;
-
-class TLSRTCProducerSocket : public RTCProducerSocket,
- public TLSProducerSocket {
- friend class P2PSecureProducerSocket;
-
- public:
- explicit TLSRTCProducerSocket(P2PSecureProducerSocket *parent,
- const Name &handshake_name);
-
- ~TLSRTCProducerSocket() = default;
-
- void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
-
- void accept() override;
-
- int async_accept() override;
-
- using TLSProducerSocket::produce;
- using TLSProducerSocket::onInterest;
-
- protected:
- static int read(BIO *b, char *buf, size_t size, size_t *readbytes);
-
- static int readOld(BIO *h, char *buf, int size);
-
- static int write(BIO *b, const char *buf, size_t size, size_t *written);
-
- static int writeOld(BIO *h, const char *buf, int num);
-};
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc
deleted file mode 100644
index 58b3c1d7d..000000000
--- a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.cc
+++ /dev/null
@@ -1,364 +0,0 @@
-#include <hicn/transport/interfaces/tls_socket_consumer.h>
-#include <openssl/bio.h>
-#include <openssl/ssl.h>
-#include <openssl/tls1.h>
-
-#include <random>
-
-namespace transport {
-
-namespace interface {
-
-void TLSConsumerSocket::setInterestPayload(ConsumerSocket &c,
- const core::Interest &interest) {
- Interest &int2 = const_cast<Interest &>(interest);
- random_suffix_ = int2.getName().getSuffix();
-
- if (payload_ != NULL) int2.appendPayload(std::move(payload_));
-}
-
-/* Return the number of read bytes in the return param */
-int readOldTLS(BIO *b, char *buf, int size) {
- if (size < 0) return size;
-
- TLSConsumerSocket *socket;
- socket = (TLSConsumerSocket *)BIO_get_data(b);
-
- std::unique_lock<std::mutex> lck(socket->mtx_);
-
- if (!socket->something_to_read_) {
- if (!socket->transport_protocol_->isRunning()) {
- socket->network_name_.setSuffix(socket->random_suffix_);
- socket->ConsumerSocket::asyncConsume(socket->network_name_);
- }
- if (!socket->something_to_read_) socket->cv_.wait(lck);
- }
-
- size_t size_to_read, read;
- size_t chain_size = socket->head_->length();
- if (socket->head_->isChained())
- chain_size = socket->head_->computeChainDataLength();
-
- if (chain_size > (size_t)size) {
- read = size_to_read = (size_t)size;
- } else {
- read = size_to_read = chain_size;
- socket->something_to_read_ = false;
- }
-
- while (size_to_read) {
- if (socket->head_->length() < size_to_read) {
- std::memcpy(buf, socket->head_->data(), socket->head_->length());
- size_to_read -= socket->head_->length();
- buf += socket->head_->length();
- socket->head_ = socket->head_->pop();
- } else {
- std::memcpy(buf, socket->head_->data(), size_to_read);
- socket->head_->trimStart(size_to_read);
- size_to_read = 0;
- }
- }
-
- return read;
-}
-
-/* Return the number of read bytes in readbytes */
-int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = transport::interface::readOldTLS(b, buf, (int)size);
-
- if (ret <= 0) {
- *readbytes = 0;
- return ret;
- }
-
- *readbytes = (size_t)ret;
-
- return 1;
-}
-
-/* Return the number of written bytes in the return param */
-int writeOldTLS(BIO *b, const char *buf, int num) {
- TLSConsumerSocket *socket;
- socket = (TLSConsumerSocket *)BIO_get_data(b);
-
- socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
- socket->ConsumerSocket::setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(
- &TLSConsumerSocket::setInterestPayload, socket, std::placeholders::_1,
- std::placeholders::_2));
-
- return num;
-}
-
-/* Return the number of written bytes in written */
-int writeTLS(BIO *b, const char *buf, size_t size, size_t *written) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = transport::interface::writeOldTLS(b, buf, (int)size);
-
- if (ret <= 0) {
- *written = 0;
- return ret;
- }
-
- *written = (size_t)ret;
-
- return 1;
-}
-
-long ctrlTLS(BIO *b, int cmd, long num, void *ptr) { return 1; }
-
-TLSConsumerSocket::TLSConsumerSocket(int protocol, SSL *ssl)
- : ConsumerSocket(protocol),
- name_(),
- buf_pool_(),
- decrypted_content_(),
- payload_(),
- head_(),
- something_to_read_(false),
- content_downloaded_(false),
- random_suffix_(),
- secure_prefix_(),
- producer_namespace_(),
- read_callback_decrypted_(),
- mtx_(),
- cv_(),
- async_downloader_tls_() {
- /* Create the (d)TLS state */
- const SSL_METHOD *meth = TLS_client_method();
- ctx_ = SSL_CTX_new(meth);
-
- int result =
- SSL_CTX_set_ciphersuites(ctx_,
- "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
- "SHA256:TLS_AES_128_GCM_SHA256");
- if (result != 1) {
- throw errors::RuntimeException(
- "Unable to set cipher list on TLS subsystem. Aborting.");
- }
-
- SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
- SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
- SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
- SSL_CTX_set_ssl_version(ctx_, meth);
-
- ssl_ = ssl;
-
- BIO_METHOD *bio_meth =
- BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket");
- BIO_meth_set_read(bio_meth, transport::interface::readOldTLS);
- BIO_meth_set_write(bio_meth, transport::interface::writeOldTLS);
- BIO_meth_set_ctrl(bio_meth, transport::interface::ctrlTLS);
- BIO *bio = BIO_new(bio_meth);
- BIO_set_init(bio, 1);
- BIO_set_data(bio, this);
- SSL_set_bio(ssl_, bio, bio);
-
- ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
-
- ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
-
- std::default_random_engine generator;
- std::uniform_int_distribution<int> distribution(
- 1, std::numeric_limits<uint32_t>::max());
- random_suffix_ = 0;
-
- this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- this);
-};
-
-int TLSConsumerSocket::consume(const Name &name,
- std::unique_ptr<utils::MemBuf> &&buffer) {
- this->payload_ = std::move(buffer);
-
- this->ConsumerSocket::setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(
- &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1,
- std::placeholders::_2));
-
- return consume(name);
-}
-
-int TLSConsumerSocket::consume(const Name &name) {
- if (transport_protocol_->isRunning()) {
- return CONSUMER_BUSY;
- }
-
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- throw errors::RuntimeException("Handshake not performed");
- }
-
- return download_content(name);
-}
-
-int TLSConsumerSocket::download_content(const Name &name) {
- network_name_ = name;
- network_name_.setSuffix(0);
- something_to_read_ = false;
- content_downloaded_ = false;
-
- decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH);
- uint8_t *buf = decrypted_content_->writableData();
- size_t size = 0;
- int result = -1;
-
- while (!content_downloaded_ || something_to_read_) {
- if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) {
- decrypted_content_->appendChain(
- utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH));
- // decrypted_content_->computeChainDataLength();
- buf = decrypted_content_->prev()->writableData();
- } else {
- buf = decrypted_content_->writableTail();
- }
-
- result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH);
-
- /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of
- * the data has been fully downloaded */
-
- /* ASSERT((result < SSL3_RT_MAX_PLAIN_LENGTH && content_downloaded_) || */
- /* result == SSL3_RT_MAX_PLAIN_LENGTH); */
-
- if (result >= 0) {
- size += result;
- decrypted_content_->prepend(result);
- } else
- throw errors::RuntimeException("Unable to download content");
-
- if (size >= read_callback_decrypted_->maxBufferSize()) {
- if (read_callback_decrypted_->isBufferMovable()) {
- // No need to perform an additional copy. The whole buffer will be
- // tranferred to the application.
-
- read_callback_decrypted_->readBufferAvailable(
- std::move(decrypted_content_));
- decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH);
- } else {
- // The buffer will be copied into the application-provided buffer
- uint8_t *buffer;
- std::size_t length;
- std::size_t total_length = decrypted_content_->length();
-
- while (decrypted_content_->length()) {
- buffer = nullptr;
- length = 0;
- read_callback_decrypted_->getReadBuffer(&buffer, &length);
-
- if (!buffer || !length) {
- throw errors::RuntimeException(
- "Invalid buffer provided by the application.");
- }
-
- auto to_copy = std::min(decrypted_content_->length(), length);
- std::memcpy(buffer, decrypted_content_->data(), to_copy);
- decrypted_content_->trimStart(to_copy);
- }
-
- read_callback_decrypted_->readDataAvailable(total_length);
- decrypted_content_->clear();
- }
- }
- }
-
- read_callback_decrypted_->readSuccess(size);
-
- return CONSUMER_FINISHED;
-}
-
-int TLSConsumerSocket::asyncConsume(const Name &name,
- std::unique_ptr<utils::MemBuf> &&buffer) {
- this->payload_ = std::move(buffer);
-
- this->ConsumerSocket::setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(
- &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1,
- std::placeholders::_2));
-
- return asyncConsume(name);
-}
-
-int TLSConsumerSocket::asyncConsume(const Name &name) {
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- throw errors::RuntimeException("Handshake not performed");
- }
-
- if (!async_downloader_tls_.stopped()) {
- async_downloader_tls_.add([this, name]() {
- is_async_ = true;
- download_content(name);
- });
- }
-
- return CONSUMER_RUNNING;
-}
-
-void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
- producer_namespace_ = producer_namespace;
-}
-
-int TLSConsumerSocket::setSocketOption(
- int socket_option_key, ConsumerSocket::ReadCallback *socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerSocket::ReadCallback *socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- read_callback_decrypted_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-void TLSConsumerSocket::getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) {}
-
-void TLSConsumerSocket::readDataAvailable(size_t length) noexcept {}
-
-size_t TLSConsumerSocket::maxBufferSize() const {
- return SSL3_RT_MAX_PLAIN_LENGTH;
-}
-
-void TLSConsumerSocket::readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
- std::unique_lock<std::mutex> lck(this->mtx_);
- if (head_) {
- head_->prependChain(std::move(buffer));
- } else {
- head_ = std::move(buffer);
- }
-
- something_to_read_ = true;
- cv_.notify_one();
-}
-
-void TLSConsumerSocket::readError(const std::error_code ec) noexcept {}
-
-void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept {
- std::unique_lock<std::mutex> lck(this->mtx_);
- content_downloaded_ = true;
- something_to_read_ = true;
- cv_.notify_one();
-}
-
-bool TLSConsumerSocket::isBufferMovable() noexcept { return true; }
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h
deleted file mode 100644
index 05f7fe6a5..000000000
--- a/libtransport/src/hicn/transport/interfaces/tls_socket_consumer.h
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <openssl/ssl.h>
-
-namespace transport {
-
-namespace interface {
-
-class TLSConsumerSocket : public ConsumerSocket,
- public ConsumerSocket::ReadCallback {
- /* Return the number of read bytes in readbytes */
- friend int readTLS(BIO *b, char *buf, size_t size, size_t *readbytes);
-
- /* Return the number of read bytes in the return param */
- friend int readOldTLS(BIO *h, char *buf, int size);
-
- /* Return the number of written bytes in written */
- friend int writeTLS(BIO *b, const char *buf, size_t size, size_t *written);
-
- /* Return the number of written bytes in the return param */
- friend int writeOldTLS(BIO *h, const char *buf, int num);
-
- friend long ctrlTLS(BIO *b, int cmd, long num, void *ptr);
-
- public:
- explicit TLSConsumerSocket(int protocol, SSL *ssl_);
-
- ~TLSConsumerSocket() = default;
-
- int consume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
- int consume(const Name &name) override;
-
- int asyncConsume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
- int asyncConsume(const Name &name) override;
-
- void registerPrefix(const Prefix &producer_namespace);
-
- int setSocketOption(
- int socket_option_key,
- ConsumerSocket::ReadCallback *socket_option_value) override;
-
- using ConsumerSocket::getSocketOption;
- using ConsumerSocket::setSocketOption;
-
- protected:
- /* Callback invoked once an interest has been received and its payload
- * decrypted */
- ConsumerInterestCallback on_interest_input_decrypted_;
- ConsumerInterestCallback on_interest_process_decrypted_;
-
- private:
- Name name_;
-
- /* SSL handle */
- SSL *ssl_;
- SSL_CTX *ctx_;
-
- /* Chain of MemBuf to be used as a temporary buffer to pass descypted data
- * from the underlying layer to the application */
- utils::ObjectPool<utils::MemBuf> buf_pool_;
- std::unique_ptr<utils::MemBuf> decrypted_content_;
-
- /* Chain of MemBuf holding the payload to be written into interest or data */
- std::unique_ptr<utils::MemBuf> payload_;
-
- /* Chain of MemBuf holding the data retrieved from the underlying layer */
- std::unique_ptr<utils::MemBuf> head_;
-
- bool something_to_read_;
-
- bool content_downloaded_;
-
- double old_max_win_;
-
- double old_current_win_;
-
- uint32_t random_suffix_;
-
- ip_address_t secure_prefix_;
-
- Prefix producer_namespace_;
-
- ConsumerSocket::ReadCallback *read_callback_decrypted_;
-
- std::mutex mtx_;
-
- /* Condition variable for the wait */
- std::condition_variable cv_;
-
- utils::EventThread async_downloader_tls_;
-
- void setInterestPayload(ConsumerSocket &c, const core::Interest &interest);
- void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
- const std::error_code &ec);
-
- virtual void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override;
-
- virtual void readDataAvailable(size_t length) noexcept override;
-
- virtual size_t maxBufferSize() const override;
-
- virtual void readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
-
- virtual void readError(const std::error_code ec) noexcept override;
-
- virtual void readSuccess(std::size_t total_size) noexcept override;
- virtual bool isBufferMovable() noexcept override;
-
- int download_content(const Name &name);
-};
-
-} // namespace interface
-
-} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc
deleted file mode 100644
index ad85ec6ea..000000000
--- a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.cc
+++ /dev/null
@@ -1,587 +0,0 @@
-#include <hicn/transport/core/interest.h>
-#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
-#include <hicn/transport/interfaces/tls_socket_producer.h>
-
-#include <openssl/bio.h>
-#include <openssl/rand.h>
-#include <openssl/ssl.h>
-
-namespace transport {
-
-namespace interface {
-
-/* Return the number of read bytes in readbytes */
-int TLSProducerSocket::read(BIO *b, char *buf, size_t size, size_t *readbytes) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = TLSProducerSocket::readOld(b, buf, (int)size);
-
- if (ret <= 0) {
- *readbytes = 0;
- return ret;
- }
-
- *readbytes = (size_t)ret;
-
- return 1;
-}
-
-/* Return the number of read bytes in the return param */
-int TLSProducerSocket::readOld(BIO *b, char *buf, int size) {
- TLSProducerSocket *socket;
- socket = (TLSProducerSocket *)BIO_get_data(b);
-
- /* take a lock on the mutex. It will be unlocked by */
- std::unique_lock<std::mutex> lck(socket->mtx_);
- if (!socket->something_to_read_) {
- (socket->cv_).wait(lck);
- }
-
- /* Either there already is something to read, or the thread has been waken up
- */
- /* must return the payload in the interest */
-
- utils::MemBuf *membuf = socket->packet_->next();
- int size_to_read;
- if ((int)membuf->length() > size) {
- size_to_read = size;
- } else {
- size_to_read = membuf->length();
- socket->something_to_read_ = false;
- }
-
- std::memcpy(buf, membuf->data(), size_to_read);
- membuf->trimStart(size_to_read);
-
- return size_to_read;
-}
-
-/* Return the number of written bytes in written */
-int TLSProducerSocket::write(BIO *b, const char *buf, size_t size,
- size_t *written) {
- int ret;
-
- if (size > INT_MAX) size = INT_MAX;
-
- ret = TLSProducerSocket::writeOld(b, buf, (int)size);
-
- if (ret <= 0) {
- *written = 0;
- return ret;
- }
-
- *written = (size_t)ret;
-
- return 1;
-}
-
-/* Return the number of written bytes in the return param */
-int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
- TLSProducerSocket *socket;
- socket = (TLSProducerSocket *)BIO_get_data(b);
-
- if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
- socket->first_) {
- //! socket->tls_chunks_ corresponds to is_last
- socket->tls_chunks_--;
- bool making_manifest = socket->parent_->making_manifest_;
- socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- false);
- socket->parent_->ProducerSocket::produce(
- socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0,
- socket->last_segment_);
- socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- making_manifest);
- socket->first_ = false;
- } else {
- socket->still_writing_ = true;
-
- std::unique_ptr<utils::MemBuf> mbuf =
- utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
- auto a = mbuf.release();
- socket->async_thread_.add([socket = socket, a]() {
- socket->tls_chunks_--;
- socket->to_call_oncontentproduced_--;
- auto mbuf = std::unique_ptr<utils::MemBuf>(a);
- socket->last_segment_ += socket->ProducerSocket::produce(
- socket->name_, std::move(mbuf), socket->tls_chunks_ == 0,
- socket->last_segment_);
- ProducerContentCallback on_content_produced_application;
- socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
- on_content_produced_application);
- if (socket->to_call_oncontentproduced_ == 0 &&
- on_content_produced_application) {
- on_content_produced_application(*socket, std::error_code(), 0);
- }
- });
- }
-
- return num;
-}
-
-TLSProducerSocket::TLSProducerSocket(P2PSecureProducerSocket *parent,
- const Name &handshake_name)
- : ProducerSocket(),
- on_content_produced_application_(),
- mtx_(),
- cv_(),
- something_to_read_(),
- name_(),
- last_segment_(0),
- parent_(parent),
- first_(true),
- handshake_name_(handshake_name),
- tls_chunks_(0),
- to_call_oncontentproduced_(0),
- still_writing_(false),
- encryption_thread_() {
- const SSL_METHOD *meth = TLS_server_method();
- ctx_ = SSL_CTX_new(meth);
-
- /*
- * Setup SSL context (identity and parameter to use TLS 1.3)
- */
- SSL_CTX_use_certificate(ctx_, parent->cert_509_);
- SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_);
-
- int result =
- SSL_CTX_set_ciphersuites(ctx_,
- "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
- "SHA256:TLS_AES_128_GCM_SHA256");
- if (result != 1) {
- throw errors::RuntimeException(
- "Unable to set cipher list on TLS subsystem. Aborting.");
- }
-
- // We force it to be TLS 1.3
- SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION);
- SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION);
- SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL);
- SSL_CTX_set_num_tickets(ctx_, 0);
-
- result = SSL_CTX_add_custom_ext(
- ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS,
- TLSProducerSocket::addHicnKeyIdCb, TLSProducerSocket::freeHicnKeyIdCb,
- this, TLSProducerSocket::parseHicnKeyIdCb, NULL);
-
- ssl_ = SSL_new(ctx_);
- /*
- * Setup this producer socker as the bio that TLS will use to write and read
- * data (in stream mode)
- */
- BIO_METHOD *bio_meth =
- BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket");
- BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld);
- BIO_meth_set_write(bio_meth, TLSProducerSocket::writeOld);
- BIO_meth_set_ctrl(bio_meth, TLSProducerSocket::ctrl);
- BIO *bio = BIO_new(bio_meth);
- BIO_set_init(bio, 1);
- BIO_set_data(bio, this);
- SSL_set_bio(ssl_, bio, bio);
- /*
- * Set the callback so that when an interest is received we catch it and we
- * decrypt the payload before passing it to the application.
- */
- this->ProducerSocket::setSocketOption(
- ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this,
- std::placeholders::_1,
- std::placeholders::_2));
- this->ProducerSocket::setSocketOption(
- ProducerCallbacksOptions::CONTENT_PRODUCED,
- (ProducerContentCallback)bind(
- &TLSProducerSocket::onContentProduced, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
-}
-
-void TLSProducerSocket::accept() {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
- tls_chunks_ = 1;
- int result = SSL_accept(ssl_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
- }
- TRANSPORT_LOGD("Handshake performed!");
- parent_->list_secure_producers.push_front(
- std::move(parent_->map_secure_producers[handshake_name_]));
- parent_->map_secure_producers.erase(handshake_name_);
-
- ProducerInterestCallback on_interest_process_decrypted;
- getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- on_interest_process_decrypted);
-
- if (on_interest_process_decrypted) {
- Interest inter(std::move(packet_));
- on_interest_process_decrypted(*this, inter);
- } else {
- throw errors::RuntimeException(
- "On interest process unset. Unable to perform handshake");
- }
-}
-
-int TLSProducerSocket::async_accept() {
- if (!async_thread_.stopped()) {
- async_thread_.add([this]() { this->accept(); });
- } else {
- throw errors::RuntimeException(
- "Async thread not running, impossible to perform handshake");
- }
-
- return 1;
-}
-
-void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) {
- /* Based on the state machine of (D)TLS, we know what action to do */
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
- std::unique_lock<std::mutex> lck(mtx_);
- name_ = interest.getName();
- something_to_read_ = true;
- packet_ = interest.acquireMemBufReference();
- if (head_) {
- payload_->prependChain(interest.getPayload());
- } else {
- payload_ = interest.getPayload(); // std::move(interest.getPayload());
- }
- cv_.notify_one();
- } else {
- name_ = interest.getName();
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
- something_to_read_ = true;
-
- if (interest.getPayload()->length() > 0)
- SSL_read(
- ssl_,
- const_cast<unsigned char *>(interest.getPayload()->writableData()),
- interest.getPayload()->length());
- }
-
- ProducerInterestCallback on_interest_input_decrypted;
- getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
- on_interest_input_decrypted);
- if (on_interest_input_decrypted)
- (on_interest_input_decrypted)(*this, interest);
-}
-
-void TLSProducerSocket::cacheMiss(ProducerSocket &p, Interest &interest) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
- std::unique_lock<std::mutex> lck(mtx_);
- name_ = interest.getName();
- something_to_read_ = true;
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
- cv_.notify_one();
- } else {
- name_ = interest.getName();
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
- something_to_read_ = true;
-
- if (interest.getPayload()->length() > 0)
- SSL_read(
- ssl_,
- const_cast<unsigned char *>(interest.getPayload()->writableData()),
- interest.getPayload()->length());
-
- if (on_interest_process_decrypted_ != VOID_HANDLER)
- on_interest_process_decrypted_(*this, interest);
- }
-}
-
-void TLSProducerSocket::onContentProduced(ProducerSocket &p,
- const std::error_code &err,
- uint64_t bytes_written) {}
-
-uint32_t TLSProducerSocket::produce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t start_offset) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
- throw errors::RuntimeException(
- "New handshake on the same P2P secure producer socket not supported");
- }
- size_t buf_size = buffer->length();
- name_ = served_namespaces_.front().mapName(content_name);
-
- tls_chunks_ = to_call_oncontentproduced_ =
- ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
-
- if (!is_last) {
- tls_chunks_++;
- }
-
- last_segment_ = start_offset;
-
- SSL_write(ssl_, buffer->data(), buf_size);
- BIO *wbio = SSL_get_wbio(ssl_);
- int i = BIO_flush(wbio);
- (void)i; // To shut up gcc 5
-
- return 0;
-}
-
-void TLSProducerSocket::asyncProduce(const Name &content_name,
- const uint8_t *buf, size_t buffer_size,
- bool is_last, uint32_t *start_offset) {
- if (!encryption_thread_.stopped()) {
- encryption_thread_.add([this, content_name, buffer = buf,
- size = buffer_size, is_last, start_offset]() {
- if (start_offset != NULL) {
- produce(content_name, buffer, size, is_last, *start_offset);
- } else {
- produce(content_name, buffer, size, is_last, 0);
- }
- });
- }
-}
-
-void TLSProducerSocket::asyncProduce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment) {
- if (!encryption_thread_.stopped()) {
- auto a = buffer.release();
- encryption_thread_.add(
- [this, content_name, a, is_last, offset, last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- *last_segment = &last_segment_;
- }
- produce(content_name, std::move(buf), is_last, offset);
- });
- }
-}
-
-void TLSProducerSocket::asyncProduce(ContentObject &content_object) {
- throw errors::RuntimeException("API not supported");
-}
-
-void TLSProducerSocket::produce(ContentObject &content_object) {
- throw errors::RuntimeException("API not supported");
-}
-
-long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) {
- if (cmd == BIO_CTRL_FLUSH) {
- }
- return 1;
-}
-
-int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context,
- const unsigned char **out, size_t *outlen,
- X509 *x, size_t chainidx, int *al,
- void *add_arg) {
- TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg);
- if (ext_type == 100) {
- ip_prefix_t ip_prefix =
- socket->parent_->served_namespaces_.front().toIpPrefixStruct();
- int inet_family =
- socket->parent_->served_namespaces_.front().getAddressFamily();
- uint16_t prefix_len_bits =
- socket->parent_->served_namespaces_.front().getPrefixLength();
- uint8_t prefix_len_bytes = prefix_len_bits / 8;
- uint8_t prefix_len_u32 = prefix_len_bits / 32;
-
- ip_prefix_t *out_ip = (ip_prefix_t *)malloc(sizeof(ip_prefix_t));
- out_ip->family = inet_family;
- out_ip->len = prefix_len_bits + 32;
- u8 *out_ip_buf = const_cast<u8 *>(
- ip_address_get_buffer(&(out_ip->address), inet_family));
- *out = reinterpret_cast<unsigned char *>(out_ip);
-
- RAND_bytes((unsigned char *)&socket->key_id_, 4);
-
- memcpy(out_ip_buf, ip_address_get_buffer(&(ip_prefix.address), inet_family),
- prefix_len_bytes);
- memcpy((out_ip_buf + prefix_len_bytes), &socket->key_id_, 4);
- *outlen = sizeof(ip_prefix_t);
-
- ip_address_t mask = {};
- ip_address_t keyId_component = {};
- u32 *mask_buf;
- u32 *keyId_component_buf;
- switch (inet_family) {
- case AF_INET:
- mask_buf = &(mask.v4.as_u32);
- keyId_component_buf = &(keyId_component.v4.as_u32);
- break;
- case AF_INET6:
- mask_buf = mask.v6.as_u32;
- keyId_component_buf = keyId_component.v6.as_u32;
- break;
- default:
- throw errors::RuntimeException("Unknown protocol");
- }
-
- if (prefix_len_bits > (inet_family == AF_INET6 ? IPV6_ADDR_LEN_BITS - 32
- : IPV4_ADDR_LEN_BITS - 32))
- throw errors::RuntimeException(
- "Not enough space in the content name to add key_id");
-
- mask_buf[prefix_len_u32] = 0xffffffff;
- keyId_component_buf[prefix_len_u32] = socket->key_id_;
- socket->last_segment_ = 0;
-
- socket->on_interest_process_decrypted_ =
- socket->parent_->on_interest_process_decrypted_;
-
- socket->registerPrefix(
- Prefix(socket->parent_->served_namespaces_.front().getName(
- Name(inet_family, (uint8_t *)&mask),
- Name(inet_family, (uint8_t *)&keyId_component),
- socket->parent_->served_namespaces_.front().getName()),
- out_ip->len));
- socket->connect();
- }
- return 1;
-}
-
-void TLSProducerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context,
- const unsigned char *out,
- void *add_arg) {
- free(const_cast<unsigned char *>(out));
-}
-
-int TLSProducerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context,
- const unsigned char *in, size_t inlen,
- X509 *x, size_t chainidx, int *al,
- void *add_arg) {
- return 1;
-}
-
-int TLSProducerSocket::setSocketOption(
- int socket_option_key, ProducerInterestCallback socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerInterestCallback socket_option_value) -> int {
- int result = SOCKET_OPTION_SET;
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- on_interest_input_decrypted_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- on_interest_dropped_input_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- on_interest_inserted_input_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CACHE_HIT:
- on_interest_satisfied_output_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CACHE_MISS:
- on_interest_process_decrypted_ = socket_option_value;
- break;
-
- default:
- result = SOCKET_OPTION_NOT_SET;
- break;
- }
- return result;
- });
-}
-
-int TLSProducerSocket::setSocketOption(
- int socket_option_key, ProducerContentCallback socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentCallback socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- on_content_produced_application_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
-}
-
-int TLSProducerSocket::getSocketOption(
- int socket_option_key, ProducerContentCallback **socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentCallback **socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- *socket_option_value = &on_content_produced_application_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int TLSProducerSocket::getSocketOption(
- int socket_option_key, ProducerContentCallback &socket_option_value) {
- return rescheduleOnIOServiceWithReference(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentCallback &socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- socket_option_value = on_content_produced_application_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int TLSProducerSocket::getSocketOption(
- int socket_option_key, ProducerInterestCallback &socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOServiceWithReference(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerInterestCallback &socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- socket_option_value = on_interest_input_decrypted_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- socket_option_value = on_interest_dropped_input_buffer_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- socket_option_value = on_interest_inserted_input_buffer_;
- break;
-
- case ProducerCallbacksOptions::CACHE_HIT:
- socket_option_value = on_interest_satisfied_output_buffer_;
- break;
-
- case ProducerCallbacksOptions::CACHE_MISS:
- socket_option_value = on_interest_process_decrypted_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-} // namespace interface
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h b/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h
deleted file mode 100644
index 4c09ddaa5..000000000
--- a/libtransport/src/hicn/transport/interfaces/tls_socket_producer.h
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/utils/content_store.h>
-
-#include <openssl/ssl.h>
-#include <condition_variable>
-#include <mutex>
-
-namespace transport {
-
-namespace interface {
-
-class P2PSecureProducerSocket;
-
-class TLSProducerSocket : virtual public ProducerSocket {
- friend class P2PSecureProducerSocket;
-
- public:
- explicit TLSProducerSocket(P2PSecureProducerSocket *parent,
- const Name &handshake_name);
- ~TLSProducerSocket() = default;
-
- uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
- bool is_last = true, uint32_t start_offset = 0) override {
- return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size),
- is_last, start_offset);
- }
-
- uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0) override;
-
- void produce(ContentObject &content_object) override;
-
- void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
- bool is_last = true,
- uint32_t *start_offset = nullptr) override;
-
- void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment = nullptr) override;
-
- void asyncProduce(ContentObject &content_object) override;
-
- virtual void accept();
-
- virtual int async_accept();
-
- virtual int setSocketOption(
- int socket_option_key,
- ProducerInterestCallback socket_option_value) override;
-
- virtual int setSocketOption(
- int socket_option_key,
- ProducerContentCallback socket_option_value) override;
-
- virtual int getSocketOption(
- int socket_option_key,
- ProducerContentCallback **socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- ProducerContentCallback &socket_option_value);
-
- int getSocketOption(int socket_option_key,
- ProducerInterestCallback &socket_option_value);
-
- using ProducerSocket::getSocketOption;
- using ProducerSocket::onInterest;
- using ProducerSocket::setSocketOption;
-
- protected:
- /* Callback invoked once an interest has been received and its payload
- * decrypted */
- ProducerInterestCallback on_interest_input_decrypted_;
- ProducerInterestCallback on_interest_process_decrypted_;
- ProducerContentCallback on_content_produced_application_;
-
- std::mutex mtx_;
-
- /* Condition variable for the wait */
- std::condition_variable cv_;
-
- /* Bool variable, true if there is something to read (an interest arrived) */
- bool something_to_read_;
-
- /* First interest that open a secure connection */
- transport::core::Name name_;
-
- /* SSL handle */
- SSL *ssl_;
- SSL_CTX *ctx_;
-
- Packet::MemBufPtr packet_;
-
- std::unique_ptr<utils::MemBuf> head_;
- std::uint32_t last_segment_;
- std::shared_ptr<utils::MemBuf> payload_;
- std::uint32_t key_id_;
-
- std::thread *handshake;
- P2PSecureProducerSocket *parent_;
-
- bool first_;
- Name handshake_name_;
- int tls_chunks_;
- int to_call_oncontentproduced_;
-
- bool still_writing_;
-
- utils::EventThread encryption_thread_;
-
- void onInterest(ProducerSocket &p, Interest &interest);
- void cacheMiss(ProducerSocket &p, Interest &interest);
-
- /* Return the number of read bytes in readbytes */
- static int read(BIO *b, char *buf, size_t size, size_t *readbytes);
-
- /* Return the number of read bytes in the return param */
- static int readOld(BIO *h, char *buf, int size);
-
- /* Return the number of written bytes in written */
- static int write(BIO *b, const char *buf, size_t size, size_t *written);
-
- /* Return the number of written bytes in the return param */
- static int writeOld(BIO *h, const char *buf, int num);
-
- static long ctrl(BIO *b, int cmd, long num, void *ptr);
-
- static int addHicnKeyIdCb(SSL *s, unsigned int ext_type, unsigned int context,
- const unsigned char **out, size_t *outlen, X509 *x,
- size_t chainidx, int *al, void *add_arg);
-
- static void freeHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context, const unsigned char *out,
- void *add_arg);
-
- static int parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
- unsigned int context, const unsigned char *in,
- size_t inlen, X509 *x, size_t chainidx, int *al,
- void *add_arg);
-
- void onContentProduced(ProducerSocket &p, const std::error_code &err,
- uint64_t bytes_written);
-};
-
-} // namespace interface
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/verification_policy.h b/libtransport/src/hicn/transport/interfaces/verification_policy.h
deleted file mode 100644
index cb5140ac1..000000000
--- a/libtransport/src/hicn/transport/interfaces/verification_policy.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <cstdint>
-
-namespace transport {
-namespace interface {
-
-/**
- * This policy allows the application to tell the transport what to do in case
- * the verification of a content object fails.
- */
-enum class VerificationPolicy : std::uint8_t {
- DROP_PACKET,
- ACCEPT_PACKET,
- ABORT_SESSION
-};
-} // namespace interface
-} // namespace transport \ No newline at end of file