From 81fb39606b069fbece973995572fa7f90ea1950a Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Thu, 14 May 2020 20:21:02 +0200 Subject: [HICN-613] Add io_service to ConsumerSocket constructor. Change-Id: Ic1952388e1d2b1e7457c71ae8a959d97aa0cd2d6 Signed-off-by: Mauro Sardara --- .../src/protocols/byte_stream_reassembly.cc | 12 ++++++++--- .../src/protocols/byte_stream_reassembly.h | 2 +- libtransport/src/protocols/cbr.cc | 1 - libtransport/src/protocols/protocol.cc | 23 +++++++++++++-------- libtransport/src/protocols/protocol.h | 7 ++++--- libtransport/src/protocols/raaqm.cc | 9 +++++--- libtransport/src/protocols/raaqm.h | 3 ++- libtransport/src/protocols/rtc.cc | 24 +++++++++++----------- 8 files changed, 49 insertions(+), 32 deletions(-) (limited to 'libtransport/src/protocols') diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index 12631637e..6662bec3f 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -16,7 +16,6 @@ #include #include #include - #include #include #include @@ -67,7 +66,9 @@ void ByteStreamReassembly::assembleContent() { while (it != received_packets_.end()) { // Check if valid packet if (it->second) { - copyContent(*it->second); + if (TRANSPORT_EXPECT_FALSE(copyContent(*it->second))) { + return; + } } received_packets_.erase(it); @@ -80,7 +81,9 @@ void ByteStreamReassembly::assembleContent() { } } -void ByteStreamReassembly::copyContent(const ContentObject &content_object) { +bool ByteStreamReassembly::copyContent(const ContentObject &content_object) { + bool ret = false; + auto payload = content_object.getPayloadReference(); auto payload_length = payload.second; auto write_size = std::min(payload_length, read_buffer_->tailroom()); @@ -102,10 +105,13 @@ void ByteStreamReassembly::copyContent(const ContentObject &content_object) { index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); if (TRANSPORT_EXPECT_FALSE(download_complete_)) { + ret = download_complete_; notifyApplication(); transport_protocol_->onContentReassembled( make_error_code(protocol_error::success)); } + + return ret; } void ByteStreamReassembly::reInitialize() { diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h index 5e5c9ec6b..e4f62b3a8 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.h +++ b/libtransport/src/protocols/byte_stream_reassembly.h @@ -32,7 +32,7 @@ class ByteStreamReassembly : public Reassembly { virtual void reassemble( std::unique_ptr &&manifest) override; - virtual void copyContent(const core::ContentObject &content_object); + bool copyContent(const core::ContentObject &content_object); virtual void reInitialize() override; diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc index 5df55bd5c..0bffd7d18 100644 --- a/libtransport/src/protocols/cbr.cc +++ b/libtransport/src/protocols/cbr.cc @@ -14,7 +14,6 @@ */ #include - #include namespace transport { diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc index 8463f84f9..d1bd566a0 100644 --- a/libtransport/src/protocols/protocol.cc +++ b/libtransport/src/protocols/protocol.cc @@ -14,7 +14,6 @@ */ #include - #include #include @@ -74,6 +73,7 @@ int TransportProtocol::start() { &verification_failed_callback_); socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload_); + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); // Schedule next interests scheduleNextInterests(); @@ -83,18 +83,25 @@ int TransportProtocol::start() { // Set the protocol as running is_running_ = true; - // Start Event loop - portal_->runEventsLoop(); + if (!is_async_) { + // Start Event loop + portal_->runEventsLoop(); - // Not running anymore - is_running_ = false; + // Not running anymore + is_running_ = false; + } return 0; } void TransportProtocol::stop() { is_running_ = false; - portal_->stopEventsLoop(); + + if (!is_async_) { + portal_->stopEventsLoop(); + } else { + portal_->clear(); + } } void TransportProtocol::resume() { @@ -110,6 +117,8 @@ void TransportProtocol::resume() { } void TransportProtocol::onContentReassembled(std::error_code ec) { + stop(); + if (!on_payload_) { throw errors::RuntimeException( "The read callback must be installed in the transport before " @@ -122,8 +131,6 @@ void TransportProtocol::onContentReassembled(std::error_code ec) { } else { on_payload_->readError(ec); } - - stop(); } } // end namespace protocol diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h index db4524133..73a0a2c64 100644 --- a/libtransport/src/protocols/protocol.h +++ b/libtransport/src/protocols/protocol.h @@ -15,19 +15,18 @@ #pragma once -#include - #include #include #include #include - #include #include #include #include #include +#include + namespace transport { namespace protocol { @@ -107,6 +106,8 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback, interface::ConsumerContentObjectVerificationFailedCallback *verification_failed_callback_; ReadCallback *on_payload_; + + bool is_async_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index f8da69ceb..8f9ccc4f0 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -14,7 +14,6 @@ */ #include - #include #include #include @@ -36,7 +35,8 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( interests_in_flight_(0), cur_path_(nullptr), t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr) { + rate_estimator_(nullptr), + schedule_interests_(true) { init(); } @@ -451,7 +451,9 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + bool cancel = (!is_running_ && !is_first_) || !schedule_interests_; + if (TRANSPORT_EXPECT_FALSE(cancel)) { + schedule_interests_ = true; return; } @@ -522,6 +524,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); + schedule_interests_ = false; } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index ecc466755..fce4194d4 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -16,7 +16,6 @@ #pragma once #include - #include #include #include @@ -135,6 +134,8 @@ class RaaqmTransportProtocol : public TransportProtocol, double drop_lte_; unsigned int wifi_delay_; unsigned int lte_delay_; + + bool schedule_interests_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc index 72abb599a..4fb352623 100644 --- a/libtransport/src/protocols/rtc.cc +++ b/libtransport/src/protocols/rtc.cc @@ -13,12 +13,11 @@ * limitations under the License. */ -#include - #include #include - #include +#include + #include namespace transport { @@ -42,11 +41,7 @@ RTCTransportProtocol::RTCTransportProtocol( reset(); } -RTCTransportProtocol::~RTCTransportProtocol() { - if (is_running_) { - stop(); - } -} +RTCTransportProtocol::~RTCTransportProtocol() {} int RTCTransportProtocol::start() { if (is_running_) return -1; @@ -61,17 +56,22 @@ int RTCTransportProtocol::start() { is_first_ = false; is_running_ = true; - portal_->runEventsLoop(); - is_running_ = false; + + if (is_async_) { + portal_->runEventsLoop(); + is_running_ = false; + } return 0; } void RTCTransportProtocol::stop() { if (!is_running_) return; - is_running_ = false; - portal_->stopEventsLoop(); + + if (is_async_) { + portal_->stopEventsLoop(); + } } void RTCTransportProtocol::resume() { -- cgit 1.2.3-korg