diff options
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 23 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 94 |
2 files changed, 61 insertions, 56 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 2fc8d2b48..488f238ba 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -35,7 +35,7 @@ class ConsumerSocket : public Socket<BasePortal> { public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) : consumer_interface_(consumer), - portal_(std::make_shared<Portal>(io_service_)), + portal_(std::make_shared<Portal>()), async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), @@ -62,10 +62,8 @@ class ConsumerSocket : public Socket<BasePortal> { on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), on_content_object_verification_(VOID_HANDLER), - on_content_object_(VOID_HANDLER), stats_summary_(VOID_HANDLER), read_callback_(nullptr), - virtual_download_(false), timer_interval_milliseconds_(0), guard_raaqm_params_() { switch (protocol) { @@ -323,11 +321,6 @@ class ConsumerSocket : public Socket<BasePortal> { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { - case OtherOptions::VIRTUAL_DOWNLOAD: - virtual_download_ = socket_option_value; - result = SOCKET_OPTION_SET; - break; - case GeneralTransportOptions::VERIFY_SIGNATURE: verify_signature_ = socket_option_value; result = SOCKET_OPTION_SET; @@ -631,10 +624,6 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = transport_protocol_->isRunning(); break; - case OtherOptions::VIRTUAL_DOWNLOAD: - socket_option_value = virtual_download_; - break; - case GeneralTransportOptions::VERIFY_SIGNATURE: socket_option_value = verify_signature_; break; @@ -861,8 +850,9 @@ class ConsumerSocket : public Socket<BasePortal> { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -881,7 +871,6 @@ class ConsumerSocket : public Socket<BasePortal> { protected: interface::ConsumerSocket *consumer_interface_; - asio::io_service io_service_; std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; @@ -925,15 +914,11 @@ class ConsumerSocket : public Socket<BasePortal> { ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; ConsumerContentObjectVerificationCallback on_content_object_verification_; - ConsumerContentObjectCallback on_content_object_; ConsumerTimerCallback stats_summary_; ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; ReadCallback *read_callback_; - // Virtual download for traffic generator - bool virtual_download_; - uint32_t timer_interval_milliseconds_; // Transport protocol diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index d4a239cf6..cae0ad7c7 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -43,6 +43,8 @@ using namespace interface; class ProducerSocket : public Socket<BasePortal>, public BasePortal::ProducerCallback { + static constexpr uint32_t burst_size = 256; + public: explicit ProducerSocket(interface::ProducerSocket *producer_socket) : producer_interface_(producer_socket), @@ -293,6 +295,27 @@ class ProducerSocket : public Socket<BasePortal>, } } + io_service_.post([this]() { + std::shared_ptr<ContentObject> co; + while (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } + + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } + + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } + + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } + }); + io_service_.dispatch([this, buffer_size]() { if (on_content_produced_) { on_content_produced_(*producer_interface_, @@ -300,7 +323,6 @@ class ProducerSocket : public Socket<BasePortal>, } }); - TRANSPORT_LOGD("--------- END PRODUCE ------------"); return suffix_strategy->getTotalCount(); } @@ -498,12 +520,6 @@ class ProducerSocket : public Socket<BasePortal>, break; } - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - if (socket_option_value == VOID_HANDLER) { - on_content_object_to_sign_ = VOID_HANDLER; - break; - } - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: if (socket_option_value == VOID_HANDLER) { on_content_object_in_output_buffer_ = VOID_HANDLER; @@ -569,10 +585,6 @@ class ProducerSocket : public Socket<BasePortal>, on_new_segment_ = socket_option_value; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - on_content_object_to_sign_ = socket_option_value; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: on_content_object_in_output_buffer_ = socket_option_value; break; @@ -755,10 +767,6 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_new_segment_; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - *socket_option_value = &on_content_object_to_sign_; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: *socket_option_value = &on_content_object_in_output_buffer_; break; @@ -972,6 +980,9 @@ class ProducerSocket : public Socket<BasePortal>, // by the application thread std::atomic<uint32_t> content_object_expiry_time_; + utils::CircularFifo<std::shared_ptr<ContentObject>, 2048> + object_queue_for_callbacks_; + // buffers // ContentStore is thread-safe utils::ContentStore output_buffer_; @@ -1027,36 +1038,45 @@ class ProducerSocket : public Socket<BasePortal>, portal_->runEventsLoop(); } - void passContentObjectToCallbacks( - const std::shared_ptr<ContentObject> &content_object) { - if (content_object) { - io_service_.dispatch([this, content_object]() { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *content_object); - } + void scheduleSendBurst() { + io_service_.post([this]() { + std::shared_ptr<ContentObject> co; - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *content_object); - } + for (uint32_t i = 0; i < burst_size; i++) { + if (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, - *content_object); - } - }); + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } - output_buffer_.insert(content_object); + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } - io_service_.dispatch([this, content_object]() { - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *content_object); + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } else { + break; } - }); + } + }); + } - portal_->sendContentObject(*content_object); + void passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object) { + output_buffer_.insert(content_object); + portal_->sendContentObject(*content_object); + object_queue_for_callbacks_.push(std::move(content_object)); + + if (object_queue_for_callbacks_.size() >= burst_size) { + scheduleSendBurst(); } } -}; // namespace implementation +}; } // namespace implementation |