diff options
Diffstat (limited to 'libtransport/src/implementation/socket_producer.h')
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 207 |
1 files changed, 134 insertions, 73 deletions
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 9daf79b9d..37151d497 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -40,6 +40,7 @@ namespace implementation { using namespace core; using namespace interface; +using ProducerCallback = interface::ProducerSocket::Callback; class ProducerSocket : public Socket { private: @@ -48,11 +49,14 @@ class ProducerSocket : public Socket { : Socket(std::move(portal)), producer_interface_(producer_socket), data_packet_size_(default_values::content_object_packet_size), + max_segment_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), - async_thread_(), - making_manifest_(false), + making_manifest_(default_values::manifest_capacity), hash_algorithm_(auth::CryptoHashType::SHA256), - suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), + signer_(std::make_shared<auth::VoidSigner>()), + suffix_strategy_(std::make_shared<utils::IncrementalSuffixStrategy>(0)), + aggregated_data_(false), + fec_setting_(""), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -63,29 +67,30 @@ class ProducerSocket : public Socket { on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) { + on_content_produced_(VOID_HANDLER), + application_callback_(nullptr) { switch (protocol) { case ProductionProtocolAlgorithms::RTC_PROD: production_protocol_ = - std::make_unique<protocol::RTCProductionProtocol>(this); + std::make_shared<protocol::RTCProductionProtocol>(this); break; case ProductionProtocolAlgorithms::BYTE_STREAM: default: production_protocol_ = - std::make_unique<protocol::ByteStreamProductionProtocol>(this); + std::make_shared<protocol::ByteStreamProductionProtocol>(this); break; } } public: ProducerSocket(interface::ProducerSocket *producer, int protocol) - : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {} + : ProducerSocket(producer, protocol, core::Portal::createShared()) { + is_async_ = true; + } ProducerSocket(interface::ProducerSocket *producer, int protocol, - asio::io_service &io_service) - : ProducerSocket(producer, protocol, - std::make_shared<core::Portal>(io_service)) { - is_async_ = true; + ::utils::EventThread &worker) + : ProducerSocket(producer, protocol, core::Portal::createShared(worker)) { } virtual ~ProducerSocket() {} @@ -98,31 +103,9 @@ class ProducerSocket : public Socket { producer_interface_ = producer_socket; } - void connect() override { - portal_->connect(false); - production_protocol_->start(); - } - - bool isRunning() override { return !production_protocol_->isRunning(); }; + void connect() override { portal_->connect(false); } - virtual void asyncProduce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment = nullptr) { - if (!async_thread_.stopped()) { - auto a = buffer.release(); - async_thread_.add([this, content_name, a, is_last, offset, - last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - **last_segment = offset + produceStream(content_name, std::move(buf), - is_last, offset); - } else { - produceStream(content_name, std::move(buf), is_last, offset); - } - }); - } - } + bool isRunning() override { return production_protocol_->isRunning(); }; virtual uint32_t produceStream(const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, @@ -156,12 +139,38 @@ class ProducerSocket : public Socket { production_protocol_->produce(content_object); } + void sendMapme() { production_protocol_->sendMapme(); } + void registerPrefix(const Prefix &producer_namespace) { - production_protocol_->registerNamespaceWithNetwork(producer_namespace); + portal_->registerRoute(producer_namespace); } + void start() { production_protocol_->start(); } void stop() { production_protocol_->stop(); } + using Socket::getSocketOption; + using Socket::setSocketOption; + + virtual int setSocketOption(int socket_option_key, + ProducerCallback *socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::PRODUCER_CALLBACK: + application_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + virtual int setSocketOption(int socket_option_key, uint32_t socket_option_value) { switch (socket_option_key) { @@ -172,6 +181,17 @@ class ProducerSocket : public Socket { } break; + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + break; + + case GeneralTransportOptions::MAX_SEGMENT_SIZE: + if (socket_option_value <= default_values::max_content_object_size && + socket_option_value > 0) { + max_segment_size_ = socket_option_value; + } + break; + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: production_protocol_->setOutputBufferSize(socket_option_value); break; @@ -260,8 +280,8 @@ class ProducerSocket : public Socket { virtual int setSocketOption(int socket_option_key, bool socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - making_manifest_ = socket_option_value; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; break; default: @@ -385,7 +405,7 @@ class ProducerSocket : public Socket { virtual int setSocketOption( int socket_option_key, - core::NextSegmentCalculationStrategy socket_option_value) { + const std::shared_ptr<utils::SuffixStrategy> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SUFFIX_STRATEGY: suffix_strategy_ = socket_option_value; @@ -413,9 +433,33 @@ class ProducerSocket : public Socket { return SOCKET_OPTION_SET; } + int getSocketOption(int socket_option_key, + ProducerCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::PRODUCER_CALLBACK: + *socket_option_value = application_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + virtual int getSocketOption(int socket_option_key, uint32_t &socket_option_value) { switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + break; + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: socket_option_value = production_protocol_->getOutputBufferSize(); break; @@ -424,6 +468,10 @@ class ProducerSocket : public Socket { socket_option_value = (uint32_t)data_packet_size_; break; + case GeneralTransportOptions::MAX_SEGMENT_SIZE: + socket_option_value = (uint32_t)max_segment_size_; + break; + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: socket_option_value = content_object_expiry_time_; break; @@ -438,14 +486,14 @@ class ProducerSocket : public Socket { virtual int getSocketOption(int socket_option_key, bool &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - socket_option_value = making_manifest_; - break; - case GeneralTransportOptions::ASYNC_MODE: socket_option_value = is_async_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -547,21 +595,6 @@ class ProducerSocket : public Socket { }); } - virtual int getSocketOption( - int socket_option_key, - std::shared_ptr<core::Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - default: - return SOCKET_OPTION_NOT_GET; - ; - } - - return SOCKET_OPTION_GET; - } - virtual int getSocketOption(int socket_option_key, auth::CryptoHashType &socket_option_value) { switch (socket_option_key) { @@ -577,7 +610,7 @@ class ProducerSocket : public Socket { virtual int getSocketOption( int socket_option_key, - core::NextSegmentCalculationStrategy &socket_option_value) { + std::shared_ptr<utils::SuffixStrategy> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SUFFIX_STRATEGY: socket_option_value = suffix_strategy_; @@ -603,9 +636,31 @@ class ProducerSocket : public Socket { return SOCKET_OPTION_GET; } + int getSocketOption(int socket_option_key, std::string &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::FEC_TYPE: + socket_option_value = fec_setting_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + virtual int setSocketOption(int socket_option_key, const std::string &socket_option_value) { - return SOCKET_OPTION_NOT_SET; + int result = SOCKET_OPTION_NOT_SET; + switch (socket_option_key) { + case GeneralTransportOptions::FEC_TYPE: + fec_setting_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + return result; } // If the thread calling lambda_func is not the same of io_service, this @@ -623,9 +678,9 @@ class ProducerSocket : public Socket { std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -655,9 +710,9 @@ class ProducerSocket : public Socket { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -677,20 +732,24 @@ class ProducerSocket : public Socket { // Threads protected: interface::ProducerSocket *producer_interface_; - asio::io_service io_service_; std::atomic<size_t> data_packet_size_; + std::atomic<size_t> max_segment_size_; std::atomic<uint32_t> content_object_expiry_time_; - utils::EventThread async_thread_; - - std::atomic<bool> making_manifest_; + std::atomic<uint32_t> making_manifest_; std::atomic<auth::CryptoHashType> hash_algorithm_; std::atomic<auth::CryptoSuite> crypto_suite_; utils::SpinLock signer_lock_; std::shared_ptr<auth::Signer> signer_; - core::NextSegmentCalculationStrategy suffix_strategy_; + std::shared_ptr<utils::SuffixStrategy> suffix_strategy_; + + std::shared_ptr<protocol::ProductionProtocol> production_protocol_; - std::unique_ptr<protocol::ProductionProtocol> production_protocol_; + // RTC transport + bool aggregated_data_; + + // FEC setting + std::string fec_setting_; // callbacks ProducerInterestCallback on_interest_input_; @@ -706,6 +765,8 @@ class ProducerSocket : public Socket { ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; ProducerContentCallback on_content_produced_; + + ProducerCallback *application_callback_; }; } // namespace implementation |