diff options
Diffstat (limited to 'libtransport/src/implementation/socket_producer.h')
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 632 |
1 files changed, 130 insertions, 502 deletions
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index a6f0f969e..af69cd818 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -15,9 +15,11 @@ #pragma once -#include <hicn/transport/security/signer.h> +#include <hicn/transport/auth/signer.h> #include <hicn/transport/utils/event_thread.h> #include <implementation/socket.h> +#include <protocols/prod_protocol_bytestream.h> +#include <protocols/prod_protocol_rtc.h> #include <utils/content_store.h> #include <utils/suffix_strategy.h> @@ -39,21 +41,17 @@ namespace implementation { using namespace core; using namespace interface; -class ProducerSocket : public Socket<BasePortal>, - public BasePortal::ProducerCallback { - static constexpr uint32_t burst_size = 256; - - public: - explicit ProducerSocket(interface::ProducerSocket *producer_socket) - : producer_interface_(producer_socket), - portal_(std::make_shared<Portal>(io_service_)), +class ProducerSocket : public Socket { + private: + ProducerSocket(interface::ProducerSocket *producer_socket, int protocol, + std::shared_ptr<core::Portal> &&portal) + : Socket(std::move(portal)), + producer_interface_(producer_socket), data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), - output_buffer_(default_values::producer_socket_output_buffer_size), async_thread_(), - registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), - hash_algorithm_(utils::CryptoHashType::SHA_256), + hash_algorithm_(auth::CryptoHashType::SHA_256), suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), @@ -65,15 +63,33 @@ class ProducerSocket : public Socket<BasePortal>, on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) {} - - virtual ~ProducerSocket() { - stop(); - if (listening_thread_.joinable()) { - listening_thread_.join(); + on_content_produced_(VOID_HANDLER) { + switch (protocol) { + case ProductionProtocolAlgorithms::RTC_PROD: + production_protocol_ = + std::make_unique<protocol::RTCProductionProtocol>(this); + break; + case ProductionProtocolAlgorithms::BYTE_STREAM: + default: + production_protocol_ = + std::make_unique<protocol::ByteStreamProductionProtocol>(this); + break; } } + public: + ProducerSocket(interface::ProducerSocket *producer, int protocol) + : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {} + + ProducerSocket(interface::ProducerSocket *producer, int protocol, + asio::io_service &io_service) + : ProducerSocket(producer, protocol, + std::make_shared<core::Portal>(io_service)) { + is_async_ = true; + } + + virtual ~ProducerSocket() {} + interface::ProducerSocket *getInterface() { return producer_interface_; } @@ -84,296 +100,10 @@ class ProducerSocket : public Socket<BasePortal>, void connect() override { portal_->connect(false); - listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this)); + production_protocol_->start(); } - bool isRunning() override { return !io_service_.stopped(); }; - - virtual uint32_t produce(Name content_name, const uint8_t *buffer, - size_t buffer_size, bool is_last = true, - uint32_t start_offset = 0) { - return ProducerSocket::produce( - content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, - start_offset); - } - - virtual uint32_t produce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, uint32_t start_offset = 0) { - if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) { - return 0; - } - - // Copy the atomic variables to ensure they keep the same value - // during the production - std::size_t data_packet_size = data_packet_size_; - uint32_t content_object_expiry_time = content_object_expiry_time_; - utils::CryptoHashType hash_algo = hash_algorithm_; - bool making_manifest = making_manifest_; - auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( - suffix_strategy_, start_offset); - std::shared_ptr<utils::Signer> signer; - getSocketOption(GeneralTransportOptions::SIGNER, signer); - - auto buffer_size = buffer->length(); - int bytes_segmented = 0; - std::size_t header_size; - std::size_t manifest_header_size = 0; - std::size_t signature_length = 0; - std::uint32_t final_block_number = start_offset; - uint64_t free_space_for_content = 0; - - core::Packet::Format format; - std::shared_ptr<ContentObjectManifest> manifest; - bool is_last_manifest = false; - - // TODO Manifest may still be used for indexing - if (making_manifest && !signer) { - TRANSPORT_LOGE("Making manifests without setting producer identity."); - } - - core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; - core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; - if (content_name.getType() == HNT_CONTIGUOUS_V4 || - content_name.getType() == HNT_IOV_V4) { - hf_format = core::Packet::Format::HF_INET_TCP; - hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; - } else if (content_name.getType() == HNT_CONTIGUOUS_V6 || - content_name.getType() == HNT_IOV_V6) { - hf_format = core::Packet::Format::HF_INET6_TCP; - hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; - } else { - throw errors::RuntimeException("Unknown name format."); - } - - format = hf_format; - if (making_manifest) { - manifest_header_size = core::Packet::getHeaderSizeFromFormat( - signer ? hf_format_ah : hf_format, - signer ? signer->getSignatureLength() : 0); - } else if (signer) { - format = hf_format_ah; - signature_length = signer->getSignatureLength(); - } - - header_size = - core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size - header_size; - uint32_t number_of_segments = uint32_t( - std::ceil(double(buffer_size) / double(free_space_for_content))); - if (free_space_for_content * number_of_segments < buffer_size) { - number_of_segments++; - } - - // TODO allocate space for all the headers - if (making_manifest) { - uint32_t segment_in_manifest = static_cast<uint32_t>( - std::floor(double(data_packet_size - manifest_header_size - - ContentObjectManifest::getManifestHeaderSize()) / - ContentObjectManifest::getManifestEntrySize()) - - 1.0); - uint32_t number_of_manifests = static_cast<uint32_t>( - std::ceil(float(number_of_segments) / segment_in_manifest)); - final_block_number += number_of_segments + number_of_manifests - 1; - - manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algo, is_last_manifest, content_name, suffix_strategy_, - signer ? signer->getSignatureLength() : 0)); - manifest->setLifetime(content_object_expiry_time); - - if (is_last) { - manifest->setFinalBlockNumber(final_block_number); - } else { - manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); - } - } - - for (unsigned int packaged_segments = 0; - packaged_segments < number_of_segments; packaged_segments++) { - if (making_manifest) { - if (manifest->estimateManifestSize(2) > - data_packet_size - manifest_header_size) { - // Send the current manifest - manifest->encode(); - - // If identity set, sign manifest - if (signer) { - signer->sign(*manifest); - } - - passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", - manifest->getName().toString().c_str()); - - // Send content objects stored in the queue - while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD( - "Send content %s", - content_queue_.front()->getName().toString().c_str()); - content_queue_.pop(); - } - - // Create new manifest. The reference to the last manifest has been - // acquired in the passContentObjectToCallbacks function, so we can - // safely release this reference - manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, - content_name, suffix_strategy_, - signer ? signer->getSignatureLength() : 0)); - - manifest->setLifetime(content_object_expiry_time); - manifest->setFinalBlockNumber( - is_last ? final_block_number - : utils::SuffixStrategy::INVALID_SUFFIX); - } - } - - auto content_suffix = suffix_strategy->getNextContentSuffix(); - auto content_object = std::make_shared<ContentObject>( - content_name.setSuffix(content_suffix), format); - content_object->setLifetime(content_object_expiry_time); - - auto b = buffer->cloneOne(); - b->trimStart(free_space_for_content * packaged_segments); - b->trimEnd(b->length()); - - if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { - b->append(buffer_size - bytes_segmented); - bytes_segmented += (int)(buffer_size - bytes_segmented); - - if (is_last && making_manifest) { - is_last_manifest = true; - } else if (is_last) { - content_object->setRst(); - } - - } else { - b->append(free_space_for_content); - bytes_segmented += (int)(free_space_for_content); - } - - content_object->appendPayload(std::move(b)); - - if (making_manifest) { - using namespace std::chrono_literals; - utils::CryptoHash hash = content_object->computeDigest(hash_algo); - manifest->addSuffixHash(content_suffix, hash); - content_queue_.push(content_object); - } else { - if (signer) { - signer->sign(*content_object); - } - passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %s", - content_object->getName().toString().c_str()); - } - } - - if (making_manifest) { - if (is_last_manifest) { - manifest->setFinalManifest(is_last_manifest); - } - - manifest->encode(); - if (signer) { - signer->sign(*manifest); - } - - passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", - manifest->getName().toString().c_str()); - - while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", - content_queue_.front()->getName().toString().c_str()); - content_queue_.pop(); - } - } - - io_service_.post([this]() { - std::shared_ptr<ContentObject> co; - while (object_queue_for_callbacks_.pop(co)) { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *co); - } - - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *co); - } - - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, *co); - } - - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *co); - } - } - }); - - io_service_.dispatch([this, buffer_size]() { - if (on_content_produced_) { - on_content_produced_(*producer_interface_, - std::make_error_code(std::errc(0)), buffer_size); - } - }); - - return suffix_strategy->getTotalCount(); - } - - virtual void produce(ContentObject &content_object) { - io_service_.dispatch([this, &content_object]() { - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, - content_object); - } - }); - - output_buffer_.insert(std::static_pointer_cast<ContentObject>( - content_object.shared_from_this())); - - io_service_.dispatch([this, &content_object]() { - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, content_object); - } - }); - - portal_->sendContentObject(content_object); - } - - virtual void produce(const uint8_t *buffer, size_t buffer_size) { - produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); - } - - virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) { - // This API is meant to be used just with the RTC producer. - // Here it cannot be used since no name for the content is specified. - throw errors::NotImplementedException(); - } - - virtual void asyncProduce(const Name &suffix, const uint8_t *buf, - size_t buffer_size, bool is_last = true, - uint32_t *start_offset = nullptr) { - if (!async_thread_.stopped()) { - async_thread_.add([this, suffix, buffer = buf, size = buffer_size, - is_last, start_offset]() { - if (start_offset != nullptr) { - *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last, - *start_offset); - } else { - ProducerSocket::produce(suffix, buffer, size, is_last, 0); - } - }); - } - } - - void asyncProduce(const Name &suffix); + bool isRunning() override { return !production_protocol_->isRunning(); }; virtual void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, @@ -381,75 +111,56 @@ class ProducerSocket : public Socket<BasePortal>, uint32_t **last_segment = nullptr) { if (!async_thread_.stopped()) { auto a = buffer.release(); - async_thread_.add( - [this, content_name, a, is_last, offset, last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - **last_segment = - offset + ProducerSocket::produce(content_name, std::move(buf), - is_last, offset); - } else { - ProducerSocket::produce(content_name, std::move(buf), is_last, - offset); - } - }); - } - } - - virtual void asyncProduce(ContentObject &content_object) { - if (!async_thread_.stopped()) { - auto co_ptr = std::static_pointer_cast<ContentObject>( - content_object.shared_from_this()); - async_thread_.add([this, content_object = std::move(co_ptr)]() { - ProducerSocket::produce(*content_object); + async_thread_.add([this, content_name, a, is_last, offset, + last_segment]() { + auto buf = std::unique_ptr<utils::MemBuf>(a); + if (last_segment != NULL) { + **last_segment = offset + produceStream(content_name, std::move(buf), + is_last, offset); + } else { + produceStream(content_name, std::move(buf), is_last, offset); + } }); } } - virtual void registerPrefix(const Prefix &producer_namespace) { - served_namespaces_.push_back(producer_namespace); + virtual uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) { + return production_protocol_->produceStream(content_name, std::move(buffer), + is_last, start_offset); } - void serveForever() { - if (listening_thread_.joinable()) { - listening_thread_.join(); - } + virtual uint32_t produceStream(const Name &content_name, + const uint8_t *buffer, size_t buffer_size, + bool is_last = true, + uint32_t start_offset = 0) { + return production_protocol_->produceStream( + content_name, buffer, buffer_size, is_last, start_offset); } - void stop() { portal_->stopEventsLoop(); } - - asio::io_service &getIoService() override { return portal_->getIoService(); }; - - virtual void onInterest(Interest &interest) { - if (on_interest_input_) { - on_interest_input_(*producer_interface_, interest); - } - - const std::shared_ptr<ContentObject> content_object = - output_buffer_.find(interest); - - if (content_object) { - if (on_interest_satisfied_output_buffer_) { - on_interest_satisfied_output_buffer_(*producer_interface_, interest); - } + virtual uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) { + return production_protocol_->produceDatagram(content_name, + std::move(buffer)); + } - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *content_object); - } + virtual uint32_t produceDatagram(const Name &content_name, + const uint8_t *buffer, size_t buffer_size) { + return production_protocol_->produceDatagram(content_name, buffer, + buffer_size); + } - portal_->sendContentObject(*content_object); - } else { - if (on_interest_process_) { - on_interest_process_(*producer_interface_, interest); - } - } + void produce(ContentObject &content_object) { + production_protocol_->produce(content_object); } - virtual void onInterest(Interest::Ptr &&interest) override { - onInterest(*interest); - }; + void registerPrefix(const Prefix &producer_namespace) { + production_protocol_->registerNamespaceWithNetwork(producer_namespace); + } - virtual void onError(std::error_code ec) override {} + void stop() { production_protocol_->stop(); } virtual int setSocketOption(int socket_option_key, uint32_t socket_option_value) { @@ -462,7 +173,7 @@ class ProducerSocket : public Socket<BasePortal>, break; case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_.setLimit(socket_option_value); + production_protocol_->setOutputBufferSize(socket_option_value); break; case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: @@ -533,6 +244,12 @@ class ProducerSocket : public Socket<BasePortal>, break; } + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + default: return SOCKET_OPTION_NOT_SET; } @@ -559,19 +276,6 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_NOT_SET; } - virtual int setSocketOption(int socket_option_key, - std::list<Prefix> socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - served_namespaces_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - virtual int setSocketOption( int socket_option_key, interface::ProducerContentObjectCallback socket_option_value) { @@ -594,6 +298,10 @@ class ProducerSocket : public Socket<BasePortal>, on_content_object_output_ = socket_option_value; break; + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -663,7 +371,7 @@ class ProducerSocket : public Socket<BasePortal>, } virtual int setSocketOption(int socket_option_key, - utils::CryptoHashType socket_option_value) { + auth::CryptoHashType socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::HASH_ALGORITHM: hash_algorithm_ = socket_option_value; @@ -675,11 +383,12 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_SET; } - virtual int setSocketOption(int socket_option_key, - utils::CryptoSuite socket_option_value) { + virtual int setSocketOption( + int socket_option_key, + core::NextSegmentCalculationStrategy socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::CRYPTO_SUITE: - crypto_suite_ = socket_option_value; + case GeneralTransportOptions::SUFFIX_STRATEGY: + suffix_strategy_ = socket_option_value; break; default: return SOCKET_OPTION_NOT_SET; @@ -690,7 +399,7 @@ class ProducerSocket : public Socket<BasePortal>, virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Signer> &socket_option_value) { + const std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SIGNER: { utils::SpinLock::Acquire locked(signer_lock_); @@ -708,7 +417,7 @@ class ProducerSocket : public Socket<BasePortal>, uint32_t &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_.getLimit(); + socket_option_value = production_protocol_->getOutputBufferSize(); break; case GeneralTransportOptions::DATA_PACKET_SIZE: @@ -733,18 +442,8 @@ class ProducerSocket : public Socket<BasePortal>, socket_option_value = making_manifest_; break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - virtual int getSocketOption(int socket_option_key, - std::list<Prefix> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - socket_option_value = served_namespaces_; + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; break; default: @@ -776,6 +475,10 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_content_object_output_; break; + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -828,11 +531,11 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_interest_inserted_input_buffer_; break; - case CACHE_HIT: + case ProducerCallbacksOptions::CACHE_HIT: *socket_option_value = &on_interest_satisfied_output_buffer_; break; - case CACHE_MISS: + case ProducerCallbacksOptions::CACHE_MISS: *socket_option_value = &on_interest_process_; break; @@ -844,8 +547,9 @@ class ProducerSocket : public Socket<BasePortal>, }); } - virtual int getSocketOption(int socket_option_key, - std::shared_ptr<Portal> &socket_option_value) { + virtual int getSocketOption( + int socket_option_key, + std::shared_ptr<core::Portal> &socket_option_value) { switch (socket_option_key) { case PORTAL: socket_option_value = portal_; @@ -859,7 +563,7 @@ class ProducerSocket : public Socket<BasePortal>, } virtual int getSocketOption(int socket_option_key, - utils::CryptoHashType &socket_option_value) { + auth::CryptoHashType &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::HASH_ALGORITHM: socket_option_value = hash_algorithm_; @@ -871,22 +575,22 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_GET; } - virtual int getSocketOption(int socket_option_key, - utils::CryptoSuite &socket_option_value) { + virtual int getSocketOption( + int socket_option_key, + core::NextSegmentCalculationStrategy &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = crypto_suite_; + case GeneralTransportOptions::SUFFIX_STRATEGY: + socket_option_value = suffix_strategy_; break; default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } virtual int getSocketOption( int socket_option_key, - std::shared_ptr<utils::Signer> &socket_option_value) { + std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SIGNER: { utils::SpinLock::Acquire locked(signer_lock_); @@ -907,19 +611,21 @@ class ProducerSocket : public Socket<BasePortal>, // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it template <typename Lambda, typename arg2> - int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func) { + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func) { // To enforce type check - std::function<int(int, arg2)> func = lambda_func; + std::function<int(int, arg2 &)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != listening_thread_.get_id()) { + if (production_protocol_ && production_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; + bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -939,21 +645,19 @@ class ProducerSocket : public Socket<BasePortal>, // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it template <typename Lambda, typename arg2> - int rescheduleOnIOServiceWithReference(int socket_option_key, - arg2 &socket_option_value, - Lambda lambda_func) { + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { // To enforce type check - std::function<int(int, arg2 &)> func = lambda_func; + std::function<int(int, arg2)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != this->listening_thread_.get_id()) { + if (production_protocol_ && production_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; - bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -973,39 +677,20 @@ class ProducerSocket : public Socket<BasePortal>, // Threads protected: interface::ProducerSocket *producer_interface_; - std::thread listening_thread_; asio::io_service io_service_; - std::shared_ptr<Portal> portal_; std::atomic<size_t> data_packet_size_; - std::list<Prefix> - served_namespaces_; // No need to be threadsafe, this is always modified - // by the application thread std::atomic<uint32_t> content_object_expiry_time_; - utils::CircularFifo<std::shared_ptr<ContentObject>, 2048> - object_queue_for_callbacks_; - - // buffers - // ContentStore is thread-safe - utils::ContentStore output_buffer_; - utils::EventThread async_thread_; - int registration_status_; std::atomic<bool> making_manifest_; - - // map for storing sequence numbers for several calls of the publish - // function - std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_; - - std::atomic<utils::CryptoHashType> hash_algorithm_; - std::atomic<utils::CryptoSuite> crypto_suite_; + std::atomic<auth::CryptoHashType> hash_algorithm_; + std::atomic<auth::CryptoSuite> crypto_suite_; utils::SpinLock signer_lock_; - std::shared_ptr<utils::Signer> signer_; + std::shared_ptr<auth::Signer> signer_; core::NextSegmentCalculationStrategy suffix_strategy_; - // While manifests are being built, contents are stored in a queue - std::queue<std::shared_ptr<ContentObject>> content_queue_; + std::unique_ptr<protocol::ProductionProtocol> production_protocol_; // callbacks ProducerInterestCallback on_interest_input_; @@ -1021,63 +706,6 @@ class ProducerSocket : public Socket<BasePortal>, ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; ProducerContentCallback on_content_produced_; - - private: - void listen() { - bool first = true; - - for (core::Prefix &producer_namespace : served_namespaces_) { - if (first) { - core::BindConfig bind_config(producer_namespace, 1000); - portal_->bind(bind_config); - portal_->setProducerCallback(this); - first = !first; - } else { - portal_->registerRoute(producer_namespace); - } - } - - portal_->runEventsLoop(); - } - - void scheduleSendBurst() { - io_service_.post([this]() { - std::shared_ptr<ContentObject> co; - - for (uint32_t i = 0; i < burst_size; i++) { - if (object_queue_for_callbacks_.pop(co)) { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *co); - } - - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *co); - } - - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, *co); - } - - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *co); - } - } else { - break; - } - } - }); - } - - void passContentObjectToCallbacks( - const std::shared_ptr<ContentObject> &content_object) { - output_buffer_.insert(content_object); - portal_->sendContentObject(*content_object); - object_queue_for_callbacks_.push(std::move(content_object)); - - if (object_queue_for_callbacks_.size() >= burst_size) { - scheduleSendBurst(); - } - } }; } // namespace implementation |