From 642ac16179e8a041f37d749f5bf72512f76fcfae Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 11 Mar 2020 14:01:41 +0100 Subject: [HICN-545] Do not schedule callbacks during segmentation in a per packet basis. Signed-off-by: Mauro Sardara Change-Id: I1531a1fe1d1fa51bb45edab20ee449faa34847c3 --- .../transport/interfaces/socket_options_keys.h | 1 - libtransport/src/implementation/socket_producer.h | 94 +++++++++++++--------- 2 files changed, 57 insertions(+), 38 deletions(-) diff --git a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h index 7910a4422..0b7a79c3d 100644 --- a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h @@ -94,7 +94,6 @@ typedef enum { CACHE_HIT = 506, CACHE_MISS = 508, NEW_CONTENT_OBJECT = 509, - CONTENT_OBJECT_SIGN = 513, CONTENT_OBJECT_READY = 510, CONTENT_OBJECT_OUTPUT = 511, CONTENT_PRODUCED = 512, diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index d4a239cf6..cae0ad7c7 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -43,6 +43,8 @@ using namespace interface; class ProducerSocket : public Socket, public BasePortal::ProducerCallback { + static constexpr uint32_t burst_size = 256; + public: explicit ProducerSocket(interface::ProducerSocket *producer_socket) : producer_interface_(producer_socket), @@ -293,6 +295,27 @@ class ProducerSocket : public Socket, } } + io_service_.post([this]() { + std::shared_ptr co; + while (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } + + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } + + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } + + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } + }); + io_service_.dispatch([this, buffer_size]() { if (on_content_produced_) { on_content_produced_(*producer_interface_, @@ -300,7 +323,6 @@ class ProducerSocket : public Socket, } }); - TRANSPORT_LOGD("--------- END PRODUCE ------------"); return suffix_strategy->getTotalCount(); } @@ -498,12 +520,6 @@ class ProducerSocket : public Socket, break; } - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - if (socket_option_value == VOID_HANDLER) { - on_content_object_to_sign_ = VOID_HANDLER; - break; - } - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: if (socket_option_value == VOID_HANDLER) { on_content_object_in_output_buffer_ = VOID_HANDLER; @@ -569,10 +585,6 @@ class ProducerSocket : public Socket, on_new_segment_ = socket_option_value; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - on_content_object_to_sign_ = socket_option_value; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: on_content_object_in_output_buffer_ = socket_option_value; break; @@ -755,10 +767,6 @@ class ProducerSocket : public Socket, *socket_option_value = &on_new_segment_; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - *socket_option_value = &on_content_object_to_sign_; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: *socket_option_value = &on_content_object_in_output_buffer_; break; @@ -972,6 +980,9 @@ class ProducerSocket : public Socket, // by the application thread std::atomic content_object_expiry_time_; + utils::CircularFifo, 2048> + object_queue_for_callbacks_; + // buffers // ContentStore is thread-safe utils::ContentStore output_buffer_; @@ -1027,36 +1038,45 @@ class ProducerSocket : public Socket, portal_->runEventsLoop(); } - void passContentObjectToCallbacks( - const std::shared_ptr &content_object) { - if (content_object) { - io_service_.dispatch([this, content_object]() { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *content_object); - } + void scheduleSendBurst() { + io_service_.post([this]() { + std::shared_ptr co; - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *content_object); - } + for (uint32_t i = 0; i < burst_size; i++) { + if (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, - *content_object); - } - }); + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } - output_buffer_.insert(content_object); + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } - io_service_.dispatch([this, content_object]() { - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *content_object); + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } else { + break; } - }); + } + }); + } - portal_->sendContentObject(*content_object); + void passContentObjectToCallbacks( + const std::shared_ptr &content_object) { + output_buffer_.insert(content_object); + portal_->sendContentObject(*content_object); + object_queue_for_callbacks_.push(std::move(content_object)); + + if (object_queue_for_callbacks_.size() >= burst_size) { + scheduleSendBurst(); } } -}; // namespace implementation +}; } // namespace implementation -- cgit 1.2.3-korg