aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation
diff options
context:
space:
mode:
authorLuca Muscariello <muscariello@ieee.org>2021-04-15 09:05:46 +0200
committerMauro Sardara <msardara@cisco.com>2021-04-15 16:36:16 +0200
commite92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch)
tree9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/implementation
parent3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff)
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary. A summary of the different components that underwent major modifications is reported below. - Transport protocol updates The hierarchy of classes has been optimized to have common transport services across different transport protocols. This can allow to customize a transport protocol with new features. - A new real-time communication protocol The RTC protocol has been optimized in terms of algorithms to reduce consumer-producer synchronization latency. - A novel socket API The API has been reworked to be easier to consumer but also to have a more efficient integration in L4 proxies. - Several performance improvements A large number of performance improvements have been included in particular to make the entire stack zero-copy and optimize cache miss. - New memory buffer framework Memory management has been reworked entirely to provide a more efficient infra with a richer API. Buffers are now allocated in blocks and a single buffer holds the memory for (1) the shared_ptr control block, (2) the metadata of the packet (e.g. name, pointer to other buffers if buffer is chained and relevant offsets), and (3) the packet itself, as it is sent/received over the network. - A new slab allocator Dynamic memory allocation is now managed by a novel slab allocator that is optimised for packet processing and connection management. Memory is organized in pools of blocks all of the same size which are used during the processing of outgoing/incoming packets. When a memory block Is allocated is always taken from a global pool and when it is deallocated is returned to the pool, thus avoiding the cost of any heap allocation in the data path. - New transport connectors Consumer and producer end-points can communication either using an hicn packet forwarder or with direct connector based on shared memories or sockets. The usage of transport connectors typically for unit and funcitonal testing but may have additional usage. - Support for FEC/ECC for transport services FEC/ECC via reed solomon is supported by default and made available to transport services as a modular component. Reed solomon block codes is a default FEC model that can be replaced in a modular way by many other codes including RLNC not avaiable in this distribution. The current FEC framework support variable size padding and efficiently makes use of the infra memory buffers to avoid additiona copies. - Secure transport framework for signature computation and verification Crypto support is nativelty used in hICN for integrity and authenticity. Novel support that includes RTC has been implemented and made modular and reusable acrosso different transport protocols. - TLS - Transport layer security over hicn Point to point confidentiality is provided by integrating TLS on top of hICN reliable and non-reliable transport. The integration is common and makes a different use of the TLS record. - MLS - Messaging layer security over hicn MLS integration on top of hICN is made by using the MLSPP implemetation open sourced by Cisco. We have included instrumentation tools to deploy performance and functional tests of groups of end-points. - Android support The overall code has been heavily tested in Android environments and has received heavy lifting to better run natively in recent Android OS. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r--libtransport/src/implementation/CMakeLists.txt10
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.cc2
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.h2
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.cc149
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.h44
-rw-r--r--libtransport/src/implementation/socket.cc26
-rw-r--r--libtransport/src/implementation/socket.h52
-rw-r--r--libtransport/src/implementation/socket_consumer.h194
-rw-r--r--libtransport/src/implementation/socket_producer.h632
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.cc2
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.h7
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.cc8
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.h3
-rw-r--r--libtransport/src/implementation/tls_socket_producer.cc174
-rw-r--r--libtransport/src/implementation/tls_socket_producer.h33
15 files changed, 341 insertions, 997 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt
index 5423a7697..392c99e15 100644
--- a/libtransport/src/implementation/CMakeLists.txt
+++ b/libtransport/src/implementation/CMakeLists.txt
@@ -13,13 +13,8 @@
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
-list(APPEND SOURCE_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
-)
-
list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/socket.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h
${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h
${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h
)
@@ -27,15 +22,16 @@ list(APPEND HEADER_FILES
if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a")
list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc
+ # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc
${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc
${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc
${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket.cc
)
list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h
+ # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h
${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h
${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h
${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc
index 9b79850d6..8c7c175b2 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc
@@ -15,7 +15,6 @@
#include <implementation/p2psecure_socket_consumer.h>
#include <interfaces/tls_socket_consumer.h>
-
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/tls1.h>
@@ -175,7 +174,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket(
: ConsumerSocket(consumer, handshake_protocol),
name_(),
tls_consumer_(nullptr),
- buf_pool_(),
decrypted_content_(),
payload_(),
head_(),
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h
index d4c3b26c2..a35a50352 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.h
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.h
@@ -16,7 +16,6 @@
#pragma once
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/tls_socket_consumer.h>
#include <openssl/bio.h>
#include <openssl/ssl.h>
@@ -75,7 +74,6 @@ class P2PSecureConsumerSocket : public ConsumerSocket,
BIO_METHOD *bio_meth_;
/* Chain of MemBuf to be used as a temporary buffer to pass descypted data
* from the underlying layer to the application */
- utils::ObjectPool<utils::MemBuf> buf_pool_;
std::unique_ptr<utils::MemBuf> decrypted_content_;
/* Chain of MemBuf holding the payload to be written into interest or data */
std::unique_ptr<utils::MemBuf> payload_;
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc
index 15c7d25cd..6dff2ba08 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_producer.cc
@@ -14,13 +14,11 @@
*/
#include <hicn/transport/core/interest.h>
-
#include <implementation/p2psecure_socket_producer.h>
-#include <implementation/tls_rtc_socket_producer.h>
+// #include <implementation/tls_rtc_socket_producer.h>
#include <implementation/tls_socket_producer.h>
#include <interfaces/tls_rtc_socket_producer.h>
#include <interfaces/tls_socket_producer.h>
-
#include <openssl/bio.h>
#include <openssl/rand.h>
#include <openssl/ssl.h>
@@ -34,7 +32,8 @@ namespace implementation {
P2PSecureProducerSocket::P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket)
- : ProducerSocket(producer_socket),
+ : ProducerSocket(producer_socket,
+ ProductionProtocolAlgorithms::BYTE_STREAM),
mtx_(),
cv_(),
map_producers(),
@@ -42,8 +41,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
P2PSecureProducerSocket::P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket, bool rtc,
- const std::shared_ptr<utils::Identity> &identity)
- : ProducerSocket(producer_socket),
+ const std::shared_ptr<auth::Identity> &identity)
+ : ProducerSocket(producer_socket,
+ ProductionProtocolAlgorithms::BYTE_STREAM),
rtc_(rtc),
mtx_(),
cv_(),
@@ -51,9 +51,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
list_producers() {
/* Setup SSL context (identity and parameter to use TLS 1.3) */
der_cert_ = parcKeyStore_GetDEREncodedCertificate(
- (identity->getSigner()->getKeyStore()));
+ (identity->getSigner()->getParcKeyStore()));
der_prk_ = parcKeyStore_GetDEREncodedPrivateKey(
- (identity->getSigner()->getKeyStore()));
+ (identity->getSigner()->getParcKeyStore()));
int cert_size = parcBuffer_Limit(der_cert_);
int prk_size = parcBuffer_Limit(der_prk_);
@@ -88,15 +88,20 @@ void P2PSecureProducerSocket::initSessionSocket(
producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
producer->setSocketOption(DATA_PACKET_SIZE,
(uint32_t)(this->data_packet_size_));
- producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
+ uint32_t output_buffer_size = 0;
+ this->getSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE,
+ output_buffer_size);
+ producer->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE,
+ output_buffer_size);
if (!rtc_) {
producer->setInterface(new interface::TLSProducerSocket(producer.get()));
} else {
- TLSRTCProducerSocket *rtc_producer =
- dynamic_cast<TLSRTCProducerSocket *>(producer.get());
- rtc_producer->setInterface(
- new interface::TLSRTCProducerSocket(rtc_producer));
+ // TODO
+ // TLSRTCProducerSocket *rtc_producer =
+ // dynamic_cast<TLSRTCProducerSocket *>(producer.get());
+ // rtc_producer->setInterface(
+ // new interface::TLSRTCProducerSocket(rtc_producer));
}
}
@@ -114,8 +119,9 @@ void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p,
tls_producer =
std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName());
} else {
- tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this,
- interest.getName());
+ // TODO
+ // tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this,
+ // interest.getName());
}
initSessionSocket(tls_producer);
@@ -129,15 +135,19 @@ void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p,
tls_producer_ptr->onInterest(*tls_producer_ptr, interest);
tls_producer_ptr->async_accept();
} else {
- TLSRTCProducerSocket *rtc_producer_ptr =
- dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr);
- rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest);
- rtc_producer_ptr->async_accept();
+ // TODO
+ // TLSRTCProducerSocket *rtc_producer_ptr =
+ // dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr);
+ // rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest);
+ // rtc_producer_ptr->async_accept();
}
}
-void P2PSecureProducerSocket::produce(const uint8_t *buffer,
- size_t buffer_size) {
+uint32_t P2PSecureProducerSocket::produceDatagram(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) {
+ // TODO
+ throw errors::NotImplementedException();
+
if (!rtc_) {
throw errors::RuntimeException(
"RTC must be the transport protocol to start the production of current "
@@ -148,16 +158,20 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer,
if (list_producers.empty()) cv_.wait(lck);
- for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
- TLSRTCProducerSocket *rtc_producer =
- dynamic_cast<TLSRTCProducerSocket *>(it->get());
- rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
- }
+ // TODO
+ // for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
+ // {
+ // TLSRTCProducerSocket *rtc_producer =
+ // dynamic_cast<TLSRTCProducerSocket *>(it->get());
+ // rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
+ // }
+
+ return 0;
}
-uint32_t P2PSecureProducerSocket::produce(
- Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
- uint32_t start_offset) {
+uint32_t P2PSecureProducerSocket::produceStream(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t start_offset) {
if (rtc_) {
throw errors::RuntimeException(
"RTC transport protocol is not compatible with the production of "
@@ -170,16 +184,17 @@ uint32_t P2PSecureProducerSocket::produce(
if (list_producers.empty()) cv_.wait(lck);
for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
- segments +=
- (*it)->produce(content_name, buffer->clone(), is_last, start_offset);
+ segments += (*it)->produceStream(content_name, buffer->clone(), is_last,
+ start_offset);
return segments;
}
-uint32_t P2PSecureProducerSocket::produce(Name content_name,
- const uint8_t *buffer,
- size_t buffer_size, bool is_last,
- uint32_t start_offset) {
+uint32_t P2PSecureProducerSocket::produceStream(const Name &content_name,
+ const uint8_t *buffer,
+ size_t buffer_size,
+ bool is_last,
+ uint32_t start_offset) {
if (rtc_) {
throw errors::RuntimeException(
"RTC transport protocol is not compatible with the production of "
@@ -191,29 +206,31 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name,
if (list_producers.empty()) cv_.wait(lck);
for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
- segments += (*it)->produce(content_name, buffer, buffer_size, is_last,
- start_offset);
+ segments += (*it)->produceStream(content_name, buffer, buffer_size, is_last,
+ start_offset);
return segments;
}
-void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
- const uint8_t *buf,
- size_t buffer_size, bool is_last,
- uint32_t *start_offset) {
- if (rtc_) {
- throw errors::RuntimeException(
- "RTC transport protocol is not compatible with the production of "
- "current data. Aborting.");
- }
-
- std::unique_lock<std::mutex> lck(mtx_);
- if (list_producers.empty()) cv_.wait(lck);
-
- for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
- (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset);
- }
-}
+// void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
+// const uint8_t *buf,
+// size_t buffer_size, bool is_last,
+// uint32_t *start_offset) {
+// if (rtc_) {
+// throw errors::RuntimeException(
+// "RTC transport protocol is not compatible with the production of "
+// "current data. Aborting.");
+// }
+
+// std::unique_lock<std::mutex> lck(mtx_);
+// if (list_producers.empty()) cv_.wait(lck);
+
+// for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
+// {
+// (*it)->asyncProduce(content_name, buf, buffer_size, is_last,
+// start_offset);
+// }
+// }
void P2PSecureProducerSocket::asyncProduce(
Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
@@ -269,7 +286,7 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) {
+ const std::shared_ptr<auth::Signer> &socket_option_value) {
if (!list_producers.empty())
for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
@@ -323,16 +340,6 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
}
int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, std::list<Prefix> socket_option_value) {
- if (!list_producers.empty())
- for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerContentObjectCallback socket_option_value) {
if (!list_producers.empty())
for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
@@ -361,17 +368,7 @@ int P2PSecureProducerSocket::setSocketOption(
}
int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, utils::CryptoHashType socket_option_value) {
- if (!list_producers.empty())
- for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
- (*it)->setSocketOption(socket_option_key, socket_option_value);
-
- return ProducerSocket::setSocketOption(socket_option_key,
- socket_option_value);
-}
-
-int P2PSecureProducerSocket::setSocketOption(
- int socket_option_key, utils::CryptoSuite socket_option_value) {
+ int socket_option_key, auth::CryptoHashType socket_option_value) {
if (!list_producers.empty())
for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h
index bfc9fc2c1..b7c3d1958 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.h
+++ b/libtransport/src/implementation/p2psecure_socket_producer.h
@@ -15,15 +15,14 @@
#pragma once
-#include <hicn/transport/security/identity.h>
-#include <hicn/transport/security/signer.h>
-
+#include <hicn/transport/auth/identity.h>
+#include <hicn/transport/auth/signer.h>
#include <implementation/socket_producer.h>
-#include <implementation/tls_rtc_socket_producer.h>
+// #include <implementation/tls_rtc_socket_producer.h>
#include <implementation/tls_socket_producer.h>
+#include <openssl/ssl.h>
#include <utils/content_store.h>
-#include <openssl/ssl.h>
#include <condition_variable>
#include <forward_list>
#include <mutex>
@@ -33,39 +32,40 @@ namespace implementation {
class P2PSecureProducerSocket : public ProducerSocket {
friend class TLSProducerSocket;
- friend class TLSRTCProducerSocket;
+ // TODO
+ // friend class TLSRTCProducerSocket;
public:
explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket);
explicit P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket, bool rtc,
- const std::shared_ptr<utils::Identity> &identity);
+ const std::shared_ptr<auth::Identity> &identity);
~P2PSecureProducerSocket();
- void produce(const uint8_t *buffer, size_t buffer_size) override;
+ uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) override;
- uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
- bool is_last = true, uint32_t start_offset = 0) override;
+ uint32_t produceStream(const Name &content_name, const uint8_t *buffer,
+ size_t buffer_size, bool is_last = true,
+ uint32_t start_offset = 0) override;
- uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0) override;
+ uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) override;
void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
bool is_last, uint32_t offset,
uint32_t **last_segment = nullptr) override;
- void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
- bool is_last = true,
- uint32_t *start_offset = nullptr) override;
-
int setSocketOption(int socket_option_key,
ProducerInterestCallback socket_option_value) override;
int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) override;
+ const std::shared_ptr<auth::Signer> &socket_option_value) override;
int setSocketOption(int socket_option_key,
uint32_t socket_option_value) override;
@@ -75,9 +75,6 @@ class P2PSecureProducerSocket : public ProducerSocket {
int setSocketOption(int socket_option_key,
Name *socket_option_value) override;
- int setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value) override;
-
int setSocketOption(
int socket_option_key,
ProducerContentObjectCallback socket_option_value) override;
@@ -86,16 +83,13 @@ class P2PSecureProducerSocket : public ProducerSocket {
ProducerContentCallback socket_option_value) override;
int setSocketOption(int socket_option_key,
- utils::CryptoHashType socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value) override;
+ auth::CryptoHashType socket_option_value) override;
int setSocketOption(int socket_option_key,
const std::string &socket_option_value) override;
using ProducerSocket::getSocketOption;
- using ProducerSocket::onInterest;
+ // using ProducerSocket::onInterest;
protected:
/* Callback invoked once an interest has been received and its payload
diff --git a/libtransport/src/implementation/socket.cc b/libtransport/src/implementation/socket.cc
new file mode 100644
index 000000000..2e21f2bc3
--- /dev/null
+++ b/libtransport/src/implementation/socket.cc
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/global_configuration.h>
+#include <implementation/socket.h>
+
+namespace transport {
+namespace implementation {
+
+Socket::Socket(std::shared_ptr<core::Portal> &&portal)
+ : portal_(std::move(portal)), is_async_(false) {}
+
+} // namespace implementation
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h
index 2e51f3027..cf22c03e1 100644
--- a/libtransport/src/implementation/socket.h
+++ b/libtransport/src/implementation/socket.h
@@ -15,13 +15,12 @@
#pragma once
+#include <core/facade.h>
#include <hicn/transport/config.h>
#include <hicn/transport/interfaces/callbacks.h>
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/interfaces/socket_options_keys.h>
-#include <core/facade.h>
-
#define SOCKET_OPTION_GET 0
#define SOCKET_OPTION_NOT_GET 1
#define SOCKET_OPTION_SET 2
@@ -32,56 +31,23 @@ namespace transport {
namespace implementation {
// Forward Declarations
-template <typename PortalType>
class Socket;
-// Define the portal and its connector, depending on the compilation options
-// passed by the build tool.
-using HicnForwarderPortal = core::HicnForwarderPortal;
-
-#ifdef __linux__
-#ifndef __ANDROID__
-using RawSocketPortal = core::RawSocketPortal;
-#endif
-#endif
-
-#ifdef __vpp__
-using VPPForwarderPortal = core::VPPForwarderPortal;
-using BaseSocket = Socket<VPPForwarderPortal>;
-using BasePortal = VPPForwarderPortal;
-#else
-using BaseSocket = Socket<HicnForwarderPortal>;
-using BasePortal = HicnForwarderPortal;
-#endif
-
-template <typename PortalType>
class Socket {
- static_assert(std::is_same<PortalType, HicnForwarderPortal>::value
-#ifdef __linux__
-#ifndef __ANDROID__
- || std::is_same<PortalType, RawSocketPortal>::value
-#ifdef __vpp__
- || std::is_same<PortalType, VPPForwarderPortal>::value
-#endif
-#endif
- ,
-#else
- ,
-
-#endif
- "This class is not allowed as Portal");
-
public:
- using Portal = PortalType;
-
- virtual asio::io_service &getIoService() = 0;
-
virtual void connect() = 0;
-
virtual bool isRunning() = 0;
+ virtual asio::io_service &getIoService() { return portal_->getIoService(); }
+
protected:
+ Socket(std::shared_ptr<core::Portal> &&portal);
+
virtual ~Socket(){};
+
+ protected:
+ std::shared_ptr<core::Portal> portal_;
+ bool is_async_;
};
} // namespace implementation
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 87965923e..a7b6ac4e7 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -13,15 +13,17 @@
* limitations under the License.
*/
+#pragma once
+
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/interfaces/statistics.h>
-#include <hicn/transport/security/verifier.h>
+#include <hicn/transport/auth/verifier.h>
#include <hicn/transport/utils/event_thread.h>
#include <protocols/cbr.h>
-#include <protocols/protocol.h>
#include <protocols/raaqm.h>
-#include <protocols/rtc.h>
+#include <protocols/rtc/rtc.h>
+#include <protocols/transport_protocol.h>
namespace transport {
namespace implementation {
@@ -30,12 +32,12 @@ using namespace core;
using namespace interface;
using ReadCallback = interface::ConsumerSocket::ReadCallback;
-class ConsumerSocket : public Socket<BasePortal> {
+class ConsumerSocket : public Socket {
private:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
- std::shared_ptr<Portal> &&portal)
- : consumer_interface_(consumer),
- portal_(portal),
+ std::shared_ptr<core::Portal> &&portal)
+ : Socket(std::move(portal)),
+ consumer_interface_(consumer),
async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
@@ -54,16 +56,13 @@ class ConsumerSocket : public Socket<BasePortal> {
rate_estimation_observer_(nullptr),
rate_estimation_batching_parameter_(default_values::batch),
rate_estimation_choice_(0),
- is_async_(false),
- verifier_(std::make_shared<utils::Verifier>()),
+ verifier_(std::make_shared<auth::VoidVerifier>()),
verify_signature_(false),
- key_content_(false),
reset_window_(false),
on_interest_output_(VOID_HANDLER),
on_interest_timeout_(VOID_HANDLER),
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
- on_content_object_verification_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
read_callback_(nullptr),
timer_interval_milliseconds_(0),
@@ -75,7 +74,7 @@ class ConsumerSocket : public Socket<BasePortal> {
break;
case TransportProtocolAlgorithms::RTC:
transport_protocol_ =
- std::make_unique<protocol::RTCTransportProtocol>(this);
+ std::make_unique<protocol::rtc::RTCTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RAAQM:
default:
@@ -87,12 +86,12 @@ class ConsumerSocket : public Socket<BasePortal> {
public:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
- : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {}
+ : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {}
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
asio::io_service &io_service)
: ConsumerSocket(consumer, protocol,
- std::make_shared<Portal>(io_service)) {
+ std::make_shared<core::Portal>(io_service)) {
is_async_ = true;
}
@@ -138,8 +137,6 @@ class ConsumerSocket : public Socket<BasePortal> {
return CONSUMER_RUNNING;
}
- bool verifyKeyPackets() { return transport_protocol_->verifyKeyPackets(); }
-
void stop() {
if (transport_protocol_->isRunning()) {
transport_protocol_->stop();
@@ -152,8 +149,6 @@ class ConsumerSocket : public Socket<BasePortal> {
}
}
- asio::io_service &getIoService() { return portal_->getIoService(); }
-
virtual int setSocketOption(int socket_option_key,
ReadCallback *socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in
@@ -316,12 +311,6 @@ class ConsumerSocket : public Socket<BasePortal> {
break;
}
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_verification_ = VOID_HANDLER;
- break;
- }
-
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -334,16 +323,6 @@ class ConsumerSocket : public Socket<BasePortal> {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- verify_signature_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
- case GeneralTransportOptions::KEY_CONTENT:
- key_content_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET:
reset_window_ = socket_option_value;
result = SOCKET_OPTION_SET;
@@ -377,29 +356,6 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in
- // case setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- on_content_object_verification_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
- }
-
int setSocketOption(int socket_option_key,
ConsumerInterestCallback socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in
@@ -433,51 +389,6 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- verification_failed_callback_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
- }
-
- // int setSocketOption(
- // int socket_option_key,
- // ConsumerContentObjectVerificationFailedCallback socket_option_value) {
- // return rescheduleOnIOService(
- // socket_option_key, socket_option_value,
- // [this](
- // int socket_option_key,
- // ConsumerContentObjectVerificationFailedCallback
- // socket_option_value)
- // -> int {
- // switch (socket_option_key) {
- // case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- // verification_failed_callback_ = socket_option_value;
- // break;
-
- // default:
- // return SOCKET_OPTION_NOT_SET;
- // }
-
- // return SOCKET_OPTION_SET;
- // });
- // }
-
int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) {
utils::SpinLock::Acquire locked(guard_raaqm_params_);
switch (socket_option_key) {
@@ -494,7 +405,7 @@ class ConsumerSocket : public Socket<BasePortal> {
int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Verifier> &socket_option_value) {
+ const std::shared_ptr<auth::Verifier> &socket_option_value) {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
@@ -516,14 +427,6 @@ class ConsumerSocket : public Socket<BasePortal> {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
- case GeneralTransportOptions::CERTIFICATE:
- key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
-
- if (key_id_ != nullptr) {
- result = SOCKET_OPTION_SET;
- }
- break;
-
case DataLinkOptions::OUTPUT_INTERFACE:
output_interface_ = socket_option_value;
portal_->setOutputInterface(output_interface_);
@@ -642,14 +545,6 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = transport_protocol_->isRunning();
break;
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- socket_option_value = verify_signature_;
- break;
-
- case GeneralTransportOptions::KEY_CONTENT:
- socket_option_value = key_content_;
- break;
-
case GeneralTransportOptions::ASYNC_MODE:
socket_option_value = is_async_;
break;
@@ -699,29 +594,6 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in
- // case setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- *socket_option_value = &on_content_object_verification_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
- }
-
int getSocketOption(int socket_option_key,
ConsumerInterestCallback **socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in
@@ -755,30 +627,8 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in
- // case setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback *
- *socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- *socket_option_value = &verification_failed_callback_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
- }
-
int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value) {
+ std::shared_ptr<core::Portal> &socket_option_value) {
switch (socket_option_key) {
case PORTAL:
socket_option_value = portal_;
@@ -807,7 +657,7 @@ class ConsumerSocket : public Socket<BasePortal> {
}
int getSocketOption(int socket_option_key,
- std::shared_ptr<utils::Verifier> &socket_option_value) {
+ std::shared_ptr<auth::Verifier> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::VERIFIER:
socket_option_value = verifier_;
@@ -871,7 +721,7 @@ class ConsumerSocket : public Socket<BasePortal> {
// To enforce type check
std::function<int(int, arg2)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (transport_protocol_->isRunning()) {
+ if (transport_protocol_ && transport_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
@@ -898,7 +748,6 @@ class ConsumerSocket : public Socket<BasePortal> {
protected:
interface::ConsumerSocket *consumer_interface_;
- std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
// No need to protect from multiple accesses in the async consumer
@@ -926,13 +775,10 @@ class ConsumerSocket : public Socket<BasePortal> {
int rate_estimation_batching_parameter_;
int rate_estimation_choice_;
- bool is_async_;
-
// Verification parameters
- std::shared_ptr<utils::Verifier> verifier_;
+ std::shared_ptr<auth::Verifier> verifier_;
PARCKeyId *key_id_;
std::atomic_bool verify_signature_;
- bool key_content_;
bool reset_window_;
ConsumerInterestCallback on_interest_retransmission_;
@@ -940,9 +786,7 @@ class ConsumerSocket : public Socket<BasePortal> {
ConsumerInterestCallback on_interest_timeout_;
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
- ConsumerContentObjectVerificationCallback on_content_object_verification_;
ConsumerTimerCallback stats_summary_;
- ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
ReadCallback *read_callback_;
@@ -959,4 +803,4 @@ class ConsumerSocket : public Socket<BasePortal> {
};
} // namespace implementation
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index a6f0f969e..af69cd818 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -15,9 +15,11 @@
#pragma once
-#include <hicn/transport/security/signer.h>
+#include <hicn/transport/auth/signer.h>
#include <hicn/transport/utils/event_thread.h>
#include <implementation/socket.h>
+#include <protocols/prod_protocol_bytestream.h>
+#include <protocols/prod_protocol_rtc.h>
#include <utils/content_store.h>
#include <utils/suffix_strategy.h>
@@ -39,21 +41,17 @@ namespace implementation {
using namespace core;
using namespace interface;
-class ProducerSocket : public Socket<BasePortal>,
- public BasePortal::ProducerCallback {
- static constexpr uint32_t burst_size = 256;
-
- public:
- explicit ProducerSocket(interface::ProducerSocket *producer_socket)
- : producer_interface_(producer_socket),
- portal_(std::make_shared<Portal>(io_service_)),
+class ProducerSocket : public Socket {
+ private:
+ ProducerSocket(interface::ProducerSocket *producer_socket, int protocol,
+ std::shared_ptr<core::Portal> &&portal)
+ : Socket(std::move(portal)),
+ producer_interface_(producer_socket),
data_packet_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
- output_buffer_(default_values::producer_socket_output_buffer_size),
async_thread_(),
- registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
- hash_algorithm_(utils::CryptoHashType::SHA_256),
+ hash_algorithm_(auth::CryptoHashType::SHA_256),
suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
@@ -65,15 +63,33 @@ class ProducerSocket : public Socket<BasePortal>,
on_content_object_in_output_buffer_(VOID_HANDLER),
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {}
-
- virtual ~ProducerSocket() {
- stop();
- if (listening_thread_.joinable()) {
- listening_thread_.join();
+ on_content_produced_(VOID_HANDLER) {
+ switch (protocol) {
+ case ProductionProtocolAlgorithms::RTC_PROD:
+ production_protocol_ =
+ std::make_unique<protocol::RTCProductionProtocol>(this);
+ break;
+ case ProductionProtocolAlgorithms::BYTE_STREAM:
+ default:
+ production_protocol_ =
+ std::make_unique<protocol::ByteStreamProductionProtocol>(this);
+ break;
}
}
+ public:
+ ProducerSocket(interface::ProducerSocket *producer, int protocol)
+ : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {}
+
+ ProducerSocket(interface::ProducerSocket *producer, int protocol,
+ asio::io_service &io_service)
+ : ProducerSocket(producer, protocol,
+ std::make_shared<core::Portal>(io_service)) {
+ is_async_ = true;
+ }
+
+ virtual ~ProducerSocket() {}
+
interface::ProducerSocket *getInterface() {
return producer_interface_;
}
@@ -84,296 +100,10 @@ class ProducerSocket : public Socket<BasePortal>,
void connect() override {
portal_->connect(false);
- listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
+ production_protocol_->start();
}
- bool isRunning() override { return !io_service_.stopped(); };
-
- virtual uint32_t produce(Name content_name, const uint8_t *buffer,
- size_t buffer_size, bool is_last = true,
- uint32_t start_offset = 0) {
- return ProducerSocket::produce(
- content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last,
- start_offset);
- }
-
- virtual uint32_t produce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0) {
- if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) {
- return 0;
- }
-
- // Copy the atomic variables to ensure they keep the same value
- // during the production
- std::size_t data_packet_size = data_packet_size_;
- uint32_t content_object_expiry_time = content_object_expiry_time_;
- utils::CryptoHashType hash_algo = hash_algorithm_;
- bool making_manifest = making_manifest_;
- auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
- suffix_strategy_, start_offset);
- std::shared_ptr<utils::Signer> signer;
- getSocketOption(GeneralTransportOptions::SIGNER, signer);
-
- auto buffer_size = buffer->length();
- int bytes_segmented = 0;
- std::size_t header_size;
- std::size_t manifest_header_size = 0;
- std::size_t signature_length = 0;
- std::uint32_t final_block_number = start_offset;
- uint64_t free_space_for_content = 0;
-
- core::Packet::Format format;
- std::shared_ptr<ContentObjectManifest> manifest;
- bool is_last_manifest = false;
-
- // TODO Manifest may still be used for indexing
- if (making_manifest && !signer) {
- TRANSPORT_LOGE("Making manifests without setting producer identity.");
- }
-
- core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
- core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
- if (content_name.getType() == HNT_CONTIGUOUS_V4 ||
- content_name.getType() == HNT_IOV_V4) {
- hf_format = core::Packet::Format::HF_INET_TCP;
- hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
- } else if (content_name.getType() == HNT_CONTIGUOUS_V6 ||
- content_name.getType() == HNT_IOV_V6) {
- hf_format = core::Packet::Format::HF_INET6_TCP;
- hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
- } else {
- throw errors::RuntimeException("Unknown name format.");
- }
-
- format = hf_format;
- if (making_manifest) {
- manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- signer ? hf_format_ah : hf_format,
- signer ? signer->getSignatureLength() : 0);
- } else if (signer) {
- format = hf_format_ah;
- signature_length = signer->getSignatureLength();
- }
-
- header_size =
- core::Packet::getHeaderSizeFromFormat(format, signature_length);
- free_space_for_content = data_packet_size - header_size;
- uint32_t number_of_segments = uint32_t(
- std::ceil(double(buffer_size) / double(free_space_for_content)));
- if (free_space_for_content * number_of_segments < buffer_size) {
- number_of_segments++;
- }
-
- // TODO allocate space for all the headers
- if (making_manifest) {
- uint32_t segment_in_manifest = static_cast<uint32_t>(
- std::floor(double(data_packet_size - manifest_header_size -
- ContentObjectManifest::getManifestHeaderSize()) /
- ContentObjectManifest::getManifestEntrySize()) -
- 1.0);
- uint32_t number_of_manifests = static_cast<uint32_t>(
- std::ceil(float(number_of_segments) / segment_in_manifest));
- final_block_number += number_of_segments + number_of_manifests - 1;
-
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- hash_algo, is_last_manifest, content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
- manifest->setLifetime(content_object_expiry_time);
-
- if (is_last) {
- manifest->setFinalBlockNumber(final_block_number);
- } else {
- manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- for (unsigned int packaged_segments = 0;
- packaged_segments < number_of_segments; packaged_segments++) {
- if (making_manifest) {
- if (manifest->estimateManifestSize(2) >
- data_packet_size - manifest_header_size) {
- // Send the current manifest
- manifest->encode();
-
- // If identity set, sign manifest
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s",
- manifest->getName().toString().c_str());
-
- // Send content objects stored in the queue
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD(
- "Send content %s",
- content_queue_.front()->getName().toString().c_str());
- content_queue_.pop();
- }
-
- // Create new manifest. The reference to the last manifest has been
- // acquired in the passContentObjectToCallbacks function, so we can
- // safely release this reference
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
- content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
-
- manifest->setLifetime(content_object_expiry_time);
- manifest->setFinalBlockNumber(
- is_last ? final_block_number
- : utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- auto content_suffix = suffix_strategy->getNextContentSuffix();
- auto content_object = std::make_shared<ContentObject>(
- content_name.setSuffix(content_suffix), format);
- content_object->setLifetime(content_object_expiry_time);
-
- auto b = buffer->cloneOne();
- b->trimStart(free_space_for_content * packaged_segments);
- b->trimEnd(b->length());
-
- if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) {
- b->append(buffer_size - bytes_segmented);
- bytes_segmented += (int)(buffer_size - bytes_segmented);
-
- if (is_last && making_manifest) {
- is_last_manifest = true;
- } else if (is_last) {
- content_object->setRst();
- }
-
- } else {
- b->append(free_space_for_content);
- bytes_segmented += (int)(free_space_for_content);
- }
-
- content_object->appendPayload(std::move(b));
-
- if (making_manifest) {
- using namespace std::chrono_literals;
- utils::CryptoHash hash = content_object->computeDigest(hash_algo);
- manifest->addSuffixHash(content_suffix, hash);
- content_queue_.push(content_object);
- } else {
- if (signer) {
- signer->sign(*content_object);
- }
- passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %s",
- content_object->getName().toString().c_str());
- }
- }
-
- if (making_manifest) {
- if (is_last_manifest) {
- manifest->setFinalManifest(is_last_manifest);
- }
-
- manifest->encode();
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s",
- manifest->getName().toString().c_str());
-
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s",
- content_queue_.front()->getName().toString().c_str());
- content_queue_.pop();
- }
- }
-
- io_service_.post([this]() {
- std::shared_ptr<ContentObject> co;
- while (object_queue_for_callbacks_.pop(co)) {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *co);
- }
-
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *co);
- }
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_, *co);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *co);
- }
- }
- });
-
- io_service_.dispatch([this, buffer_size]() {
- if (on_content_produced_) {
- on_content_produced_(*producer_interface_,
- std::make_error_code(std::errc(0)), buffer_size);
- }
- });
-
- return suffix_strategy->getTotalCount();
- }
-
- virtual void produce(ContentObject &content_object) {
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_,
- content_object);
- }
- });
-
- output_buffer_.insert(std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this()));
-
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, content_object);
- }
- });
-
- portal_->sendContentObject(content_object);
- }
-
- virtual void produce(const uint8_t *buffer, size_t buffer_size) {
- produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
- }
-
- virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- // This API is meant to be used just with the RTC producer.
- // Here it cannot be used since no name for the content is specified.
- throw errors::NotImplementedException();
- }
-
- virtual void asyncProduce(const Name &suffix, const uint8_t *buf,
- size_t buffer_size, bool is_last = true,
- uint32_t *start_offset = nullptr) {
- if (!async_thread_.stopped()) {
- async_thread_.add([this, suffix, buffer = buf, size = buffer_size,
- is_last, start_offset]() {
- if (start_offset != nullptr) {
- *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last,
- *start_offset);
- } else {
- ProducerSocket::produce(suffix, buffer, size, is_last, 0);
- }
- });
- }
- }
-
- void asyncProduce(const Name &suffix);
+ bool isRunning() override { return !production_protocol_->isRunning(); };
virtual void asyncProduce(Name content_name,
std::unique_ptr<utils::MemBuf> &&buffer,
@@ -381,75 +111,56 @@ class ProducerSocket : public Socket<BasePortal>,
uint32_t **last_segment = nullptr) {
if (!async_thread_.stopped()) {
auto a = buffer.release();
- async_thread_.add(
- [this, content_name, a, is_last, offset, last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- **last_segment =
- offset + ProducerSocket::produce(content_name, std::move(buf),
- is_last, offset);
- } else {
- ProducerSocket::produce(content_name, std::move(buf), is_last,
- offset);
- }
- });
- }
- }
-
- virtual void asyncProduce(ContentObject &content_object) {
- if (!async_thread_.stopped()) {
- auto co_ptr = std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this());
- async_thread_.add([this, content_object = std::move(co_ptr)]() {
- ProducerSocket::produce(*content_object);
+ async_thread_.add([this, content_name, a, is_last, offset,
+ last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ **last_segment = offset + produceStream(content_name, std::move(buf),
+ is_last, offset);
+ } else {
+ produceStream(content_name, std::move(buf), is_last, offset);
+ }
});
}
}
- virtual void registerPrefix(const Prefix &producer_namespace) {
- served_namespaces_.push_back(producer_namespace);
+ virtual uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) {
+ return production_protocol_->produceStream(content_name, std::move(buffer),
+ is_last, start_offset);
}
- void serveForever() {
- if (listening_thread_.joinable()) {
- listening_thread_.join();
- }
+ virtual uint32_t produceStream(const Name &content_name,
+ const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true,
+ uint32_t start_offset = 0) {
+ return production_protocol_->produceStream(
+ content_name, buffer, buffer_size, is_last, start_offset);
}
- void stop() { portal_->stopEventsLoop(); }
-
- asio::io_service &getIoService() override { return portal_->getIoService(); };
-
- virtual void onInterest(Interest &interest) {
- if (on_interest_input_) {
- on_interest_input_(*producer_interface_, interest);
- }
-
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(interest);
-
- if (content_object) {
- if (on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_(*producer_interface_, interest);
- }
+ virtual uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ return production_protocol_->produceDatagram(content_name,
+ std::move(buffer));
+ }
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *content_object);
- }
+ virtual uint32_t produceDatagram(const Name &content_name,
+ const uint8_t *buffer, size_t buffer_size) {
+ return production_protocol_->produceDatagram(content_name, buffer,
+ buffer_size);
+ }
- portal_->sendContentObject(*content_object);
- } else {
- if (on_interest_process_) {
- on_interest_process_(*producer_interface_, interest);
- }
- }
+ void produce(ContentObject &content_object) {
+ production_protocol_->produce(content_object);
}
- virtual void onInterest(Interest::Ptr &&interest) override {
- onInterest(*interest);
- };
+ void registerPrefix(const Prefix &producer_namespace) {
+ production_protocol_->registerNamespaceWithNetwork(producer_namespace);
+ }
- virtual void onError(std::error_code ec) override {}
+ void stop() { production_protocol_->stop(); }
virtual int setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
@@ -462,7 +173,7 @@ class ProducerSocket : public Socket<BasePortal>,
break;
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_.setLimit(socket_option_value);
+ production_protocol_->setOutputBufferSize(socket_option_value);
break;
case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
@@ -533,6 +244,12 @@ class ProducerSocket : public Socket<BasePortal>,
break;
}
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ break;
+ }
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -559,19 +276,6 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_NOT_SET;
}
- virtual int setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- served_namespaces_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
virtual int setSocketOption(
int socket_option_key,
interface::ProducerContentObjectCallback socket_option_value) {
@@ -594,6 +298,10 @@ class ProducerSocket : public Socket<BasePortal>,
on_content_object_output_ = socket_option_value;
break;
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ break;
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -663,7 +371,7 @@ class ProducerSocket : public Socket<BasePortal>,
}
virtual int setSocketOption(int socket_option_key,
- utils::CryptoHashType socket_option_value) {
+ auth::CryptoHashType socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::HASH_ALGORITHM:
hash_algorithm_ = socket_option_value;
@@ -675,11 +383,12 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_SET;
}
- virtual int setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value) {
+ virtual int setSocketOption(
+ int socket_option_key,
+ core::NextSegmentCalculationStrategy socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::CRYPTO_SUITE:
- crypto_suite_ = socket_option_value;
+ case GeneralTransportOptions::SUFFIX_STRATEGY:
+ suffix_strategy_ = socket_option_value;
break;
default:
return SOCKET_OPTION_NOT_SET;
@@ -690,7 +399,7 @@ class ProducerSocket : public Socket<BasePortal>,
virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) {
+ const std::shared_ptr<auth::Signer> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SIGNER: {
utils::SpinLock::Acquire locked(signer_lock_);
@@ -708,7 +417,7 @@ class ProducerSocket : public Socket<BasePortal>,
uint32_t &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_.getLimit();
+ socket_option_value = production_protocol_->getOutputBufferSize();
break;
case GeneralTransportOptions::DATA_PACKET_SIZE:
@@ -733,18 +442,8 @@ class ProducerSocket : public Socket<BasePortal>,
socket_option_value = making_manifest_;
break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- virtual int getSocketOption(int socket_option_key,
- std::list<Prefix> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- socket_option_value = served_namespaces_;
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
break;
default:
@@ -776,6 +475,10 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_content_object_output_;
break;
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ *socket_option_value = &on_content_object_to_sign_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -828,11 +531,11 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_interest_inserted_input_buffer_;
break;
- case CACHE_HIT:
+ case ProducerCallbacksOptions::CACHE_HIT:
*socket_option_value = &on_interest_satisfied_output_buffer_;
break;
- case CACHE_MISS:
+ case ProducerCallbacksOptions::CACHE_MISS:
*socket_option_value = &on_interest_process_;
break;
@@ -844,8 +547,9 @@ class ProducerSocket : public Socket<BasePortal>,
});
}
- virtual int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value) {
+ virtual int getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<core::Portal> &socket_option_value) {
switch (socket_option_key) {
case PORTAL:
socket_option_value = portal_;
@@ -859,7 +563,7 @@ class ProducerSocket : public Socket<BasePortal>,
}
virtual int getSocketOption(int socket_option_key,
- utils::CryptoHashType &socket_option_value) {
+ auth::CryptoHashType &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::HASH_ALGORITHM:
socket_option_value = hash_algorithm_;
@@ -871,22 +575,22 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_GET;
}
- virtual int getSocketOption(int socket_option_key,
- utils::CryptoSuite &socket_option_value) {
+ virtual int getSocketOption(
+ int socket_option_key,
+ core::NextSegmentCalculationStrategy &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = crypto_suite_;
+ case GeneralTransportOptions::SUFFIX_STRATEGY:
+ socket_option_value = suffix_strategy_;
break;
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
virtual int getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Signer> &socket_option_value) {
+ std::shared_ptr<auth::Signer> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SIGNER: {
utils::SpinLock::Acquire locked(signer_lock_);
@@ -907,19 +611,21 @@ class ProducerSocket : public Socket<BasePortal>,
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
template <typename Lambda, typename arg2>
- int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func) {
+ int rescheduleOnIOServiceWithReference(int socket_option_key,
+ arg2 &socket_option_value,
+ Lambda lambda_func) {
// To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
+ std::function<int(int, arg2 &)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != listening_thread_.get_id()) {
+ if (production_protocol_ && production_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
+
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -939,21 +645,19 @@ class ProducerSocket : public Socket<BasePortal>,
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
template <typename Lambda, typename arg2>
- int rescheduleOnIOServiceWithReference(int socket_option_key,
- arg2 &socket_option_value,
- Lambda lambda_func) {
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func) {
// To enforce type check
- std::function<int(int, arg2 &)> func = lambda_func;
+ std::function<int(int, arg2)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != this->listening_thread_.get_id()) {
+ if (production_protocol_ && production_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
-
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -973,39 +677,20 @@ class ProducerSocket : public Socket<BasePortal>,
// Threads
protected:
interface::ProducerSocket *producer_interface_;
- std::thread listening_thread_;
asio::io_service io_service_;
- std::shared_ptr<Portal> portal_;
std::atomic<size_t> data_packet_size_;
- std::list<Prefix>
- served_namespaces_; // No need to be threadsafe, this is always modified
- // by the application thread
std::atomic<uint32_t> content_object_expiry_time_;
- utils::CircularFifo<std::shared_ptr<ContentObject>, 2048>
- object_queue_for_callbacks_;
-
- // buffers
- // ContentStore is thread-safe
- utils::ContentStore output_buffer_;
-
utils::EventThread async_thread_;
- int registration_status_;
std::atomic<bool> making_manifest_;
-
- // map for storing sequence numbers for several calls of the publish
- // function
- std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
-
- std::atomic<utils::CryptoHashType> hash_algorithm_;
- std::atomic<utils::CryptoSuite> crypto_suite_;
+ std::atomic<auth::CryptoHashType> hash_algorithm_;
+ std::atomic<auth::CryptoSuite> crypto_suite_;
utils::SpinLock signer_lock_;
- std::shared_ptr<utils::Signer> signer_;
+ std::shared_ptr<auth::Signer> signer_;
core::NextSegmentCalculationStrategy suffix_strategy_;
- // While manifests are being built, contents are stored in a queue
- std::queue<std::shared_ptr<ContentObject>> content_queue_;
+ std::unique_ptr<protocol::ProductionProtocol> production_protocol_;
// callbacks
ProducerInterestCallback on_interest_input_;
@@ -1021,63 +706,6 @@ class ProducerSocket : public Socket<BasePortal>,
ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
ProducerContentCallback on_content_produced_;
-
- private:
- void listen() {
- bool first = true;
-
- for (core::Prefix &producer_namespace : served_namespaces_) {
- if (first) {
- core::BindConfig bind_config(producer_namespace, 1000);
- portal_->bind(bind_config);
- portal_->setProducerCallback(this);
- first = !first;
- } else {
- portal_->registerRoute(producer_namespace);
- }
- }
-
- portal_->runEventsLoop();
- }
-
- void scheduleSendBurst() {
- io_service_.post([this]() {
- std::shared_ptr<ContentObject> co;
-
- for (uint32_t i = 0; i < burst_size; i++) {
- if (object_queue_for_callbacks_.pop(co)) {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *co);
- }
-
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *co);
- }
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_, *co);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *co);
- }
- } else {
- break;
- }
- }
- });
- }
-
- void passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object) {
- output_buffer_.insert(content_object);
- portal_->sendContentObject(*content_object);
- object_queue_for_callbacks_.push(std::move(content_object));
-
- if (object_queue_for_callbacks_.size() >= burst_size) {
- scheduleSendBurst();
- }
- }
};
} // namespace implementation
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc
index 9ef79ca23..9a62c8683 100644
--- a/libtransport/src/implementation/tls_rtc_socket_producer.cc
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc
@@ -15,10 +15,8 @@
#include <hicn/transport/core/interest.h>
#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
-
#include <implementation/p2psecure_socket_producer.h>
#include <implementation/tls_rtc_socket_producer.h>
-
#include <openssl/bio.h>
#include <openssl/rand.h>
#include <openssl/ssl.h>
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h
index 685c91244..92c657afc 100644
--- a/libtransport/src/implementation/tls_rtc_socket_producer.h
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.h
@@ -15,7 +15,6 @@
#pragma once
-#include <implementation/rtc_socket_producer.h>
#include <implementation/tls_socket_producer.h>
namespace transport {
@@ -23,8 +22,7 @@ namespace implementation {
class P2PSecureProducerSocket;
-class TLSRTCProducerSocket : public RTCProducerSocket,
- public TLSProducerSocket {
+class TLSRTCProducerSocket : public TLSProducerSocket {
friend class P2PSecureProducerSocket;
public:
@@ -34,7 +32,8 @@ class TLSRTCProducerSocket : public RTCProducerSocket,
~TLSRTCProducerSocket() = default;
- void produce(std::unique_ptr<utils::MemBuf> &&buffer) override;
+ uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) override;
void accept() override;
diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc
index 1be6f41a7..99bcd4360 100644
--- a/libtransport/src/implementation/tls_socket_consumer.cc
+++ b/libtransport/src/implementation/tls_socket_consumer.cc
@@ -136,7 +136,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket,
int protocol, SSL *ssl)
: ConsumerSocket(consumer_socket, protocol),
name_(),
- buf_pool_(),
decrypted_content_(),
payload_(),
head_(),
@@ -223,14 +222,15 @@ int TLSConsumerSocket::download_content(const Name &name) {
content_downloaded_ = false;
std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize();
- std::size_t buffer_size = read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH;
+ std::size_t buffer_size =
+ read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH;
decrypted_content_ = utils::MemBuf::createCombined(buffer_size);
int result = -1;
std::size_t size = 0;
while (!content_downloaded_ || something_to_read_) {
- result = SSL_read(
- this->ssl_, decrypted_content_->writableTail(), SSL3_RT_MAX_PLAIN_LENGTH);
+ result = SSL_read(this->ssl_, decrypted_content_->writableTail(),
+ SSL3_RT_MAX_PLAIN_LENGTH);
/* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of
* the data has been fully downloaded */
diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h
index 1c5df346a..be08ec47d 100644
--- a/libtransport/src/implementation/tls_socket_consumer.h
+++ b/libtransport/src/implementation/tls_socket_consumer.h
@@ -16,9 +16,7 @@
#pragma once
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/socket_consumer.h>
-
#include <openssl/ssl.h>
namespace transport {
@@ -74,7 +72,6 @@ class TLSConsumerSocket : public ConsumerSocket,
SSL_CTX *ctx_;
/* Chain of MemBuf to be used as a temporary buffer to pass descypted data
* from the underlying layer to the application */
- utils::ObjectPool<utils::MemBuf> buf_pool_;
std::unique_ptr<utils::MemBuf> decrypted_content_;
/* Chain of MemBuf holding the payload to be written into interest or data */
std::unique_ptr<utils::MemBuf> payload_;
diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc
index 339a1ad58..e54d38d56 100644
--- a/libtransport/src/implementation/tls_socket_producer.cc
+++ b/libtransport/src/implementation/tls_socket_producer.cc
@@ -14,10 +14,8 @@
*/
#include <hicn/transport/interfaces/socket_producer.h>
-
#include <implementation/p2psecure_socket_producer.h>
#include <implementation/tls_socket_producer.h>
-
#include <openssl/bio.h>
#include <openssl/rand.h>
#include <openssl/ssl.h>
@@ -50,10 +48,14 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) {
std::unique_lock<std::mutex> lck(socket->mtx_);
+ TRANSPORT_LOGD("Start wait on the CV.");
+
if (!socket->something_to_read_) {
(socket->cv_).wait(lck);
}
+ TRANSPORT_LOGD("CV unlocked.");
+
/* Either there already is something to read, or the thread has been waken up.
* We must return the payload in the interest anyway */
utils::MemBuf *membuf = socket->handshake_packet_->next();
@@ -103,7 +105,7 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
socket->tls_chunks_--;
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
false);
- socket->parent_->ProducerSocket::produce(
+ socket->parent_->ProducerSocket::produceStream(
socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0,
socket->last_segment_);
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
@@ -122,18 +124,18 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
socket->tls_chunks_--;
socket->to_call_oncontentproduced_--;
- socket->last_segment_ += socket->ProducerSocket::produce(
+ socket->last_segment_ += socket->ProducerSocket::produceStream(
socket->name_, std::move(mbuf), socket->tls_chunks_ == 0,
socket->last_segment_);
- ProducerContentCallback on_content_produced_application;
+ ProducerContentCallback *on_content_produced_application;
socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
- on_content_produced_application);
+ &on_content_produced_application);
if (socket->to_call_oncontentproduced_ == 0 &&
on_content_produced_application) {
- on_content_produced_application(*socket->getInterface(),
- std::error_code(), 0);
+ on_content_produced_application->operator()(*socket->getInterface(),
+ std::error_code(), 0);
}
});
}
@@ -144,7 +146,8 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
P2PSecureProducerSocket *parent,
const Name &handshake_name)
- : ProducerSocket(producer_socket),
+ : ProducerSocket(producer_socket,
+ ProductionProtocolAlgorithms::BYTE_STREAM),
on_content_produced_application_(),
mtx_(),
cv_(),
@@ -236,13 +239,14 @@ void TLSProducerSocket::accept() {
std::move(parent_->map_producers[handshake_name_]));
parent_->map_producers.erase(handshake_name_);
- ProducerInterestCallback on_interest_process_decrypted;
+ ProducerInterestCallback *on_interest_process_decrypted;
getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- on_interest_process_decrypted);
+ &on_interest_process_decrypted);
- if (on_interest_process_decrypted) {
- Interest inter(std::move(handshake_packet_));
- on_interest_process_decrypted(*getInterface(), inter);
+ if (*on_interest_process_decrypted) {
+ Interest inter(std::move(*handshake_packet_));
+ handshake_packet_.reset();
+ on_interest_process_decrypted->operator()(*getInterface(), inter);
} else {
throw errors::RuntimeException(
"On interest process unset: unable to perform handshake");
@@ -270,14 +274,14 @@ void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) {
std::unique_lock<std::mutex> lck(mtx_);
name_ = interest.getName();
- interest.separateHeaderPayload();
+ // interest.separateHeaderPayload();
handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
cv_.notify_one();
return;
} else if (handshake_state == SERVER_FINISHED) {
- interest.separateHeaderPayload();
+ // interest.separateHeaderPayload();
handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
@@ -288,12 +292,12 @@ void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) {
interest.getPayload()->length());
}
- ProducerInterestCallback on_interest_input_decrypted;
+ ProducerInterestCallback *on_interest_input_decrypted;
getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
- on_interest_input_decrypted);
+ &on_interest_input_decrypted);
- if (on_interest_input_decrypted)
- (on_interest_input_decrypted)(*getInterface(), interest);
+ if (*on_interest_input_decrypted)
+ (*on_interest_input_decrypted)(*getInterface(), interest);
}
}
@@ -301,17 +305,19 @@ void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p,
Interest &interest) {
HandshakeState handshake_state = getHandshakeState();
+ TRANSPORT_LOGD("On cache miss in TLS socket producer.");
+
if (handshake_state == CLIENT_HELLO) {
std::unique_lock<std::mutex> lck(mtx_);
- interest.separateHeaderPayload();
+ // interest.separateHeaderPayload();
handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
handshake_state_ = CLIENT_FINISHED;
cv_.notify_one();
} else if (handshake_state == SERVER_FINISHED) {
- interest.separateHeaderPayload();
+ // interest.separateHeaderPayload();
handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
@@ -343,16 +349,16 @@ void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p,
const std::error_code &err,
uint64_t bytes_written) {}
-uint32_t TLSProducerSocket::produce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t start_offset) {
+uint32_t TLSProducerSocket::produceStream(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t start_offset) {
if (getHandshakeState() != SERVER_FINISHED) {
throw errors::RuntimeException(
"New handshake on the same P2P secure producer socket not supported");
}
size_t buf_size = buffer->length();
- name_ = served_namespaces_.front().mapName(content_name);
+ name_ = production_protocol_->getNamespaces().front().mapName(content_name);
tls_chunks_ = to_call_oncontentproduced_ =
ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
@@ -370,46 +376,6 @@ uint32_t TLSProducerSocket::produce(Name content_name,
return 0;
}
-void TLSProducerSocket::asyncProduce(const Name &content_name,
- const uint8_t *buf, size_t buffer_size,
- bool is_last, uint32_t *start_offset) {
- if (!encryption_thread_.stopped()) {
- encryption_thread_.add([this, content_name, buffer = buf,
- size = buffer_size, is_last, start_offset]() {
- if (start_offset != NULL) {
- produce(content_name, buffer, size, is_last, *start_offset);
- } else {
- produce(content_name, buffer, size, is_last, 0);
- }
- });
- }
-}
-
-void TLSProducerSocket::asyncProduce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment) {
- if (!encryption_thread_.stopped()) {
- auto a = buffer.release();
- encryption_thread_.add(
- [this, content_name, a, is_last, offset, last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- *last_segment = &last_segment_;
- }
- produce(content_name, std::move(buf), is_last, offset);
- });
- }
-}
-
-void TLSProducerSocket::asyncProduce(ContentObject &content_object) {
- throw errors::RuntimeException("API not supported");
-}
-
-void TLSProducerSocket::produce(ContentObject &content_object) {
- throw errors::RuntimeException("API not supported");
-}
-
long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) {
if (cmd == BIO_CTRL_FLUSH) {
}
@@ -424,13 +390,14 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
void *add_arg) {
TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg);
+ TRANSPORT_LOGD("On addHicnKeyIdCb, for the prefix registration.");
+
if (ext_type == 100) {
- ip_prefix_t ip_prefix =
- socket->parent_->served_namespaces_.front().toIpPrefixStruct();
- int inet_family =
- socket->parent_->served_namespaces_.front().getAddressFamily();
- uint16_t prefix_len_bits =
- socket->parent_->served_namespaces_.front().getPrefixLength();
+ auto &prefix =
+ socket->parent_->production_protocol_->getNamespaces().front();
+ const ip_prefix_t &ip_prefix = prefix.toIpPrefixStruct();
+ int inet_family = prefix.getAddressFamily();
+ uint16_t prefix_len_bits = prefix.getPrefixLength();
uint8_t prefix_len_bytes = prefix_len_bits / 8;
uint8_t prefix_len_u32 = prefix_len_bits / 32;
@@ -479,10 +446,9 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
socket->parent_->on_interest_process_decrypted_;
socket->registerPrefix(
- Prefix(socket->parent_->served_namespaces_.front().getName(
- Name(inet_family, (uint8_t *)&mask),
- Name(inet_family, (uint8_t *)&keyId_component),
- socket->parent_->served_namespaces_.front().getName()),
+ Prefix(prefix.getName(Name(inet_family, (uint8_t *)&mask),
+ Name(inet_family, (uint8_t *)&keyId_component),
+ prefix.getName()),
out_ip->len));
socket->connect();
}
@@ -580,61 +546,5 @@ int TLSProducerSocket::getSocketOption(
});
}
-int TLSProducerSocket::getSocketOption(
- int socket_option_key, ProducerContentCallback &socket_option_value) {
- return rescheduleOnIOServiceWithReference(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerContentCallback &socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- socket_option_value = on_content_produced_application_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
-int TLSProducerSocket::getSocketOption(
- int socket_option_key, ProducerInterestCallback &socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in case
- // setSocketOption is called while the io_service is running.
- return rescheduleOnIOServiceWithReference(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ProducerInterestCallback &socket_option_value) -> int {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- socket_option_value = on_interest_input_decrypted_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- socket_option_value = on_interest_dropped_input_buffer_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- socket_option_value = on_interest_inserted_input_buffer_;
- break;
-
- case ProducerCallbacksOptions::CACHE_HIT:
- socket_option_value = on_interest_satisfied_output_buffer_;
- break;
-
- case ProducerCallbacksOptions::CACHE_MISS:
- socket_option_value = on_interest_process_decrypted_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
-}
-
} // namespace implementation
} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h
index 2382e8695..a542a4d9f 100644
--- a/libtransport/src/implementation/tls_socket_producer.h
+++ b/libtransport/src/implementation/tls_socket_producer.h
@@ -16,8 +16,8 @@
#pragma once
#include <implementation/socket_producer.h>
-
#include <openssl/ssl.h>
+
#include <condition_variable>
#include <mutex>
@@ -36,26 +36,18 @@ class TLSProducerSocket : virtual public ProducerSocket {
~TLSProducerSocket();
- uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
- bool is_last = true, uint32_t start_offset = 0) override {
- return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size),
- is_last, start_offset);
+ uint32_t produceStream(const Name &content_name, const uint8_t *buffer,
+ size_t buffer_size, bool is_last = true,
+ uint32_t start_offset = 0) override {
+ return produceStream(content_name,
+ utils::MemBuf::copyBuffer(buffer, buffer_size),
+ is_last, start_offset);
}
- uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0) override;
-
- void produce(ContentObject &content_object) override;
-
- void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size,
- bool is_last = true,
- uint32_t *start_offset = nullptr) override;
-
- void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment = nullptr) override;
-
- void asyncProduce(ContentObject &content_object) override;
+ uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) override;
virtual void accept();
@@ -80,7 +72,7 @@ class TLSProducerSocket : virtual public ProducerSocket {
ProducerInterestCallback &socket_option_value);
using ProducerSocket::getSocketOption;
- using ProducerSocket::onInterest;
+ // using ProducerSocket::onInterest;
using ProducerSocket::setSocketOption;
protected:
@@ -119,6 +111,7 @@ class TLSProducerSocket : virtual public ProducerSocket {
int to_call_oncontentproduced_;
bool still_writing_;
utils::EventThread encryption_thread_;
+ utils::EventThread async_thread_;
void onInterest(ProducerSocket &p, Interest &interest);