aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation/socket_producer.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/implementation/socket_producer.h')
-rw-r--r--libtransport/src/implementation/socket_producer.h632
1 files changed, 130 insertions, 502 deletions
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index a6f0f969e..af69cd818 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -15,9 +15,11 @@
#pragma once
-#include <hicn/transport/security/signer.h>
+#include <hicn/transport/auth/signer.h>
#include <hicn/transport/utils/event_thread.h>
#include <implementation/socket.h>
+#include <protocols/prod_protocol_bytestream.h>
+#include <protocols/prod_protocol_rtc.h>
#include <utils/content_store.h>
#include <utils/suffix_strategy.h>
@@ -39,21 +41,17 @@ namespace implementation {
using namespace core;
using namespace interface;
-class ProducerSocket : public Socket<BasePortal>,
- public BasePortal::ProducerCallback {
- static constexpr uint32_t burst_size = 256;
-
- public:
- explicit ProducerSocket(interface::ProducerSocket *producer_socket)
- : producer_interface_(producer_socket),
- portal_(std::make_shared<Portal>(io_service_)),
+class ProducerSocket : public Socket {
+ private:
+ ProducerSocket(interface::ProducerSocket *producer_socket, int protocol,
+ std::shared_ptr<core::Portal> &&portal)
+ : Socket(std::move(portal)),
+ producer_interface_(producer_socket),
data_packet_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
- output_buffer_(default_values::producer_socket_output_buffer_size),
async_thread_(),
- registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
- hash_algorithm_(utils::CryptoHashType::SHA_256),
+ hash_algorithm_(auth::CryptoHashType::SHA_256),
suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
@@ -65,15 +63,33 @@ class ProducerSocket : public Socket<BasePortal>,
on_content_object_in_output_buffer_(VOID_HANDLER),
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {}
-
- virtual ~ProducerSocket() {
- stop();
- if (listening_thread_.joinable()) {
- listening_thread_.join();
+ on_content_produced_(VOID_HANDLER) {
+ switch (protocol) {
+ case ProductionProtocolAlgorithms::RTC_PROD:
+ production_protocol_ =
+ std::make_unique<protocol::RTCProductionProtocol>(this);
+ break;
+ case ProductionProtocolAlgorithms::BYTE_STREAM:
+ default:
+ production_protocol_ =
+ std::make_unique<protocol::ByteStreamProductionProtocol>(this);
+ break;
}
}
+ public:
+ ProducerSocket(interface::ProducerSocket *producer, int protocol)
+ : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {}
+
+ ProducerSocket(interface::ProducerSocket *producer, int protocol,
+ asio::io_service &io_service)
+ : ProducerSocket(producer, protocol,
+ std::make_shared<core::Portal>(io_service)) {
+ is_async_ = true;
+ }
+
+ virtual ~ProducerSocket() {}
+
interface::ProducerSocket *getInterface() {
return producer_interface_;
}
@@ -84,296 +100,10 @@ class ProducerSocket : public Socket<BasePortal>,
void connect() override {
portal_->connect(false);
- listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
+ production_protocol_->start();
}
- bool isRunning() override { return !io_service_.stopped(); };
-
- virtual uint32_t produce(Name content_name, const uint8_t *buffer,
- size_t buffer_size, bool is_last = true,
- uint32_t start_offset = 0) {
- return ProducerSocket::produce(
- content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last,
- start_offset);
- }
-
- virtual uint32_t produce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last = true, uint32_t start_offset = 0) {
- if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) {
- return 0;
- }
-
- // Copy the atomic variables to ensure they keep the same value
- // during the production
- std::size_t data_packet_size = data_packet_size_;
- uint32_t content_object_expiry_time = content_object_expiry_time_;
- utils::CryptoHashType hash_algo = hash_algorithm_;
- bool making_manifest = making_manifest_;
- auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
- suffix_strategy_, start_offset);
- std::shared_ptr<utils::Signer> signer;
- getSocketOption(GeneralTransportOptions::SIGNER, signer);
-
- auto buffer_size = buffer->length();
- int bytes_segmented = 0;
- std::size_t header_size;
- std::size_t manifest_header_size = 0;
- std::size_t signature_length = 0;
- std::uint32_t final_block_number = start_offset;
- uint64_t free_space_for_content = 0;
-
- core::Packet::Format format;
- std::shared_ptr<ContentObjectManifest> manifest;
- bool is_last_manifest = false;
-
- // TODO Manifest may still be used for indexing
- if (making_manifest && !signer) {
- TRANSPORT_LOGE("Making manifests without setting producer identity.");
- }
-
- core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
- core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
- if (content_name.getType() == HNT_CONTIGUOUS_V4 ||
- content_name.getType() == HNT_IOV_V4) {
- hf_format = core::Packet::Format::HF_INET_TCP;
- hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
- } else if (content_name.getType() == HNT_CONTIGUOUS_V6 ||
- content_name.getType() == HNT_IOV_V6) {
- hf_format = core::Packet::Format::HF_INET6_TCP;
- hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
- } else {
- throw errors::RuntimeException("Unknown name format.");
- }
-
- format = hf_format;
- if (making_manifest) {
- manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- signer ? hf_format_ah : hf_format,
- signer ? signer->getSignatureLength() : 0);
- } else if (signer) {
- format = hf_format_ah;
- signature_length = signer->getSignatureLength();
- }
-
- header_size =
- core::Packet::getHeaderSizeFromFormat(format, signature_length);
- free_space_for_content = data_packet_size - header_size;
- uint32_t number_of_segments = uint32_t(
- std::ceil(double(buffer_size) / double(free_space_for_content)));
- if (free_space_for_content * number_of_segments < buffer_size) {
- number_of_segments++;
- }
-
- // TODO allocate space for all the headers
- if (making_manifest) {
- uint32_t segment_in_manifest = static_cast<uint32_t>(
- std::floor(double(data_packet_size - manifest_header_size -
- ContentObjectManifest::getManifestHeaderSize()) /
- ContentObjectManifest::getManifestEntrySize()) -
- 1.0);
- uint32_t number_of_manifests = static_cast<uint32_t>(
- std::ceil(float(number_of_segments) / segment_in_manifest));
- final_block_number += number_of_segments + number_of_manifests - 1;
-
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- hash_algo, is_last_manifest, content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
- manifest->setLifetime(content_object_expiry_time);
-
- if (is_last) {
- manifest->setFinalBlockNumber(final_block_number);
- } else {
- manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- for (unsigned int packaged_segments = 0;
- packaged_segments < number_of_segments; packaged_segments++) {
- if (making_manifest) {
- if (manifest->estimateManifestSize(2) >
- data_packet_size - manifest_header_size) {
- // Send the current manifest
- manifest->encode();
-
- // If identity set, sign manifest
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s",
- manifest->getName().toString().c_str());
-
- // Send content objects stored in the queue
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD(
- "Send content %s",
- content_queue_.front()->getName().toString().c_str());
- content_queue_.pop();
- }
-
- // Create new manifest. The reference to the last manifest has been
- // acquired in the passContentObjectToCallbacks function, so we can
- // safely release this reference
- manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
- content_name, suffix_strategy_,
- signer ? signer->getSignatureLength() : 0));
-
- manifest->setLifetime(content_object_expiry_time);
- manifest->setFinalBlockNumber(
- is_last ? final_block_number
- : utils::SuffixStrategy::INVALID_SUFFIX);
- }
- }
-
- auto content_suffix = suffix_strategy->getNextContentSuffix();
- auto content_object = std::make_shared<ContentObject>(
- content_name.setSuffix(content_suffix), format);
- content_object->setLifetime(content_object_expiry_time);
-
- auto b = buffer->cloneOne();
- b->trimStart(free_space_for_content * packaged_segments);
- b->trimEnd(b->length());
-
- if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) {
- b->append(buffer_size - bytes_segmented);
- bytes_segmented += (int)(buffer_size - bytes_segmented);
-
- if (is_last && making_manifest) {
- is_last_manifest = true;
- } else if (is_last) {
- content_object->setRst();
- }
-
- } else {
- b->append(free_space_for_content);
- bytes_segmented += (int)(free_space_for_content);
- }
-
- content_object->appendPayload(std::move(b));
-
- if (making_manifest) {
- using namespace std::chrono_literals;
- utils::CryptoHash hash = content_object->computeDigest(hash_algo);
- manifest->addSuffixHash(content_suffix, hash);
- content_queue_.push(content_object);
- } else {
- if (signer) {
- signer->sign(*content_object);
- }
- passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %s",
- content_object->getName().toString().c_str());
- }
- }
-
- if (making_manifest) {
- if (is_last_manifest) {
- manifest->setFinalManifest(is_last_manifest);
- }
-
- manifest->encode();
- if (signer) {
- signer->sign(*manifest);
- }
-
- passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s",
- manifest->getName().toString().c_str());
-
- while (!content_queue_.empty()) {
- passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s",
- content_queue_.front()->getName().toString().c_str());
- content_queue_.pop();
- }
- }
-
- io_service_.post([this]() {
- std::shared_ptr<ContentObject> co;
- while (object_queue_for_callbacks_.pop(co)) {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *co);
- }
-
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *co);
- }
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_, *co);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *co);
- }
- }
- });
-
- io_service_.dispatch([this, buffer_size]() {
- if (on_content_produced_) {
- on_content_produced_(*producer_interface_,
- std::make_error_code(std::errc(0)), buffer_size);
- }
- });
-
- return suffix_strategy->getTotalCount();
- }
-
- virtual void produce(ContentObject &content_object) {
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_,
- content_object);
- }
- });
-
- output_buffer_.insert(std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this()));
-
- io_service_.dispatch([this, &content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, content_object);
- }
- });
-
- portal_->sendContentObject(content_object);
- }
-
- virtual void produce(const uint8_t *buffer, size_t buffer_size) {
- produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
- }
-
- virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- // This API is meant to be used just with the RTC producer.
- // Here it cannot be used since no name for the content is specified.
- throw errors::NotImplementedException();
- }
-
- virtual void asyncProduce(const Name &suffix, const uint8_t *buf,
- size_t buffer_size, bool is_last = true,
- uint32_t *start_offset = nullptr) {
- if (!async_thread_.stopped()) {
- async_thread_.add([this, suffix, buffer = buf, size = buffer_size,
- is_last, start_offset]() {
- if (start_offset != nullptr) {
- *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last,
- *start_offset);
- } else {
- ProducerSocket::produce(suffix, buffer, size, is_last, 0);
- }
- });
- }
- }
-
- void asyncProduce(const Name &suffix);
+ bool isRunning() override { return !production_protocol_->isRunning(); };
virtual void asyncProduce(Name content_name,
std::unique_ptr<utils::MemBuf> &&buffer,
@@ -381,75 +111,56 @@ class ProducerSocket : public Socket<BasePortal>,
uint32_t **last_segment = nullptr) {
if (!async_thread_.stopped()) {
auto a = buffer.release();
- async_thread_.add(
- [this, content_name, a, is_last, offset, last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- **last_segment =
- offset + ProducerSocket::produce(content_name, std::move(buf),
- is_last, offset);
- } else {
- ProducerSocket::produce(content_name, std::move(buf), is_last,
- offset);
- }
- });
- }
- }
-
- virtual void asyncProduce(ContentObject &content_object) {
- if (!async_thread_.stopped()) {
- auto co_ptr = std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this());
- async_thread_.add([this, content_object = std::move(co_ptr)]() {
- ProducerSocket::produce(*content_object);
+ async_thread_.add([this, content_name, a, is_last, offset,
+ last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ **last_segment = offset + produceStream(content_name, std::move(buf),
+ is_last, offset);
+ } else {
+ produceStream(content_name, std::move(buf), is_last, offset);
+ }
});
}
}
- virtual void registerPrefix(const Prefix &producer_namespace) {
- served_namespaces_.push_back(producer_namespace);
+ virtual uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) {
+ return production_protocol_->produceStream(content_name, std::move(buffer),
+ is_last, start_offset);
}
- void serveForever() {
- if (listening_thread_.joinable()) {
- listening_thread_.join();
- }
+ virtual uint32_t produceStream(const Name &content_name,
+ const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true,
+ uint32_t start_offset = 0) {
+ return production_protocol_->produceStream(
+ content_name, buffer, buffer_size, is_last, start_offset);
}
- void stop() { portal_->stopEventsLoop(); }
-
- asio::io_service &getIoService() override { return portal_->getIoService(); };
-
- virtual void onInterest(Interest &interest) {
- if (on_interest_input_) {
- on_interest_input_(*producer_interface_, interest);
- }
-
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(interest);
-
- if (content_object) {
- if (on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_(*producer_interface_, interest);
- }
+ virtual uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ return production_protocol_->produceDatagram(content_name,
+ std::move(buffer));
+ }
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *content_object);
- }
+ virtual uint32_t produceDatagram(const Name &content_name,
+ const uint8_t *buffer, size_t buffer_size) {
+ return production_protocol_->produceDatagram(content_name, buffer,
+ buffer_size);
+ }
- portal_->sendContentObject(*content_object);
- } else {
- if (on_interest_process_) {
- on_interest_process_(*producer_interface_, interest);
- }
- }
+ void produce(ContentObject &content_object) {
+ production_protocol_->produce(content_object);
}
- virtual void onInterest(Interest::Ptr &&interest) override {
- onInterest(*interest);
- };
+ void registerPrefix(const Prefix &producer_namespace) {
+ production_protocol_->registerNamespaceWithNetwork(producer_namespace);
+ }
- virtual void onError(std::error_code ec) override {}
+ void stop() { production_protocol_->stop(); }
virtual int setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
@@ -462,7 +173,7 @@ class ProducerSocket : public Socket<BasePortal>,
break;
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_.setLimit(socket_option_value);
+ production_protocol_->setOutputBufferSize(socket_option_value);
break;
case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
@@ -533,6 +244,12 @@ class ProducerSocket : public Socket<BasePortal>,
break;
}
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ break;
+ }
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -559,19 +276,6 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_NOT_SET;
}
- virtual int setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- served_namespaces_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
virtual int setSocketOption(
int socket_option_key,
interface::ProducerContentObjectCallback socket_option_value) {
@@ -594,6 +298,10 @@ class ProducerSocket : public Socket<BasePortal>,
on_content_object_output_ = socket_option_value;
break;
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ break;
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -663,7 +371,7 @@ class ProducerSocket : public Socket<BasePortal>,
}
virtual int setSocketOption(int socket_option_key,
- utils::CryptoHashType socket_option_value) {
+ auth::CryptoHashType socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::HASH_ALGORITHM:
hash_algorithm_ = socket_option_value;
@@ -675,11 +383,12 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_SET;
}
- virtual int setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value) {
+ virtual int setSocketOption(
+ int socket_option_key,
+ core::NextSegmentCalculationStrategy socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::CRYPTO_SUITE:
- crypto_suite_ = socket_option_value;
+ case GeneralTransportOptions::SUFFIX_STRATEGY:
+ suffix_strategy_ = socket_option_value;
break;
default:
return SOCKET_OPTION_NOT_SET;
@@ -690,7 +399,7 @@ class ProducerSocket : public Socket<BasePortal>,
virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Signer> &socket_option_value) {
+ const std::shared_ptr<auth::Signer> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SIGNER: {
utils::SpinLock::Acquire locked(signer_lock_);
@@ -708,7 +417,7 @@ class ProducerSocket : public Socket<BasePortal>,
uint32_t &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_.getLimit();
+ socket_option_value = production_protocol_->getOutputBufferSize();
break;
case GeneralTransportOptions::DATA_PACKET_SIZE:
@@ -733,18 +442,8 @@ class ProducerSocket : public Socket<BasePortal>,
socket_option_value = making_manifest_;
break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- virtual int getSocketOption(int socket_option_key,
- std::list<Prefix> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- socket_option_value = served_namespaces_;
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
break;
default:
@@ -776,6 +475,10 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_content_object_output_;
break;
+ case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN:
+ *socket_option_value = &on_content_object_to_sign_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -828,11 +531,11 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_interest_inserted_input_buffer_;
break;
- case CACHE_HIT:
+ case ProducerCallbacksOptions::CACHE_HIT:
*socket_option_value = &on_interest_satisfied_output_buffer_;
break;
- case CACHE_MISS:
+ case ProducerCallbacksOptions::CACHE_MISS:
*socket_option_value = &on_interest_process_;
break;
@@ -844,8 +547,9 @@ class ProducerSocket : public Socket<BasePortal>,
});
}
- virtual int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value) {
+ virtual int getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<core::Portal> &socket_option_value) {
switch (socket_option_key) {
case PORTAL:
socket_option_value = portal_;
@@ -859,7 +563,7 @@ class ProducerSocket : public Socket<BasePortal>,
}
virtual int getSocketOption(int socket_option_key,
- utils::CryptoHashType &socket_option_value) {
+ auth::CryptoHashType &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::HASH_ALGORITHM:
socket_option_value = hash_algorithm_;
@@ -871,22 +575,22 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_GET;
}
- virtual int getSocketOption(int socket_option_key,
- utils::CryptoSuite &socket_option_value) {
+ virtual int getSocketOption(
+ int socket_option_key,
+ core::NextSegmentCalculationStrategy &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = crypto_suite_;
+ case GeneralTransportOptions::SUFFIX_STRATEGY:
+ socket_option_value = suffix_strategy_;
break;
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
virtual int getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Signer> &socket_option_value) {
+ std::shared_ptr<auth::Signer> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SIGNER: {
utils::SpinLock::Acquire locked(signer_lock_);
@@ -907,19 +611,21 @@ class ProducerSocket : public Socket<BasePortal>,
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
template <typename Lambda, typename arg2>
- int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func) {
+ int rescheduleOnIOServiceWithReference(int socket_option_key,
+ arg2 &socket_option_value,
+ Lambda lambda_func) {
// To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
+ std::function<int(int, arg2 &)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != listening_thread_.get_id()) {
+ if (production_protocol_ && production_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
+
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -939,21 +645,19 @@ class ProducerSocket : public Socket<BasePortal>,
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
template <typename Lambda, typename arg2>
- int rescheduleOnIOServiceWithReference(int socket_option_key,
- arg2 &socket_option_value,
- Lambda lambda_func) {
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func) {
// To enforce type check
- std::function<int(int, arg2 &)> func = lambda_func;
+ std::function<int(int, arg2)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != this->listening_thread_.get_id()) {
+ if (production_protocol_ && production_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
-
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -973,39 +677,20 @@ class ProducerSocket : public Socket<BasePortal>,
// Threads
protected:
interface::ProducerSocket *producer_interface_;
- std::thread listening_thread_;
asio::io_service io_service_;
- std::shared_ptr<Portal> portal_;
std::atomic<size_t> data_packet_size_;
- std::list<Prefix>
- served_namespaces_; // No need to be threadsafe, this is always modified
- // by the application thread
std::atomic<uint32_t> content_object_expiry_time_;
- utils::CircularFifo<std::shared_ptr<ContentObject>, 2048>
- object_queue_for_callbacks_;
-
- // buffers
- // ContentStore is thread-safe
- utils::ContentStore output_buffer_;
-
utils::EventThread async_thread_;
- int registration_status_;
std::atomic<bool> making_manifest_;
-
- // map for storing sequence numbers for several calls of the publish
- // function
- std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
-
- std::atomic<utils::CryptoHashType> hash_algorithm_;
- std::atomic<utils::CryptoSuite> crypto_suite_;
+ std::atomic<auth::CryptoHashType> hash_algorithm_;
+ std::atomic<auth::CryptoSuite> crypto_suite_;
utils::SpinLock signer_lock_;
- std::shared_ptr<utils::Signer> signer_;
+ std::shared_ptr<auth::Signer> signer_;
core::NextSegmentCalculationStrategy suffix_strategy_;
- // While manifests are being built, contents are stored in a queue
- std::queue<std::shared_ptr<ContentObject>> content_queue_;
+ std::unique_ptr<protocol::ProductionProtocol> production_protocol_;
// callbacks
ProducerInterestCallback on_interest_input_;
@@ -1021,63 +706,6 @@ class ProducerSocket : public Socket<BasePortal>,
ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
ProducerContentCallback on_content_produced_;
-
- private:
- void listen() {
- bool first = true;
-
- for (core::Prefix &producer_namespace : served_namespaces_) {
- if (first) {
- core::BindConfig bind_config(producer_namespace, 1000);
- portal_->bind(bind_config);
- portal_->setProducerCallback(this);
- first = !first;
- } else {
- portal_->registerRoute(producer_namespace);
- }
- }
-
- portal_->runEventsLoop();
- }
-
- void scheduleSendBurst() {
- io_service_.post([this]() {
- std::shared_ptr<ContentObject> co;
-
- for (uint32_t i = 0; i < burst_size; i++) {
- if (object_queue_for_callbacks_.pop(co)) {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *co);
- }
-
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *co);
- }
-
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_, *co);
- }
-
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *co);
- }
- } else {
- break;
- }
- }
- });
- }
-
- void passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object) {
- output_buffer_.insert(content_object);
- portal_->sendContentObject(*content_object);
- object_queue_for_callbacks_.push(std::move(content_object));
-
- if (object_queue_for_callbacks_.size() >= burst_size) {
- scheduleSendBurst();
- }
- }
};
} // namespace implementation