aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/includes/hicn/transport/interfaces/socket_options_keys.h1
-rw-r--r--libtransport/src/implementation/socket_producer.h94
2 files changed, 57 insertions, 38 deletions
diff --git a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
index 7910a4422..0b7a79c3d 100644
--- a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
@@ -94,7 +94,6 @@ typedef enum {
CACHE_HIT = 506,
CACHE_MISS = 508,
NEW_CONTENT_OBJECT = 509,
- CONTENT_OBJECT_SIGN = 513,
CONTENT_OBJECT_READY = 510,
CONTENT_OBJECT_OUTPUT = 511,
CONTENT_PRODUCED = 512,
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index d4a239cf6..cae0ad7c7 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -43,6 +43,8 @@ using namespace interface;
class ProducerSocket : public Socket<BasePortal>,
public BasePortal::ProducerCallback {
+ static constexpr uint32_t burst_size = 256;
+
public:
explicit ProducerSocket(interface::ProducerSocket *producer_socket)
: producer_interface_(producer_socket),
@@ -293,6 +295,27 @@ class ProducerSocket : public Socket<BasePortal>,
}
}
+ io_service_.post([this]() {
+ std::shared_ptr<ContentObject> co;
+ while (object_queue_for_callbacks_.pop(co)) {
+ if (on_new_segment_) {
+ on_new_segment_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_to_sign_) {
+ on_content_object_to_sign_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *co);
+ }
+ }
+ });
+
io_service_.dispatch([this, buffer_size]() {
if (on_content_produced_) {
on_content_produced_(*producer_interface_,
@@ -300,7 +323,6 @@ class ProducerSocket : public Socket<BasePortal>,
}
});
- TRANSPORT_LOGD("--------- END PRODUCE ------------");
return suffix_strategy->getTotalCount();
}
@@ -498,12 +520,6 @@ class ProducerSocket : public Socket<BasePortal>,
break;
}
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_to_sign_ = VOID_HANDLER;
- break;
- }
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
if (socket_option_value == VOID_HANDLER) {
on_content_object_in_output_buffer_ = VOID_HANDLER;
@@ -569,10 +585,6 @@ class ProducerSocket : public Socket<BasePortal>,
on_new_segment_ = socket_option_value;
break;
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- on_content_object_to_sign_ = socket_option_value;
- break;
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
on_content_object_in_output_buffer_ = socket_option_value;
break;
@@ -755,10 +767,6 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_new_segment_;
break;
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- *socket_option_value = &on_content_object_to_sign_;
- break;
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
*socket_option_value = &on_content_object_in_output_buffer_;
break;
@@ -972,6 +980,9 @@ class ProducerSocket : public Socket<BasePortal>,
// by the application thread
std::atomic<uint32_t> content_object_expiry_time_;
+ utils::CircularFifo<std::shared_ptr<ContentObject>, 2048>
+ object_queue_for_callbacks_;
+
// buffers
// ContentStore is thread-safe
utils::ContentStore output_buffer_;
@@ -1027,36 +1038,45 @@ class ProducerSocket : public Socket<BasePortal>,
portal_->runEventsLoop();
}
- void passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object) {
- if (content_object) {
- io_service_.dispatch([this, content_object]() {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *content_object);
- }
+ void scheduleSendBurst() {
+ io_service_.post([this]() {
+ std::shared_ptr<ContentObject> co;
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *content_object);
- }
+ for (uint32_t i = 0; i < burst_size; i++) {
+ if (object_queue_for_callbacks_.pop(co)) {
+ if (on_new_segment_) {
+ on_new_segment_(*producer_interface_, *co);
+ }
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_,
- *content_object);
- }
- });
+ if (on_content_object_to_sign_) {
+ on_content_object_to_sign_(*producer_interface_, *co);
+ }
- output_buffer_.insert(content_object);
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_, *co);
+ }
- io_service_.dispatch([this, content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *content_object);
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *co);
+ }
+ } else {
+ break;
}
- });
+ }
+ });
+ }
- portal_->sendContentObject(*content_object);
+ void passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object) {
+ output_buffer_.insert(content_object);
+ portal_->sendContentObject(*content_object);
+ object_queue_for_callbacks_.push(std::move(content_object));
+
+ if (object_queue_for_callbacks_.size() >= burst_size) {
+ scheduleSendBurst();
}
}
-}; // namespace implementation
+};
} // namespace implementation