aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-05-14 20:21:02 +0200
committerMauro Sardara <msardara@cisco.com>2020-05-20 10:45:53 +0200
commit81fb39606b069fbece973995572fa7f90ea1950a (patch)
tree10c1534707c725eb654741e5b4d280a17ef0c0dc /libtransport/src/protocols
parent67b86555b33c641de14d3c1d0864e571370a71e6 (diff)
[HICN-613] Add io_service to ConsumerSocket constructor.
Change-Id: Ic1952388e1d2b1e7457c71ae8a959d97aa0cd2d6 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/protocols')
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc12
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.h2
-rw-r--r--libtransport/src/protocols/cbr.cc1
-rw-r--r--libtransport/src/protocols/protocol.cc23
-rw-r--r--libtransport/src/protocols/protocol.h7
-rw-r--r--libtransport/src/protocols/raaqm.cc9
-rw-r--r--libtransport/src/protocols/raaqm.h3
-rw-r--r--libtransport/src/protocols/rtc.cc24
8 files changed, 49 insertions, 32 deletions
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index 12631637e..6662bec3f 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -16,7 +16,6 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/utils/array.h>
#include <hicn/transport/utils/membuf.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/byte_stream_reassembly.h>
#include <protocols/errors.h>
@@ -67,7 +66,9 @@ void ByteStreamReassembly::assembleContent() {
while (it != received_packets_.end()) {
// Check if valid packet
if (it->second) {
- copyContent(*it->second);
+ if (TRANSPORT_EXPECT_FALSE(copyContent(*it->second))) {
+ return;
+ }
}
received_packets_.erase(it);
@@ -80,7 +81,9 @@ void ByteStreamReassembly::assembleContent() {
}
}
-void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+bool ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+ bool ret = false;
+
auto payload = content_object.getPayloadReference();
auto payload_length = payload.second;
auto write_size = std::min(payload_length, read_buffer_->tailroom());
@@ -102,10 +105,13 @@ void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
if (TRANSPORT_EXPECT_FALSE(download_complete_)) {
+ ret = download_complete_;
notifyApplication();
transport_protocol_->onContentReassembled(
make_error_code(protocol_error::success));
}
+
+ return ret;
}
void ByteStreamReassembly::reInitialize() {
diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h
index 5e5c9ec6b..e4f62b3a8 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.h
+++ b/libtransport/src/protocols/byte_stream_reassembly.h
@@ -32,7 +32,7 @@ class ByteStreamReassembly : public Reassembly {
virtual void reassemble(
std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
- virtual void copyContent(const core::ContentObject &content_object);
+ bool copyContent(const core::ContentObject &content_object);
virtual void reInitialize() override;
diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc
index 5df55bd5c..0bffd7d18 100644
--- a/libtransport/src/protocols/cbr.cc
+++ b/libtransport/src/protocols/cbr.cc
@@ -14,7 +14,6 @@
*/
#include <implementation/socket_consumer.h>
-
#include <protocols/cbr.h>
namespace transport {
diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc
index 8463f84f9..d1bd566a0 100644
--- a/libtransport/src/protocols/protocol.cc
+++ b/libtransport/src/protocols/protocol.cc
@@ -14,7 +14,6 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/protocol.h>
@@ -74,6 +73,7 @@ int TransportProtocol::start() {
&verification_failed_callback_);
socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
&on_payload_);
+ socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
// Schedule next interests
scheduleNextInterests();
@@ -83,18 +83,25 @@ int TransportProtocol::start() {
// Set the protocol as running
is_running_ = true;
- // Start Event loop
- portal_->runEventsLoop();
+ if (!is_async_) {
+ // Start Event loop
+ portal_->runEventsLoop();
- // Not running anymore
- is_running_ = false;
+ // Not running anymore
+ is_running_ = false;
+ }
return 0;
}
void TransportProtocol::stop() {
is_running_ = false;
- portal_->stopEventsLoop();
+
+ if (!is_async_) {
+ portal_->stopEventsLoop();
+ } else {
+ portal_->clear();
+ }
}
void TransportProtocol::resume() {
@@ -110,6 +117,8 @@ void TransportProtocol::resume() {
}
void TransportProtocol::onContentReassembled(std::error_code ec) {
+ stop();
+
if (!on_payload_) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
@@ -122,8 +131,6 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
} else {
on_payload_->readError(ec);
}
-
- stop();
}
} // end namespace protocol
diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h
index db4524133..73a0a2c64 100644
--- a/libtransport/src/protocols/protocol.h
+++ b/libtransport/src/protocols/protocol.h
@@ -15,19 +15,18 @@
#pragma once
-#include <atomic>
-
#include <hicn/transport/interfaces/callbacks.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/utils/object_pool.h>
-
#include <implementation/socket.h>
#include <protocols/data_processing_events.h>
#include <protocols/indexer.h>
#include <protocols/packet_manager.h>
#include <protocols/reassembly.h>
+#include <atomic>
+
namespace transport {
namespace protocol {
@@ -107,6 +106,8 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback,
interface::ConsumerContentObjectVerificationFailedCallback
*verification_failed_callback_;
ReadCallback *on_payload_;
+
+ bool is_async_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index f8da69ceb..8f9ccc4f0 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -14,7 +14,6 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/errors.h>
#include <protocols/indexer.h>
@@ -36,7 +35,8 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(
interests_in_flight_(0),
cur_path_(nullptr),
t0_(utils::SteadyClock::now()),
- rate_estimator_(nullptr) {
+ rate_estimator_(nullptr),
+ schedule_interests_(true) {
init();
}
@@ -451,7 +451,9 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::scheduleNextInterests() {
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ bool cancel = (!is_running_ && !is_first_) || !schedule_interests_;
+ if (TRANSPORT_EXPECT_FALSE(cancel)) {
+ schedule_interests_ = true;
return;
}
@@ -522,6 +524,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
rate_estimator_->onDownloadFinished();
TransportProtocol::onContentReassembled(ec);
+ schedule_interests_ = false;
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index ecc466755..fce4194d4 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -16,7 +16,6 @@
#pragma once
#include <hicn/transport/utils/chrono_typedefs.h>
-
#include <protocols/byte_stream_reassembly.h>
#include <protocols/congestion_window_protocol.h>
#include <protocols/protocol.h>
@@ -135,6 +134,8 @@ class RaaqmTransportProtocol : public TransportProtocol,
double drop_lte_;
unsigned int wifi_delay_;
unsigned int lte_delay_;
+
+ bool schedule_interests_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc
index 72abb599a..4fb352623 100644
--- a/libtransport/src/protocols/rtc.cc
+++ b/libtransport/src/protocols/rtc.cc
@@ -13,12 +13,11 @@
* limitations under the License.
*/
-#include <protocols/rtc.h>
-
#include <hicn/transport/interfaces/socket_consumer.h>
#include <implementation/socket_consumer.h>
-
#include <math.h>
+#include <protocols/rtc.h>
+
#include <random>
namespace transport {
@@ -42,11 +41,7 @@ RTCTransportProtocol::RTCTransportProtocol(
reset();
}
-RTCTransportProtocol::~RTCTransportProtocol() {
- if (is_running_) {
- stop();
- }
-}
+RTCTransportProtocol::~RTCTransportProtocol() {}
int RTCTransportProtocol::start() {
if (is_running_) return -1;
@@ -61,17 +56,22 @@ int RTCTransportProtocol::start() {
is_first_ = false;
is_running_ = true;
- portal_->runEventsLoop();
- is_running_ = false;
+
+ if (is_async_) {
+ portal_->runEventsLoop();
+ is_running_ = false;
+ }
return 0;
}
void RTCTransportProtocol::stop() {
if (!is_running_) return;
-
is_running_ = false;
- portal_->stopEventsLoop();
+
+ if (is_async_) {
+ portal_->stopEventsLoop();
+ }
}
void RTCTransportProtocol::resume() {