aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation/socket_producer.h
diff options
context:
space:
mode:
authorLuca Muscariello <muscariello@ieee.org>2021-04-15 09:05:46 +0200
committerMauro Sardara <msardara@cisco.com>2021-04-15 16:36:16 +0200
commite92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch)
tree9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/implementation/socket_producer.h
parent3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff)
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary. A summary of the different components that underwent major modifications is reported below. - Transport protocol updates The hierarchy of classes has been optimized to have common transport services across different transport protocols. This can allow to customize a transport protocol with new features. - A new real-time communication protocol The RTC protocol has been optimized in terms of algorithms to reduce consumer-producer synchronization latency. - A novel socket API The API has been reworked to be easier to consumer but also to have a more efficient integration in L4 proxies. - Several performance improvements A large number of performance improvements have been included in particular to make the entire stack zero-copy and optimize cache miss. - New memory buffer framework Memory management has been reworked entirely to provide a more efficient infra with a richer API. Buffers are now allocated in blocks and a single buffer holds the memory for (1) the shared_ptr control block, (2) the metadata of the packet (e.g. name, pointer to other buffers if buffer is chained and relevant offsets), and (3) the packet itself, as it is sent/received over the network. - A new slab allocator Dynamic memory allocation is now managed by a novel slab allocator that is optimised for packet processing and connection management. Memory is organized in pools of blocks all of the same size which are used during the processing of outgoing/incoming packets. When a memory block Is allocated is always taken from a global pool and when it is deallocated is returned to the pool, thus avoiding the cost of any heap allocation in the data path. - New transport connectors Consumer and producer end-points can communication either using an hicn packet forwarder or with direct connector based on shared memories or sockets. The usage of transport connectors typically for unit and funcitonal testing but may have additional usage. - Support for FEC/ECC for transport services FEC/ECC via reed solomon is supported by default and made available to transport services as a modular component. Reed solomon block codes is a default FEC model that can be replaced in a modular way by many other codes including RLNC not avaiable in this distribution. The current FEC framework support variable size padding and efficiently makes use of the infra memory buffers to avoid additiona copies. - Secure transport framework for signature computation and verification Crypto support is nativelty used in hICN for integrity and authenticity. Novel support that includes RTC has been implemented and made modular and reusable acrosso different transport protocols. - TLS - Transport layer security over hicn Point to point confidentiality is provided by integrating TLS on top of hICN reliable and non-reliable transport. The integration is common and makes a different use of the TLS record. - MLS - Messaging layer security over hicn MLS integration on top of hICN is made by using the MLSPP implemetation open sourced by Cisco. We have included instrumentation tools to deploy performance and functional tests of groups of end-points. - Android support The overall code has been heavily tested in Android environments and has received heavy lifting to better run natively in recent Android OS. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/implementation/socket_producer.h')
-rw-r--r--libtransport/src/implementation/socket_producer.h632
1 files changed, 130 insertions, 502 deletions
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index a6f0f969e..af69cd818 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -15,9 +15,11 @@
#pragma once
-#include <hicn/transport/security/signer.h>
+#include <hicn/transport/auth/signer.h>
#include <hicn/transport/utils/event_thread.h>
#include <implementation/socket.h>
+#include <protocols/prod_protocol_bytestream.h>
+#include <protocols/prod_protocol_rtc.h>
#include <utils/content_store.h>
#include <utils/suffix_strategy.h>
@@ -39,21 +41,17 @@ namespace implementation {
using namespace core;
using namespace interface;
-class ProducerSocket : public Socket<BasePortal>,
- public BasePortal::ProducerCallback {
- static constexpr uint32_t burst_size = 256;
-
- public:
- explicit ProducerSocket(interface::ProducerSocket *producer_socket)
- : producer_interface_(producer_socket),
- portal_(std::make_shared<Portal>(io_service_)),
+class ProducerSocket : public Socket {
+ private:
+ ProducerSocket(interface::ProducerSocket *producer_socket, int protocol,
+ std::shared_ptr<core::Portal> &&portal)
+ : Socket(std::move(portal)),
+ producer_interface_(producer_socket),
data_packet_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
- output_buffer_(default_values::producer_socket_output_buffer_size),
async_thread_(),
- registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
- hash_algorithm_(utils::CryptoHashType::SHA_256),
+ hash_algorithm_(auth::CryptoHashType::SHA_256),
suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
@@ -65,15 +63,33 @@ class ProducerSocket : public Socket<BasePortal>,
on_content_object_in_output_buffer_(VOID_HANDLER),
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {}
-
- virtual ~ProducerSocket() {
- stop();
- if (listening_thread_.joinable()) {
- listening_thread_.join();
+ on_content_produced_(VOID_HANDLER) {
+ switch (protocol) {
+ case ProductionProtocolAlgorithms::RTC_PROD:
+ production_protocol_ =
+ std::make_unique<protocol::RTCProductionProtocol>(this);
+ break;
+ case ProductionProtocolAlgorithms::BYTE_STREAM:
+ default:
+ production_protocol_ =
+ std::make_unique<protocol::ByteStreamProductionProtocol>(this);
+ break;
}
}
+ public:
+ ProducerSocket(interface::ProducerSocket *producer, int protocol)
+ : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {}
+
+ ProducerSocket(interface::ProducerSocket *producer, int protocol,
+ asio::io_service &io_service)
+ : ProducerSocket(producer, protocol,
+ std::make_shared<core::Portal>(io_service)) {
+ is_async_ = true;
+ }
+
+ virtual ~ProducerSocket() {}
+
interface::ProducerSocket *getInterface() {
return producer_interface_;
}
@@ -84,296 +100,10 @@ class ProducerSocket : public Socket<BasePortal>,
void connect() override {
portal_->connect(false);
- listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
+ production_protocol_->start();
}
- bool isRunning() override { return !io_service_.stopped(); };
-
- virtual uint32_t produce(Name content_name, const uint8_t *buffer,
- size_t buffer_size, bool is_last = true,
- uint32_t start_offset = 0) {
- return ProducerSocket::produce(
- content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last,
- start_offset);
- }
-
- virtual uint32_t produce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0) {
- if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) {
- return 0;
- }
-
- // Copy the atomic variables to ensure they keep the same value
- // during the production
- std::size_t data_packet_size = data_packet_size_;
- uint32_t content_object_expiry_time = content_object_expiry_time_;
- utils::CryptoHashType hash_algo = hash_algorithm_;
- bool making_manifest = making_manifest_;
- auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
- suffix_strategy_, start_offset);
- std::shared_ptr<utils::Signer> signer;
- getSocketOption(GeneralTransportOptions::SIGNER, signer);
-
- auto buffer_size = buffer->length();
- int bytes_segmented = 0;
- std::size_t header_size;
- std::size_t manifest_header_size = 0;
- std::size_t signature_length = 0;
- std::uint32_t final_block_number = start_offset;
- uint64_t free_space_for_content = 0;
-
- core::Packet::Format format;
- std::shared_ptr<ContentObjectManifest> manifest;
- bool is_last_manifest = false;
-
- // TODO Manifest may still be used for indexing
- if (making_manifest && !signer) {
- TRANSPORT_LOGE("Making manifests without setting producer identity.");
- }
-
- core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
- core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
- if (content_name.getType() == HNT_CONTIGUOUS_V4 ||
- content_name.getType() == HNT_IOV_V4) {
- hf_format = core::Packet::Format::HF_INET_TCP;
- hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
- } else if (content_name.getType() == HNT_CONTIGUOUS_V6 ||
- content_name.getType() == HNT_IOV_V6) {
- hf_format = core::Packet::Format::HF_INET6_TCP;
- hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
- } else {
- throw errors::RuntimeException("Unknown name format.");
- }
-
- format = hf_format;
- if (making_manifest) {
- manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- signer ? hf_format_ah : hf_format,
- signer ? signer->getSignatureLength() : 0);
- } else if (signer) {
- format = hf_format_ah;
- signature_length = signer->getSignatureLength();
- }
-
- header_size =
- core::Packet::getHeaderSizeFromFormat(format, signature_length);
- free_space_for_content = data_packet_size - header_size;
- uint32_t number_of_segments = uint32_t(
- std::ceil(double(buffer_size) / double(free_space_for_content)));
- if (free_space_for_content * number_of_segments < buffer_size) {
- number_of_segments++;
- }
-
- // TODO allocate space for all the headers
- if (making_manifest) {
- uint32_t segment_in_manifest = static_cast<uint32_t>(
- std::floor(double(data_packet_size - manifest_header_size -
- ContentObjectManifest::getManifestHeaderSize()) /
- ContentObjectManifest::getManifestEntrySize()) -
- 1.0);
- uint32_t number_of_manifests = static_cast<uint32_t>(
- std::ceil(float(number_of_segments) / segment_in_manifest));
- final_block_number += number_of_segments + number_of_manifests - 1;
-
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- hash_algo, is_last_manifest, content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
- manifest->setLifetime(content_object_expiry_time);
-
- if (is_last) {
- manifest->setFinalBlockNumber(final_block_number);
- } else {
- manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- for (unsigned int packaged_segments = 0;
- packaged_segments < number_of_segments; packaged_segments++) {
- if (making_manifest) {
- if (manifest->estimateManifestSize(2) >
- data_packet_size - manifest_header_size) {
- // Send the current manifest
- manifest->encode();
-
- // If identity set, sign manifest
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s",
- manifest->getName().toString().c_str());
-
- // Send content objects stored in the queue
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD(
- "Send content %s",
- content_queue_.front()->getName().toString().c_str());
- content_queue_.pop();
- }
-
- // Create new manifest. The reference to the last manifest has been
- // acquired in the passContentObjectToCallbacks function, so we can
- // safely release this reference
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
- content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
-
- manifest->setLifetime(content_object_expiry_time);
- manifest->setFinalBlockNumber(
- is_last ? final_block_number
- : utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- auto content_suffix = suffix_strategy->getNextContentSuffix();
- auto content_object = std::make_shared<ContentObject>(
- content_name.setSuffix(content_suffix), format);
- content_object->setLifetime(content_object_expiry_time);
-
- auto b = buffer->cloneOne();
- b->trimStart(free_space_for_content * packaged_segments);
- b->trimEnd(b->length());
-
- if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) {
- b->append(buffer_size - bytes_segmented);
- bytes_segmented += (int)(buffer_size - bytes_segmented);
-
- if (is_last && making_manifest) {
- is_last_manifest = true;
- } else if (is_last) {
- content_object->setRst();
- }
-
- } else {
- b->append(free_space_for_content);
- bytes_segmented += (int)(free_space_for_content);
- }
-
- content_object->appendPayload(std::move(b));
-
- if (making_manifest) {
- using namespace std::chrono_literals;
- utils::CryptoHash hash = content_object->computeDigest(hash_algo);
- manifest->addSuffixHash(content_suffix, hash);
- content_queue_.push(content_object);
- } else {
- if (signer) {
- signer->sign(*content_object);
- }
- passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %s",
- content_object->getName().toString().c_str());
- }
- }
-
- if (making_manifest) {
- if (is_last_manifest) {
- manifest->setFinalManifest(is_last_manifest);
- }
-
- manifest->encode();
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s",
- manifest->getName().toString().c_str());
-
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s",
- content_queue_.front()->getName().toString().c_str());
- content_queue_.pop();
- }
- }
-
- io_service_.post([this]() {
- std::shared_ptr<ContentObject> co;
- while (object_queue_for_callbacks_.pop(co)) {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *co);
- }
-
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *co);
- }
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_, *co);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *co);
- }
- }
- });
-
- io_service_.dispatch([this, buffer_size]() {
- if (on_content_produced_) {
- on_content_produced_(*producer_interface_,
- std::make_error_code(std::errc(0)), buffer_size);
- }
- });
-
- return suffix_strategy->getTotalCount();
- }
-
- virtual void produce(ContentObject &content_object) {
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_,
- content_object);
- }
- });
-
- output_buffer_.insert(std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this()));
-
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, content_object);
- }
- });
-
- portal_->sendContentObject(content_object);
- }
-
- virtual void produce(const uint8_t *buffer, size_t buffer_size) {
- produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
- }
-
- virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- // This API is meant to be used just with the RTC producer.
- // Here it cannot be used since no name for the content is specified.
- throw errors::NotImplementedException();
- }
-
- virtual void asyncProduce(const Name &suffix, const uint8_t *buf,
- size_t buffer_size, bool is_last = true,
- uint32_t *start_offset = nullptr) {
- if (!async_thread_.stopped()) {
- async_thread_.add([this, suffix, buffer = buf, size = buffer_size,
- is_last, start_offset]() {
- if (start_offset != nullptr) {
- *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last,
- *start_offset);
- } else {
- ProducerSocket::produce(suffix, buffer, size, is_last, 0);
- }
- });
- }
- }
-
- void asyncProduce(const Name &suffix);
+ bool isRunning() override { return !production_protocol_->isRunning(); };
virtual void asyncProduce(Name content_name,
std::unique_ptr<utils::MemBuf> &&buffer,
@@ -381,75 +111,56 @@ class ProducerSocket : public Socket<BasePortal>,
uint32_t **last_segment = nullptr) {
if (!async_thread_.stopped()) {
auto a = buffer.release();
- async_thread_.add(
- [this, content_name, a, is_last, offset, last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- **last_segment =
- offset + ProducerSocket::produce(content_name, std::move(buf),
- is_last, offset);
- } else {
- ProducerSocket::produce(content_name, std::move(buf), is_last,
- offset);
- }
- });
- }
- }
-
- virtual void asyncProduce(ContentObject &content_object) {
- if (!async_thread_.stopped()) {
- auto co_ptr = std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this());
- async_thread_.add([this, content_object = std::move(co_ptr)]() {
- ProducerSocket::produce(*content_object);
+ async_thread_.add([this, content_name, a, is_last, offset,
+ last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ **last_segment = offset + produceStream(content_name, std::move(buf),
+ is_last, offset);
+ } else {
+ produceStream(content_name, std::move(buf), is_last, offset);
+ }
});
}
}
- virtual void registerPrefix(const Prefix &producer_namespace) {
- served_namespaces_.push_back(producer_namespace);
+ virtual uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) {
+ return production_protocol_->produceStream(content_name, std::move(buffer),
+ is_last, start_offset);
}
- void serveForever() {
- if (listening_thread_.joinable()) {
- listening_thread_.join();
- }
+ virtual uint32_t produceStream(const Name &content_name,
+ const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true,
+ uint32_t start_offset = 0) {
+ return production_protocol_->produceStream(
+ content_name, buffer, buffer_size, is_last, start_offset);
}
- void stop() { portal_->stopEventsLoop(); }
-
- asio::io_service &getIoService() override { return portal_->getIoService(); };
-
- virtual void onInterest(Interest &interest) {
- if (on_interest_input_) {
- on_interest_input_(*producer_interface_, interest);
- }
-
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(interest);
-
- if (content_object) {
- if (on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_(*producer_interface_, interest);
- }
+ virtual uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ return production_protocol_->produceDatagram(content_name,
+ std::move(buffer));
+ }
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *content_object);
- }
+ virtual uint32_t produceDatagram(const Name &content_name,
+ const uint8_t *buffer, size_t buffer_size) {
+ return production_protocol_->produceDatagram(content_name, buffer,
+ buffer_size);
+ }
- portal_->sendContentObject(*content_object);
- } else {
- if (on_interest_process_) {
- on_interest_process_(*producer_interface_, interest);
- }
- }
+ void produce(ContentObject &content_object) {
+ production_protocol_->produce(content_object);
}
- virtual void onInterest(Interest::Ptr &&interest) override {
- onInterest(*interest);
- };
+ void registerPrefix(const Prefix &producer_namespace) {
+ production_protocol_->registerNamespaceWithNetwork(producer_namespace);
+ }
- virtual void onError(std::error_code ec) override {}
+ void stop() { production_protocol_->stop(); }
virtual int setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
@@ -462,7 +173,7 @@ class ProducerSocket : public Socket<BasePortal>,
break;
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_.setLimit(socket_option_value);
+ production_protocol_->setOutputBufferSize(socket_option_value);
break;
case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
@@ -533,6 +244,12 @@ class ProducerSocket : public Socket<BasePortal>,
break;
}
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ break;
+ }
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -559,19 +276,6 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_NOT_SET;
}
- virtual int setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- served_namespaces_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
virtual int setSocketOption(
int socket_option_key,
interface::ProducerContentObjectCallback socket_option_value) {
@@ -594,6 +298,10 @@ class ProducerSocket : public Socket<BasePortal>,
on_content_object_output_ = socket_option_value;
break;
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ break;
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -663,7 +371,7 @@ class ProducerSocket : public Socket<BasePortal>,
}
virtual int setSocketOption(int socket_option_key,
- utils::CryptoHashType socket_option_value) {
+ auth::CryptoHashType socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::HASH_ALGORITHM:
hash_algorithm_ = socket_option_value;
@@ -675,11 +383,12 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_SET;
}
- virtual int setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value) {
+ virtual int setSocketOption(
+ int socket_option_key,
+ core::NextSegmentCalculationStrategy socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::CRYPTO_SUITE:
- crypto_suite_ = socket_option_value;
+ case GeneralTransportOptions::SUFFIX_STRATEGY:
+ suffix_strategy_ = socket_option_value;
break;
default:
return SOCKET_OPTION_NOT_SET;
@@ -690,7 +399,7 @@ class ProducerSocket : public Socket<BasePortal>,
virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) {
+ const std::shared_ptr<auth::Signer> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SIGNER: {
utils::SpinLock::Acquire locked(signer_lock_);
@@ -708,7 +417,7 @@ class ProducerSocket : public Socket<BasePortal>,
uint32_t &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_.getLimit();
+ socket_option_value = production_protocol_->getOutputBufferSize();
break;
case GeneralTransportOptions::DATA_PACKET_SIZE:
@@ -733,18 +442,8 @@ class ProducerSocket : public Socket<BasePortal>,
socket_option_value = making_manifest_;
break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- virtual int getSocketOption(int socket_option_key,
- std::list<Prefix> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- socket_option_value = served_namespaces_;
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
break;
default:
@@ -776,6 +475,10 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_content_object_output_;
break;
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ *socket_option_value = &on_content_object_to_sign_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -828,11 +531,11 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_interest_inserted_input_buffer_;
break;
- case CACHE_HIT:
+ case ProducerCallbacksOptions::CACHE_HIT:
*socket_option_value = &on_interest_satisfied_output_buffer_;
break;
- case CACHE_MISS:
+ case ProducerCallbacksOptions::CACHE_MISS:
*socket_option_value = &on_interest_process_;
break;
@@ -844,8 +547,9 @@ class ProducerSocket : public Socket<BasePortal>,
});
}
- virtual int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value) {
+ virtual int getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<core::Portal> &socket_option_value) {
switch (socket_option_key) {
case PORTAL:
socket_option_value = portal_;
@@ -859,7 +563,7 @@ class ProducerSocket : public Socket<BasePortal>,
}
virtual int getSocketOption(int socket_option_key,
- utils::CryptoHashType &socket_option_value) {
+ auth::CryptoHashType &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::HASH_ALGORITHM:
socket_option_value = hash_algorithm_;
@@ -871,22 +575,22 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_GET;
}
- virtual int getSocketOption(int socket_option_key,
- utils::CryptoSuite &socket_option_value) {
+ virtual int getSocketOption(
+ int socket_option_key,
+ core::NextSegmentCalculationStrategy &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = crypto_suite_;
+ case GeneralTransportOptions::SUFFIX_STRATEGY:
+ socket_option_value = suffix_strategy_;
break;
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
virtual int getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Signer> &socket_option_value) {
+ std::shared_ptr<auth::Signer> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SIGNER: {
utils::SpinLock::Acquire locked(signer_lock_);
@@ -907,19 +611,21 @@ class ProducerSocket : public Socket<BasePortal>,
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
template <typename Lambda, typename arg2>
- int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func) {
+ int rescheduleOnIOServiceWithReference(int socket_option_key,
+ arg2 &socket_option_value,
+ Lambda lambda_func) {
// To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
+ std::function<int(int, arg2 &)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != listening_thread_.get_id()) {
+ if (production_protocol_ && production_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
+
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -939,21 +645,19 @@ class ProducerSocket : public Socket<BasePortal>,
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
template <typename Lambda, typename arg2>
- int rescheduleOnIOServiceWithReference(int socket_option_key,
- arg2 &socket_option_value,
- Lambda lambda_func) {
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func) {
// To enforce type check
- std::function<int(int, arg2 &)> func = lambda_func;
+ std::function<int(int, arg2)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != this->listening_thread_.get_id()) {
+ if (production_protocol_ && production_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
-
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -973,39 +677,20 @@ class ProducerSocket : public Socket<BasePortal>,
// Threads
protected:
interface::ProducerSocket *producer_interface_;
- std::thread listening_thread_;
asio::io_service io_service_;
- std::shared_ptr<Portal> portal_;
std::atomic<size_t> data_packet_size_;
- std::list<Prefix>
- served_namespaces_; // No need to be threadsafe, this is always modified
- // by the application thread
std::atomic<uint32_t> content_object_expiry_time_;
- utils::CircularFifo<std::shared_ptr<ContentObject>, 2048>
- object_queue_for_callbacks_;
-
- // buffers
- // ContentStore is thread-safe
- utils::ContentStore output_buffer_;
-
utils::EventThread async_thread_;
- int registration_status_;
std::atomic<bool> making_manifest_;
-
- // map for storing sequence numbers for several calls of the publish
- // function
- std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
-
- std::atomic<utils::CryptoHashType> hash_algorithm_;
- std::atomic<utils::CryptoSuite> crypto_suite_;
+ std::atomic<auth::CryptoHashType> hash_algorithm_;
+ std::atomic<auth::CryptoSuite> crypto_suite_;
utils::SpinLock signer_lock_;
- std::shared_ptr<utils::Signer> signer_;
+ std::shared_ptr<auth::Signer> signer_;
core::NextSegmentCalculationStrategy suffix_strategy_;
- // While manifests are being built, contents are stored in a queue
- std::queue<std::shared_ptr<ContentObject>> content_queue_;
+ std::unique_ptr<protocol::ProductionProtocol> production_protocol_;
// callbacks
ProducerInterestCallback on_interest_input_;
@@ -1021,63 +706,6 @@ class ProducerSocket : public Socket<BasePortal>,
ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
ProducerContentCallback on_content_produced_;
-
- private:
- void listen() {
- bool first = true;
-
- for (core::Prefix &producer_namespace : served_namespaces_) {
- if (first) {
- core::BindConfig bind_config(producer_namespace, 1000);
- portal_->bind(bind_config);
- portal_->setProducerCallback(this);
- first = !first;
- } else {
- portal_->registerRoute(producer_namespace);
- }
- }
-
- portal_->runEventsLoop();
- }
-
- void scheduleSendBurst() {
- io_service_.post([this]() {
- std::shared_ptr<ContentObject> co;
-
- for (uint32_t i = 0; i < burst_size; i++) {
- if (object_queue_for_callbacks_.pop(co)) {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *co);
- }
-
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *co);
- }
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_, *co);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *co);
- }
- } else {
- break;
- }
- }
- });
- }
-
- void passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object) {
- output_buffer_.insert(content_object);
- portal_->sendContentObject(*content_object);
- object_queue_for_callbacks_.push(std::move(content_object));
-
- if (object_queue_for_callbacks_.size() >= burst_size) {
- scheduleSendBurst();
- }
- }
};
} // namespace implementation