aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r--libtransport/src/implementation/socket_consumer.h23
-rw-r--r--libtransport/src/implementation/socket_producer.h94
2 files changed, 61 insertions, 56 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 2fc8d2b48..488f238ba 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -35,7 +35,7 @@ class ConsumerSocket : public Socket<BasePortal> {
public:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
: consumer_interface_(consumer),
- portal_(std::make_shared<Portal>(io_service_)),
+ portal_(std::make_shared<Portal>()),
async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
@@ -62,10 +62,8 @@ class ConsumerSocket : public Socket<BasePortal> {
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
on_content_object_verification_(VOID_HANDLER),
- on_content_object_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
read_callback_(nullptr),
- virtual_download_(false),
timer_interval_milliseconds_(0),
guard_raaqm_params_() {
switch (protocol) {
@@ -323,11 +321,6 @@ class ConsumerSocket : public Socket<BasePortal> {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
- case OtherOptions::VIRTUAL_DOWNLOAD:
- virtual_download_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
verify_signature_ = socket_option_value;
result = SOCKET_OPTION_SET;
@@ -631,10 +624,6 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = transport_protocol_->isRunning();
break;
- case OtherOptions::VIRTUAL_DOWNLOAD:
- socket_option_value = virtual_download_;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
socket_option_value = verify_signature_;
break;
@@ -861,8 +850,9 @@ class ConsumerSocket : public Socket<BasePortal> {
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -881,7 +871,6 @@ class ConsumerSocket : public Socket<BasePortal> {
protected:
interface::ConsumerSocket *consumer_interface_;
- asio::io_service io_service_;
std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
@@ -925,15 +914,11 @@ class ConsumerSocket : public Socket<BasePortal> {
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
ConsumerContentObjectVerificationCallback on_content_object_verification_;
- ConsumerContentObjectCallback on_content_object_;
ConsumerTimerCallback stats_summary_;
ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
ReadCallback *read_callback_;
- // Virtual download for traffic generator
- bool virtual_download_;
-
uint32_t timer_interval_milliseconds_;
// Transport protocol
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index d4a239cf6..cae0ad7c7 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -43,6 +43,8 @@ using namespace interface;
class ProducerSocket : public Socket<BasePortal>,
public BasePortal::ProducerCallback {
+ static constexpr uint32_t burst_size = 256;
+
public:
explicit ProducerSocket(interface::ProducerSocket *producer_socket)
: producer_interface_(producer_socket),
@@ -293,6 +295,27 @@ class ProducerSocket : public Socket<BasePortal>,
}
}
+ io_service_.post([this]() {
+ std::shared_ptr<ContentObject> co;
+ while (object_queue_for_callbacks_.pop(co)) {
+ if (on_new_segment_) {
+ on_new_segment_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_to_sign_) {
+ on_content_object_to_sign_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *co);
+ }
+ }
+ });
+
io_service_.dispatch([this, buffer_size]() {
if (on_content_produced_) {
on_content_produced_(*producer_interface_,
@@ -300,7 +323,6 @@ class ProducerSocket : public Socket<BasePortal>,
}
});
- TRANSPORT_LOGD("--------- END PRODUCE ------------");
return suffix_strategy->getTotalCount();
}
@@ -498,12 +520,6 @@ class ProducerSocket : public Socket<BasePortal>,
break;
}
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_to_sign_ = VOID_HANDLER;
- break;
- }
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
if (socket_option_value == VOID_HANDLER) {
on_content_object_in_output_buffer_ = VOID_HANDLER;
@@ -569,10 +585,6 @@ class ProducerSocket : public Socket<BasePortal>,
on_new_segment_ = socket_option_value;
break;
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- on_content_object_to_sign_ = socket_option_value;
- break;
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
on_content_object_in_output_buffer_ = socket_option_value;
break;
@@ -755,10 +767,6 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_new_segment_;
break;
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- *socket_option_value = &on_content_object_to_sign_;
- break;
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
*socket_option_value = &on_content_object_in_output_buffer_;
break;
@@ -972,6 +980,9 @@ class ProducerSocket : public Socket<BasePortal>,
// by the application thread
std::atomic<uint32_t> content_object_expiry_time_;
+ utils::CircularFifo<std::shared_ptr<ContentObject>, 2048>
+ object_queue_for_callbacks_;
+
// buffers
// ContentStore is thread-safe
utils::ContentStore output_buffer_;
@@ -1027,36 +1038,45 @@ class ProducerSocket : public Socket<BasePortal>,
portal_->runEventsLoop();
}
- void passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object) {
- if (content_object) {
- io_service_.dispatch([this, content_object]() {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *content_object);
- }
+ void scheduleSendBurst() {
+ io_service_.post([this]() {
+ std::shared_ptr<ContentObject> co;
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *content_object);
- }
+ for (uint32_t i = 0; i < burst_size; i++) {
+ if (object_queue_for_callbacks_.pop(co)) {
+ if (on_new_segment_) {
+ on_new_segment_(*producer_interface_, *co);
+ }
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_,
- *content_object);
- }
- });
+ if (on_content_object_to_sign_) {
+ on_content_object_to_sign_(*producer_interface_, *co);
+ }
- output_buffer_.insert(content_object);
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_, *co);
+ }
- io_service_.dispatch([this, content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *content_object);
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *co);
+ }
+ } else {
+ break;
}
- });
+ }
+ });
+ }
- portal_->sendContentObject(*content_object);
+ void passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object) {
+ output_buffer_.insert(content_object);
+ portal_->sendContentObject(*content_object);
+ object_queue_for_callbacks_.push(std::move(content_object));
+
+ if (object_queue_for_callbacks_.size() >= burst_size) {
+ scheduleSendBurst();
}
}
-}; // namespace implementation
+};
} // namespace implementation