summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-05-14 20:21:02 +0200
committerMauro Sardara <msardara@cisco.com>2020-05-20 10:45:53 +0200
commit81fb39606b069fbece973995572fa7f90ea1950a (patch)
tree10c1534707c725eb654741e5b4d280a17ef0c0dc
parent67b86555b33c641de14d3c1d0864e571370a71e6 (diff)
[HICN-613] Add io_service to ConsumerSocket constructor.
Change-Id: Ic1952388e1d2b1e7457c71ae8a959d97aa0cd2d6 Signed-off-by: Mauro Sardara <msardara@cisco.com>
-rw-r--r--libtransport/includes/hicn/transport/http/request.h5
-rw-r--r--libtransport/includes/hicn/transport/interfaces/socket_consumer.h21
-rw-r--r--libtransport/includes/hicn/transport/utils/CMakeLists.txt1
-rw-r--r--libtransport/includes/hicn/transport/utils/event_thread.h (renamed from libtransport/src/utils/event_thread.h)4
-rw-r--r--libtransport/includes/hicn/transport/utils/ring_buffer.h1
-rw-r--r--libtransport/src/core/memif_connector.cc4
-rw-r--r--libtransport/src/core/portal.h9
-rw-r--r--libtransport/src/http/request.cc57
-rw-r--r--libtransport/src/http/response.cc14
-rw-r--r--libtransport/src/implementation/socket_consumer.h27
-rw-r--r--libtransport/src/implementation/socket_producer.h20
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.cc6
-rw-r--r--libtransport/src/interfaces/socket_consumer.cc6
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc12
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.h2
-rw-r--r--libtransport/src/protocols/cbr.cc1
-rw-r--r--libtransport/src/protocols/protocol.cc23
-rw-r--r--libtransport/src/protocols/protocol.h7
-rw-r--r--libtransport/src/protocols/raaqm.cc9
-rw-r--r--libtransport/src/protocols/raaqm.h3
-rw-r--r--libtransport/src/protocols/rtc.cc24
-rw-r--r--libtransport/src/utils/CMakeLists.txt1
22 files changed, 184 insertions, 73 deletions
diff --git a/libtransport/includes/hicn/transport/http/request.h b/libtransport/includes/hicn/transport/http/request.h
index 54904d696..b62f5b061 100644
--- a/libtransport/includes/hicn/transport/http/request.h
+++ b/libtransport/includes/hicn/transport/http/request.h
@@ -46,6 +46,11 @@ class HTTPRequest : public HTTPMessage {
std::string getRequestString() const;
+ static std::size_t parseHeaders(const uint8_t *buffer, std::size_t size,
+ HTTPHeaders &headers,
+ std::string &http_version,
+ std::string &method, std::string &url);
+
private:
std::string query_string_, path_, protocol_, locator_, port_;
std::string request_string_;
diff --git a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h
index 73cbb78b0..2447f9b5b 100644
--- a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h
@@ -145,7 +145,7 @@ class ConsumerSocket {
* @param protocol - The transport protocol to use. So far the following
* transport are supported:
* - CBR: Constant bitrate
- * - Raaqm: Based on paper: Optimal multipath congestion control and request
+ * - RAAQM: Based on paper: Optimal multipath congestion control and request
* forwarding in information-centric networks: Protocol design and
* experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks
* 110, 104-117
@@ -154,6 +154,25 @@ class ConsumerSocket {
explicit ConsumerSocket(int protocol);
/**
+ * @brief Create a new consumer socket, passing an io_service to it.
+ * Passing an io_service means that the caller must explicitely call
+ * io_service.run() for the consumer to start. Any call to consume won't be
+ * blocking. This can be used in case we want to share a single thread
+ * among multiple consumer sockets. The caller MUST ensure the provided
+ * io_service will outlive the ConsumerSocket.
+ *
+ * @param protocol - The transport protocol to use. So far the following
+ * transport are supported:
+ * - CBR: Constant bitrate
+ * - RAAQM: Based on paper: Optimal multipath congestion control and request
+ * forwarding in information-centric networks: Protocol design and
+ * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks
+ * 110, 104-117
+ * - RTC: Real time communication
+ */
+ explicit ConsumerSocket(int protocol, asio::io_service &io_service);
+
+ /**
* @brief Destroy the consumer socket.
*/
~ConsumerSocket();
diff --git a/libtransport/includes/hicn/transport/utils/CMakeLists.txt b/libtransport/includes/hicn/transport/utils/CMakeLists.txt
index 38ecc3d37..f9a98dc69 100644
--- a/libtransport/includes/hicn/transport/utils/CMakeLists.txt
+++ b/libtransport/includes/hicn/transport/utils/CMakeLists.txt
@@ -29,6 +29,7 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/membuf.h
${CMAKE_CURRENT_SOURCE_DIR}/spinlock.h
${CMAKE_CURRENT_SOURCE_DIR}/fixed_block_allocator.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/event_thread.h
)
if(NOT WIN32)
diff --git a/libtransport/src/utils/event_thread.h b/libtransport/includes/hicn/transport/utils/event_thread.h
index e50ae9648..db1194821 100644
--- a/libtransport/src/utils/event_thread.h
+++ b/libtransport/includes/hicn/transport/utils/event_thread.h
@@ -15,10 +15,12 @@
#pragma once
+#include <hicn/transport/config.h>
#include <hicn/transport/errors/runtime_exception.h>
-#include <memory>
#include <asio.hpp>
+#include <memory>
+#include <thread>
namespace utils {
diff --git a/libtransport/includes/hicn/transport/utils/ring_buffer.h b/libtransport/includes/hicn/transport/utils/ring_buffer.h
index 9babe56bd..52629b82b 100644
--- a/libtransport/includes/hicn/transport/utils/ring_buffer.h
+++ b/libtransport/includes/hicn/transport/utils/ring_buffer.h
@@ -17,6 +17,7 @@
#include <atomic>
#include <cstddef>
+#include <utility>
namespace utils {
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
index 49f262ec8..553aab42a 100644
--- a/libtransport/src/core/memif_connector.cc
+++ b/libtransport/src/core/memif_connector.cc
@@ -13,13 +13,13 @@
* limitations under the License.
*/
-#include <hicn/transport/errors/not_implemented_exception.h>
-
#include <core/memif_connector.h>
+#include <hicn/transport/errors/not_implemented_exception.h>
#ifdef __vpp__
#include <sys/epoll.h>
+
#include <cstdlib>
extern "C" {
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index 05715543a..364a36577 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -15,6 +15,9 @@
#pragma once
+#include <core/forwarder_interface.h>
+#include <core/pending_interest.h>
+#include <core/udp_socket_connector.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/interest.h>
@@ -23,12 +26,8 @@
#include <hicn/transport/errors/errors.h>
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
-#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/fixed_block_allocator.h>
-
-#include <core/forwarder_interface.h>
-#include <core/pending_interest.h>
-#include <core/udp_socket_connector.h>
+#include <hicn/transport/utils/log.h>
#ifdef __vpp__
#include <core/memif_connector.h>
diff --git a/libtransport/src/http/request.cc b/libtransport/src/http/request.cc
index 09f709642..87e499cc6 100644
--- a/libtransport/src/http/request.cc
+++ b/libtransport/src/http/request.cc
@@ -69,6 +69,63 @@ std::string HTTPRequest::getQueryString() const { return query_string_; }
std::string HTTPRequest::getRequestString() const { return request_string_; }
+std::size_t HTTPRequest::parseHeaders(const uint8_t *buffer, std::size_t size,
+ HTTPHeaders &headers,
+ std::string &http_version,
+ std::string &method, std::string &url) {
+ const char *crlf2 = "\r\n\r\n";
+ const char *begin = (const char *)buffer;
+ const char *end = begin + size;
+ const char *begincrlf2 = (const char *)crlf2;
+ const char *endcrlf2 = begincrlf2 + strlen(crlf2);
+ auto it = std::search(begin, end, begincrlf2, endcrlf2);
+
+ if (it != end) {
+ std::stringstream ss;
+ ss.str(std::string(begin, it + 2));
+
+ std::string line;
+ getline(ss, line);
+ std::istringstream line_s(line);
+ std::string _http_version;
+
+ line_s >> method;
+ line_s >> url;
+ line_s >> _http_version;
+ std::size_t separator;
+ if ((separator = _http_version.find('/')) != std::string::npos) {
+ if (_http_version.substr(0, separator) != "HTTP") {
+ return 0;
+ }
+ http_version =
+ line.substr(separator + 1, _http_version.length() - separator - 1);
+ } else {
+ return 0;
+ }
+
+ std::size_t param_end;
+ std::size_t value_start;
+ while (getline(ss, line)) {
+ if ((param_end = line.find(':')) != std::string::npos) {
+ value_start = param_end + 1;
+ if ((value_start) < line.size()) {
+ if (line[value_start] == ' ') {
+ value_start++;
+ }
+ if (value_start < line.size()) {
+ headers[line.substr(0, param_end)] =
+ line.substr(value_start, line.size() - value_start - 1);
+ }
+ }
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ return it + strlen(crlf2) - begin;
+}
+
} // namespace http
} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/http/response.cc b/libtransport/src/http/response.cc
index 409992835..79550898b 100644
--- a/libtransport/src/http/response.cc
+++ b/libtransport/src/http/response.cc
@@ -17,9 +17,8 @@
#include <hicn/transport/http/response.h>
#include <algorithm>
-#include <functional>
-
#include <cstring>
+#include <functional>
namespace transport {
@@ -67,7 +66,7 @@ std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size,
auto it = std::search(begin, end, begincrlf2, endcrlf2);
if (it != end) {
std::stringstream ss;
- ss.str(std::string(begin, it));
+ ss.str(std::string(begin, it + 2));
std::string line;
getline(ss, line);
@@ -86,15 +85,8 @@ std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size,
return 0;
}
- std::string _status_string;
-
line_s >> status_code;
- line_s >> _status_string;
-
- auto _it = std::search(line.begin(), line.end(), status_string.begin(),
- status_string.end());
-
- status_string = std::string(_it, line.end() - 1);
+ line_s >> status_string;
std::size_t param_end;
std::size_t value_start;
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 488f238ba..175678209 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -17,12 +17,11 @@
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/security/verifier.h>
-
+#include <hicn/transport/utils/event_thread.h>
#include <protocols/cbr.h>
#include <protocols/protocol.h>
#include <protocols/raaqm.h>
#include <protocols/rtc.h>
-#include <utils/event_thread.h>
namespace transport {
namespace implementation {
@@ -32,10 +31,11 @@ using namespace interface;
using ReadCallback = interface::ConsumerSocket::ReadCallback;
class ConsumerSocket : public Socket<BasePortal> {
- public:
- ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
+ private:
+ ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
+ std::shared_ptr<Portal> &&portal)
: consumer_interface_(consumer),
- portal_(std::make_shared<Portal>()),
+ portal_(portal),
async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
@@ -83,6 +83,17 @@ class ConsumerSocket : public Socket<BasePortal> {
}
}
+ public:
+ ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
+ : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {}
+
+ ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
+ asio::io_service &io_service)
+ : ConsumerSocket(consumer, protocol,
+ std::make_shared<Portal>(io_service)) {
+ is_async_ = true;
+ }
+
~ConsumerSocket() {
stop();
async_downloader_.stop();
@@ -110,7 +121,7 @@ class ConsumerSocket : public Socket<BasePortal> {
transport_protocol_->start();
- return CONSUMER_FINISHED;
+ return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED;
}
virtual int asyncConsume(const Name &name) {
@@ -632,6 +643,10 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = key_content_;
break;
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index 8c5c453fc..a6f0f969e 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -16,11 +16,9 @@
#pragma once
#include <hicn/transport/security/signer.h>
-
+#include <hicn/transport/utils/event_thread.h>
#include <implementation/socket.h>
-
#include <utils/content_store.h>
-#include <utils/event_thread.h>
#include <utils/suffix_strategy.h>
#include <atomic>
@@ -206,12 +204,15 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+ TRANSPORT_LOGD("Send manifest %s",
+ manifest->getName().toString().c_str());
// Send content objects stored in the queue
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
+ TRANSPORT_LOGD(
+ "Send content %s",
+ content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
@@ -268,7 +269,8 @@ class ProducerSocket : public Socket<BasePortal>,
signer->sign(*content_object);
}
passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str());
+ TRANSPORT_LOGD("Send content %s",
+ content_object->getName().toString().c_str());
}
}
@@ -283,11 +285,13 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+ TRANSPORT_LOGD("Send manifest %s",
+ manifest->getName().toString().c_str());
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
+ TRANSPORT_LOGD("Send content %s",
+ content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
}
diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc
index 7cf653848..1be6f41a7 100644
--- a/libtransport/src/implementation/tls_socket_consumer.cc
+++ b/libtransport/src/implementation/tls_socket_consumer.cc
@@ -14,7 +14,6 @@
*/
#include <implementation/tls_socket_consumer.h>
-
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/tls1.h>
@@ -304,10 +303,7 @@ int TLSConsumerSocket::asyncConsume(const Name &name) {
}
if (!async_downloader_tls_.stopped()) {
- async_downloader_tls_.add([this, name]() {
- is_async_ = true;
- download_content(name);
- });
+ async_downloader_tls_.add([this, name]() { download_content(name); });
}
return CONSUMER_RUNNING;
diff --git a/libtransport/src/interfaces/socket_consumer.cc b/libtransport/src/interfaces/socket_consumer.cc
index b4be16ade..ea0606347 100644
--- a/libtransport/src/interfaces/socket_consumer.cc
+++ b/libtransport/src/interfaces/socket_consumer.cc
@@ -14,7 +14,6 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/socket_consumer.h>
namespace transport {
@@ -24,6 +23,11 @@ ConsumerSocket::ConsumerSocket(int protocol) {
socket_ = std::make_unique<implementation::ConsumerSocket>(this, protocol);
}
+ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) {
+ socket_ = std::make_unique<implementation::ConsumerSocket>(this, protocol,
+ io_service);
+}
+
ConsumerSocket::ConsumerSocket() {}
ConsumerSocket::~ConsumerSocket() { socket_->stop(); }
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index 12631637e..6662bec3f 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -16,7 +16,6 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/utils/array.h>
#include <hicn/transport/utils/membuf.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/byte_stream_reassembly.h>
#include <protocols/errors.h>
@@ -67,7 +66,9 @@ void ByteStreamReassembly::assembleContent() {
while (it != received_packets_.end()) {
// Check if valid packet
if (it->second) {
- copyContent(*it->second);
+ if (TRANSPORT_EXPECT_FALSE(copyContent(*it->second))) {
+ return;
+ }
}
received_packets_.erase(it);
@@ -80,7 +81,9 @@ void ByteStreamReassembly::assembleContent() {
}
}
-void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+bool ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+ bool ret = false;
+
auto payload = content_object.getPayloadReference();
auto payload_length = payload.second;
auto write_size = std::min(payload_length, read_buffer_->tailroom());
@@ -102,10 +105,13 @@ void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
if (TRANSPORT_EXPECT_FALSE(download_complete_)) {
+ ret = download_complete_;
notifyApplication();
transport_protocol_->onContentReassembled(
make_error_code(protocol_error::success));
}
+
+ return ret;
}
void ByteStreamReassembly::reInitialize() {
diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h
index 5e5c9ec6b..e4f62b3a8 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.h
+++ b/libtransport/src/protocols/byte_stream_reassembly.h
@@ -32,7 +32,7 @@ class ByteStreamReassembly : public Reassembly {
virtual void reassemble(
std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
- virtual void copyContent(const core::ContentObject &content_object);
+ bool copyContent(const core::ContentObject &content_object);
virtual void reInitialize() override;
diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc
index 5df55bd5c..0bffd7d18 100644
--- a/libtransport/src/protocols/cbr.cc
+++ b/libtransport/src/protocols/cbr.cc
@@ -14,7 +14,6 @@
*/
#include <implementation/socket_consumer.h>
-
#include <protocols/cbr.h>
namespace transport {
diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc
index 8463f84f9..d1bd566a0 100644
--- a/libtransport/src/protocols/protocol.cc
+++ b/libtransport/src/protocols/protocol.cc
@@ -14,7 +14,6 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/protocol.h>
@@ -74,6 +73,7 @@ int TransportProtocol::start() {
&verification_failed_callback_);
socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
&on_payload_);
+ socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
// Schedule next interests
scheduleNextInterests();
@@ -83,18 +83,25 @@ int TransportProtocol::start() {
// Set the protocol as running
is_running_ = true;
- // Start Event loop
- portal_->runEventsLoop();
+ if (!is_async_) {
+ // Start Event loop
+ portal_->runEventsLoop();
- // Not running anymore
- is_running_ = false;
+ // Not running anymore
+ is_running_ = false;
+ }
return 0;
}
void TransportProtocol::stop() {
is_running_ = false;
- portal_->stopEventsLoop();
+
+ if (!is_async_) {
+ portal_->stopEventsLoop();
+ } else {
+ portal_->clear();
+ }
}
void TransportProtocol::resume() {
@@ -110,6 +117,8 @@ void TransportProtocol::resume() {
}
void TransportProtocol::onContentReassembled(std::error_code ec) {
+ stop();
+
if (!on_payload_) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
@@ -122,8 +131,6 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
} else {
on_payload_->readError(ec);
}
-
- stop();
}
} // end namespace protocol
diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h
index db4524133..73a0a2c64 100644
--- a/libtransport/src/protocols/protocol.h
+++ b/libtransport/src/protocols/protocol.h
@@ -15,19 +15,18 @@
#pragma once
-#include <atomic>
-
#include <hicn/transport/interfaces/callbacks.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/utils/object_pool.h>
-
#include <implementation/socket.h>
#include <protocols/data_processing_events.h>
#include <protocols/indexer.h>
#include <protocols/packet_manager.h>
#include <protocols/reassembly.h>
+#include <atomic>
+
namespace transport {
namespace protocol {
@@ -107,6 +106,8 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback,
interface::ConsumerContentObjectVerificationFailedCallback
*verification_failed_callback_;
ReadCallback *on_payload_;
+
+ bool is_async_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index f8da69ceb..8f9ccc4f0 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -14,7 +14,6 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/errors.h>
#include <protocols/indexer.h>
@@ -36,7 +35,8 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(
interests_in_flight_(0),
cur_path_(nullptr),
t0_(utils::SteadyClock::now()),
- rate_estimator_(nullptr) {
+ rate_estimator_(nullptr),
+ schedule_interests_(true) {
init();
}
@@ -451,7 +451,9 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::scheduleNextInterests() {
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ bool cancel = (!is_running_ && !is_first_) || !schedule_interests_;
+ if (TRANSPORT_EXPECT_FALSE(cancel)) {
+ schedule_interests_ = true;
return;
}
@@ -522,6 +524,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
rate_estimator_->onDownloadFinished();
TransportProtocol::onContentReassembled(ec);
+ schedule_interests_ = false;
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index ecc466755..fce4194d4 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -16,7 +16,6 @@
#pragma once
#include <hicn/transport/utils/chrono_typedefs.h>
-
#include <protocols/byte_stream_reassembly.h>
#include <protocols/congestion_window_protocol.h>
#include <protocols/protocol.h>
@@ -135,6 +134,8 @@ class RaaqmTransportProtocol : public TransportProtocol,
double drop_lte_;
unsigned int wifi_delay_;
unsigned int lte_delay_;
+
+ bool schedule_interests_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc
index 72abb599a..4fb352623 100644
--- a/libtransport/src/protocols/rtc.cc
+++ b/libtransport/src/protocols/rtc.cc
@@ -13,12 +13,11 @@
* limitations under the License.
*/
-#include <protocols/rtc.h>
-
#include <hicn/transport/interfaces/socket_consumer.h>
#include <implementation/socket_consumer.h>
-
#include <math.h>
+#include <protocols/rtc.h>
+
#include <random>
namespace transport {
@@ -42,11 +41,7 @@ RTCTransportProtocol::RTCTransportProtocol(
reset();
}
-RTCTransportProtocol::~RTCTransportProtocol() {
- if (is_running_) {
- stop();
- }
-}
+RTCTransportProtocol::~RTCTransportProtocol() {}
int RTCTransportProtocol::start() {
if (is_running_) return -1;
@@ -61,17 +56,22 @@ int RTCTransportProtocol::start() {
is_first_ = false;
is_running_ = true;
- portal_->runEventsLoop();
- is_running_ = false;
+
+ if (is_async_) {
+ portal_->runEventsLoop();
+ is_running_ = false;
+ }
return 0;
}
void RTCTransportProtocol::stop() {
if (!is_running_) return;
-
is_running_ = false;
- portal_->stopEventsLoop();
+
+ if (is_async_) {
+ portal_->stopEventsLoop();
+ }
}
void RTCTransportProtocol::resume() {
diff --git a/libtransport/src/utils/CMakeLists.txt b/libtransport/src/utils/CMakeLists.txt
index 88d451b6b..1a23459b5 100644
--- a/libtransport/src/utils/CMakeLists.txt
+++ b/libtransport/src/utils/CMakeLists.txt
@@ -27,7 +27,6 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/min_filter.h
${CMAKE_CURRENT_SOURCE_DIR}/stream_buffer.h
${CMAKE_CURRENT_SOURCE_DIR}/suffix_strategy.h
- ${CMAKE_CURRENT_SOURCE_DIR}/event_thread.h
${CMAKE_CURRENT_SOURCE_DIR}/content_store.h
${CMAKE_CURRENT_SOURCE_DIR}/deadline_timer.h
)