diff options
Diffstat (limited to 'libtransport/src/implementation/socket_producer.h')
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 94 |
1 files changed, 57 insertions, 37 deletions
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index d4a239cf6..cae0ad7c7 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -43,6 +43,8 @@ using namespace interface; class ProducerSocket : public Socket<BasePortal>, public BasePortal::ProducerCallback { + static constexpr uint32_t burst_size = 256; + public: explicit ProducerSocket(interface::ProducerSocket *producer_socket) : producer_interface_(producer_socket), @@ -293,6 +295,27 @@ class ProducerSocket : public Socket<BasePortal>, } } + io_service_.post([this]() { + std::shared_ptr<ContentObject> co; + while (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } + + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } + + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } + + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } + }); + io_service_.dispatch([this, buffer_size]() { if (on_content_produced_) { on_content_produced_(*producer_interface_, @@ -300,7 +323,6 @@ class ProducerSocket : public Socket<BasePortal>, } }); - TRANSPORT_LOGD("--------- END PRODUCE ------------"); return suffix_strategy->getTotalCount(); } @@ -498,12 +520,6 @@ class ProducerSocket : public Socket<BasePortal>, break; } - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - if (socket_option_value == VOID_HANDLER) { - on_content_object_to_sign_ = VOID_HANDLER; - break; - } - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: if (socket_option_value == VOID_HANDLER) { on_content_object_in_output_buffer_ = VOID_HANDLER; @@ -569,10 +585,6 @@ class ProducerSocket : public Socket<BasePortal>, on_new_segment_ = socket_option_value; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - on_content_object_to_sign_ = socket_option_value; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: on_content_object_in_output_buffer_ = socket_option_value; break; @@ -755,10 +767,6 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_new_segment_; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - *socket_option_value = &on_content_object_to_sign_; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: *socket_option_value = &on_content_object_in_output_buffer_; break; @@ -972,6 +980,9 @@ class ProducerSocket : public Socket<BasePortal>, // by the application thread std::atomic<uint32_t> content_object_expiry_time_; + utils::CircularFifo<std::shared_ptr<ContentObject>, 2048> + object_queue_for_callbacks_; + // buffers // ContentStore is thread-safe utils::ContentStore output_buffer_; @@ -1027,36 +1038,45 @@ class ProducerSocket : public Socket<BasePortal>, portal_->runEventsLoop(); } - void passContentObjectToCallbacks( - const std::shared_ptr<ContentObject> &content_object) { - if (content_object) { - io_service_.dispatch([this, content_object]() { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *content_object); - } + void scheduleSendBurst() { + io_service_.post([this]() { + std::shared_ptr<ContentObject> co; - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *content_object); - } + for (uint32_t i = 0; i < burst_size; i++) { + if (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, - *content_object); - } - }); + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } - output_buffer_.insert(content_object); + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } - io_service_.dispatch([this, content_object]() { - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *content_object); + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } else { + break; } - }); + } + }); + } - portal_->sendContentObject(*content_object); + void passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object) { + output_buffer_.insert(content_object); + portal_->sendContentObject(*content_object); + object_queue_for_callbacks_.push(std::move(content_object)); + + if (object_queue_for_callbacks_.size() >= burst_size) { + scheduleSendBurst(); } } -}; // namespace implementation +}; } // namespace implementation |