From 81fb39606b069fbece973995572fa7f90ea1950a Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Thu, 14 May 2020 20:21:02 +0200 Subject: [HICN-613] Add io_service to ConsumerSocket constructor. Change-Id: Ic1952388e1d2b1e7457c71ae8a959d97aa0cd2d6 Signed-off-by: Mauro Sardara --- .../includes/hicn/transport/http/request.h | 5 + .../hicn/transport/interfaces/socket_consumer.h | 21 ++++- .../includes/hicn/transport/utils/CMakeLists.txt | 1 + .../includes/hicn/transport/utils/event_thread.h | 101 +++++++++++++++++++++ .../includes/hicn/transport/utils/ring_buffer.h | 1 + libtransport/src/core/memif_connector.cc | 4 +- libtransport/src/core/portal.h | 9 +- libtransport/src/http/request.cc | 57 ++++++++++++ libtransport/src/http/response.cc | 14 +-- libtransport/src/implementation/socket_consumer.h | 27 ++++-- libtransport/src/implementation/socket_producer.h | 20 ++-- .../src/implementation/tls_socket_consumer.cc | 6 +- libtransport/src/interfaces/socket_consumer.cc | 6 +- .../src/protocols/byte_stream_reassembly.cc | 12 ++- .../src/protocols/byte_stream_reassembly.h | 2 +- libtransport/src/protocols/cbr.cc | 1 - libtransport/src/protocols/protocol.cc | 23 +++-- libtransport/src/protocols/protocol.h | 7 +- libtransport/src/protocols/raaqm.cc | 9 +- libtransport/src/protocols/raaqm.h | 3 +- libtransport/src/protocols/rtc.cc | 24 ++--- libtransport/src/utils/CMakeLists.txt | 1 - libtransport/src/utils/event_thread.h | 99 -------------------- 23 files changed, 282 insertions(+), 171 deletions(-) create mode 100644 libtransport/includes/hicn/transport/utils/event_thread.h delete mode 100644 libtransport/src/utils/event_thread.h diff --git a/libtransport/includes/hicn/transport/http/request.h b/libtransport/includes/hicn/transport/http/request.h index 54904d696..b62f5b061 100644 --- a/libtransport/includes/hicn/transport/http/request.h +++ b/libtransport/includes/hicn/transport/http/request.h @@ -46,6 +46,11 @@ class HTTPRequest : public HTTPMessage { std::string getRequestString() const; + static std::size_t parseHeaders(const uint8_t *buffer, std::size_t size, + HTTPHeaders &headers, + std::string &http_version, + std::string &method, std::string &url); + private: std::string query_string_, path_, protocol_, locator_, port_; std::string request_string_; diff --git a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h index 73cbb78b0..2447f9b5b 100644 --- a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h @@ -145,7 +145,7 @@ class ConsumerSocket { * @param protocol - The transport protocol to use. So far the following * transport are supported: * - CBR: Constant bitrate - * - Raaqm: Based on paper: Optimal multipath congestion control and request + * - RAAQM: Based on paper: Optimal multipath congestion control and request * forwarding in information-centric networks: Protocol design and * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks * 110, 104-117 @@ -153,6 +153,25 @@ class ConsumerSocket { */ explicit ConsumerSocket(int protocol); + /** + * @brief Create a new consumer socket, passing an io_service to it. + * Passing an io_service means that the caller must explicitely call + * io_service.run() for the consumer to start. Any call to consume won't be + * blocking. This can be used in case we want to share a single thread + * among multiple consumer sockets. The caller MUST ensure the provided + * io_service will outlive the ConsumerSocket. + * + * @param protocol - The transport protocol to use. So far the following + * transport are supported: + * - CBR: Constant bitrate + * - RAAQM: Based on paper: Optimal multipath congestion control and request + * forwarding in information-centric networks: Protocol design and + * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks + * 110, 104-117 + * - RTC: Real time communication + */ + explicit ConsumerSocket(int protocol, asio::io_service &io_service); + /** * @brief Destroy the consumer socket. */ diff --git a/libtransport/includes/hicn/transport/utils/CMakeLists.txt b/libtransport/includes/hicn/transport/utils/CMakeLists.txt index 38ecc3d37..f9a98dc69 100644 --- a/libtransport/includes/hicn/transport/utils/CMakeLists.txt +++ b/libtransport/includes/hicn/transport/utils/CMakeLists.txt @@ -29,6 +29,7 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/membuf.h ${CMAKE_CURRENT_SOURCE_DIR}/spinlock.h ${CMAKE_CURRENT_SOURCE_DIR}/fixed_block_allocator.h + ${CMAKE_CURRENT_SOURCE_DIR}/event_thread.h ) if(NOT WIN32) diff --git a/libtransport/includes/hicn/transport/utils/event_thread.h b/libtransport/includes/hicn/transport/utils/event_thread.h new file mode 100644 index 000000000..db1194821 --- /dev/null +++ b/libtransport/includes/hicn/transport/utils/event_thread.h @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace utils { + +class EventThread { + private: + // No copies + EventThread(const EventThread&) = delete; // non construction-copyable + EventThread& operator=(const EventThread&) = delete; // non copyable + + public: + explicit EventThread(asio::io_service& io_service) + : internal_io_service_(nullptr), + io_service_(io_service), + work_(io_service_), + thread_(nullptr) { + run(); + } + + explicit EventThread() + : internal_io_service_(std::make_unique()), + io_service_(*internal_io_service_), + work_(io_service_), + thread_(nullptr) { + run(); + } + + ~EventThread() { stop(); } + + void run() { + if (stopped()) { + io_service_.reset(); + } + + thread_ = std::make_unique([this]() { io_service_.run(); }); + } + + std::thread::id getThreadId() const { + if (thread_) { + return thread_->get_id(); + } else { + throw errors::RuntimeException("Event thread is not running."); + } + } + + template + void add(Func&& f) { + // If the function f + // TODO USe post in mac os, asio->post in xenial + io_service_.post(std::forward(f)); + } + + template + void tryRunHandlerNow(Func&& f) { + io_service_.dispatch(std::forward(f)); + } + + void stop() { + io_service_.stop(); + + if (thread_ && thread_->joinable()) { + thread_->join(); + } + + thread_.reset(); + } + + bool stopped() { return io_service_.stopped(); } + + asio::io_service& getIoService() { return io_service_; } + + private: + std::unique_ptr internal_io_service_; + asio::io_service& io_service_; + asio::io_service::work work_; + std::unique_ptr thread_; +}; + +} // namespace utils \ No newline at end of file diff --git a/libtransport/includes/hicn/transport/utils/ring_buffer.h b/libtransport/includes/hicn/transport/utils/ring_buffer.h index 9babe56bd..52629b82b 100644 --- a/libtransport/includes/hicn/transport/utils/ring_buffer.h +++ b/libtransport/includes/hicn/transport/utils/ring_buffer.h @@ -17,6 +17,7 @@ #include #include +#include namespace utils { diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc index 49f262ec8..553aab42a 100644 --- a/libtransport/src/core/memif_connector.cc +++ b/libtransport/src/core/memif_connector.cc @@ -13,13 +13,13 @@ * limitations under the License. */ -#include - #include +#include #ifdef __vpp__ #include + #include extern "C" { diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index 05715543a..364a36577 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -15,6 +15,9 @@ #pragma once +#include +#include +#include #include #include #include @@ -23,12 +26,8 @@ #include #include #include -#include #include - -#include -#include -#include +#include #ifdef __vpp__ #include diff --git a/libtransport/src/http/request.cc b/libtransport/src/http/request.cc index 09f709642..87e499cc6 100644 --- a/libtransport/src/http/request.cc +++ b/libtransport/src/http/request.cc @@ -69,6 +69,63 @@ std::string HTTPRequest::getQueryString() const { return query_string_; } std::string HTTPRequest::getRequestString() const { return request_string_; } +std::size_t HTTPRequest::parseHeaders(const uint8_t *buffer, std::size_t size, + HTTPHeaders &headers, + std::string &http_version, + std::string &method, std::string &url) { + const char *crlf2 = "\r\n\r\n"; + const char *begin = (const char *)buffer; + const char *end = begin + size; + const char *begincrlf2 = (const char *)crlf2; + const char *endcrlf2 = begincrlf2 + strlen(crlf2); + auto it = std::search(begin, end, begincrlf2, endcrlf2); + + if (it != end) { + std::stringstream ss; + ss.str(std::string(begin, it + 2)); + + std::string line; + getline(ss, line); + std::istringstream line_s(line); + std::string _http_version; + + line_s >> method; + line_s >> url; + line_s >> _http_version; + std::size_t separator; + if ((separator = _http_version.find('/')) != std::string::npos) { + if (_http_version.substr(0, separator) != "HTTP") { + return 0; + } + http_version = + line.substr(separator + 1, _http_version.length() - separator - 1); + } else { + return 0; + } + + std::size_t param_end; + std::size_t value_start; + while (getline(ss, line)) { + if ((param_end = line.find(':')) != std::string::npos) { + value_start = param_end + 1; + if ((value_start) < line.size()) { + if (line[value_start] == ' ') { + value_start++; + } + if (value_start < line.size()) { + headers[line.substr(0, param_end)] = + line.substr(value_start, line.size() - value_start - 1); + } + } + } else { + return 0; + } + } + } + + return it + strlen(crlf2) - begin; +} + } // namespace http } // namespace transport \ No newline at end of file diff --git a/libtransport/src/http/response.cc b/libtransport/src/http/response.cc index 409992835..79550898b 100644 --- a/libtransport/src/http/response.cc +++ b/libtransport/src/http/response.cc @@ -17,9 +17,8 @@ #include #include -#include - #include +#include namespace transport { @@ -67,7 +66,7 @@ std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size, auto it = std::search(begin, end, begincrlf2, endcrlf2); if (it != end) { std::stringstream ss; - ss.str(std::string(begin, it)); + ss.str(std::string(begin, it + 2)); std::string line; getline(ss, line); @@ -86,15 +85,8 @@ std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size, return 0; } - std::string _status_string; - line_s >> status_code; - line_s >> _status_string; - - auto _it = std::search(line.begin(), line.end(), status_string.begin(), - status_string.end()); - - status_string = std::string(_it, line.end() - 1); + line_s >> status_string; std::size_t param_end; std::size_t value_start; diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 488f238ba..175678209 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -17,12 +17,11 @@ #include #include #include - +#include #include #include #include #include -#include namespace transport { namespace implementation { @@ -32,10 +31,11 @@ using namespace interface; using ReadCallback = interface::ConsumerSocket::ReadCallback; class ConsumerSocket : public Socket { - public: - ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + private: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + std::shared_ptr &&portal) : consumer_interface_(consumer), - portal_(std::make_shared()), + portal_(portal), async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), @@ -83,6 +83,17 @@ class ConsumerSocket : public Socket { } } + public: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + : ConsumerSocket(consumer, protocol, std::make_shared()) {} + + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + asio::io_service &io_service) + : ConsumerSocket(consumer, protocol, + std::make_shared(io_service)) { + is_async_ = true; + } + ~ConsumerSocket() { stop(); async_downloader_.stop(); @@ -110,7 +121,7 @@ class ConsumerSocket : public Socket { transport_protocol_->start(); - return CONSUMER_FINISHED; + return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; } virtual int asyncConsume(const Name &name) { @@ -632,6 +643,10 @@ class ConsumerSocket : public Socket { socket_option_value = key_content_; break; + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; + break; + default: return SOCKET_OPTION_NOT_GET; } diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 8c5c453fc..a6f0f969e 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -16,11 +16,9 @@ #pragma once #include - +#include #include - #include -#include #include #include @@ -206,12 +204,15 @@ class ProducerSocket : public Socket, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); + TRANSPORT_LOGD( + "Send content %s", + content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } @@ -268,7 +269,8 @@ class ProducerSocket : public Socket, signer->sign(*content_object); } passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str()); + TRANSPORT_LOGD("Send content %s", + content_object->getName().toString().c_str()); } } @@ -283,11 +285,13 @@ class ProducerSocket : public Socket, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); + TRANSPORT_LOGD("Send content %s", + content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } } diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 7cf653848..1be6f41a7 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -14,7 +14,6 @@ */ #include - #include #include #include @@ -304,10 +303,7 @@ int TLSConsumerSocket::asyncConsume(const Name &name) { } if (!async_downloader_tls_.stopped()) { - async_downloader_tls_.add([this, name]() { - is_async_ = true; - download_content(name); - }); + async_downloader_tls_.add([this, name]() { download_content(name); }); } return CONSUMER_RUNNING; diff --git a/libtransport/src/interfaces/socket_consumer.cc b/libtransport/src/interfaces/socket_consumer.cc index b4be16ade..ea0606347 100644 --- a/libtransport/src/interfaces/socket_consumer.cc +++ b/libtransport/src/interfaces/socket_consumer.cc @@ -14,7 +14,6 @@ */ #include - #include namespace transport { @@ -24,6 +23,11 @@ ConsumerSocket::ConsumerSocket(int protocol) { socket_ = std::make_unique(this, protocol); } +ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) { + socket_ = std::make_unique(this, protocol, + io_service); +} + ConsumerSocket::ConsumerSocket() {} ConsumerSocket::~ConsumerSocket() { socket_->stop(); } diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index 12631637e..6662bec3f 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -16,7 +16,6 @@ #include #include #include - #include #include #include @@ -67,7 +66,9 @@ void ByteStreamReassembly::assembleContent() { while (it != received_packets_.end()) { // Check if valid packet if (it->second) { - copyContent(*it->second); + if (TRANSPORT_EXPECT_FALSE(copyContent(*it->second))) { + return; + } } received_packets_.erase(it); @@ -80,7 +81,9 @@ void ByteStreamReassembly::assembleContent() { } } -void ByteStreamReassembly::copyContent(const ContentObject &content_object) { +bool ByteStreamReassembly::copyContent(const ContentObject &content_object) { + bool ret = false; + auto payload = content_object.getPayloadReference(); auto payload_length = payload.second; auto write_size = std::min(payload_length, read_buffer_->tailroom()); @@ -102,10 +105,13 @@ void ByteStreamReassembly::copyContent(const ContentObject &content_object) { index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); if (TRANSPORT_EXPECT_FALSE(download_complete_)) { + ret = download_complete_; notifyApplication(); transport_protocol_->onContentReassembled( make_error_code(protocol_error::success)); } + + return ret; } void ByteStreamReassembly::reInitialize() { diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h index 5e5c9ec6b..e4f62b3a8 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.h +++ b/libtransport/src/protocols/byte_stream_reassembly.h @@ -32,7 +32,7 @@ class ByteStreamReassembly : public Reassembly { virtual void reassemble( std::unique_ptr &&manifest) override; - virtual void copyContent(const core::ContentObject &content_object); + bool copyContent(const core::ContentObject &content_object); virtual void reInitialize() override; diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc index 5df55bd5c..0bffd7d18 100644 --- a/libtransport/src/protocols/cbr.cc +++ b/libtransport/src/protocols/cbr.cc @@ -14,7 +14,6 @@ */ #include - #include namespace transport { diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc index 8463f84f9..d1bd566a0 100644 --- a/libtransport/src/protocols/protocol.cc +++ b/libtransport/src/protocols/protocol.cc @@ -14,7 +14,6 @@ */ #include - #include #include @@ -74,6 +73,7 @@ int TransportProtocol::start() { &verification_failed_callback_); socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload_); + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); // Schedule next interests scheduleNextInterests(); @@ -83,18 +83,25 @@ int TransportProtocol::start() { // Set the protocol as running is_running_ = true; - // Start Event loop - portal_->runEventsLoop(); + if (!is_async_) { + // Start Event loop + portal_->runEventsLoop(); - // Not running anymore - is_running_ = false; + // Not running anymore + is_running_ = false; + } return 0; } void TransportProtocol::stop() { is_running_ = false; - portal_->stopEventsLoop(); + + if (!is_async_) { + portal_->stopEventsLoop(); + } else { + portal_->clear(); + } } void TransportProtocol::resume() { @@ -110,6 +117,8 @@ void TransportProtocol::resume() { } void TransportProtocol::onContentReassembled(std::error_code ec) { + stop(); + if (!on_payload_) { throw errors::RuntimeException( "The read callback must be installed in the transport before " @@ -122,8 +131,6 @@ void TransportProtocol::onContentReassembled(std::error_code ec) { } else { on_payload_->readError(ec); } - - stop(); } } // end namespace protocol diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h index db4524133..73a0a2c64 100644 --- a/libtransport/src/protocols/protocol.h +++ b/libtransport/src/protocols/protocol.h @@ -15,19 +15,18 @@ #pragma once -#include - #include #include #include #include - #include #include #include #include #include +#include + namespace transport { namespace protocol { @@ -107,6 +106,8 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback, interface::ConsumerContentObjectVerificationFailedCallback *verification_failed_callback_; ReadCallback *on_payload_; + + bool is_async_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index f8da69ceb..8f9ccc4f0 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -14,7 +14,6 @@ */ #include - #include #include #include @@ -36,7 +35,8 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( interests_in_flight_(0), cur_path_(nullptr), t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr) { + rate_estimator_(nullptr), + schedule_interests_(true) { init(); } @@ -451,7 +451,9 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + bool cancel = (!is_running_ && !is_first_) || !schedule_interests_; + if (TRANSPORT_EXPECT_FALSE(cancel)) { + schedule_interests_ = true; return; } @@ -522,6 +524,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); + schedule_interests_ = false; } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index ecc466755..fce4194d4 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -16,7 +16,6 @@ #pragma once #include - #include #include #include @@ -135,6 +134,8 @@ class RaaqmTransportProtocol : public TransportProtocol, double drop_lte_; unsigned int wifi_delay_; unsigned int lte_delay_; + + bool schedule_interests_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc index 72abb599a..4fb352623 100644 --- a/libtransport/src/protocols/rtc.cc +++ b/libtransport/src/protocols/rtc.cc @@ -13,12 +13,11 @@ * limitations under the License. */ -#include - #include #include - #include +#include + #include namespace transport { @@ -42,11 +41,7 @@ RTCTransportProtocol::RTCTransportProtocol( reset(); } -RTCTransportProtocol::~RTCTransportProtocol() { - if (is_running_) { - stop(); - } -} +RTCTransportProtocol::~RTCTransportProtocol() {} int RTCTransportProtocol::start() { if (is_running_) return -1; @@ -61,17 +56,22 @@ int RTCTransportProtocol::start() { is_first_ = false; is_running_ = true; - portal_->runEventsLoop(); - is_running_ = false; + + if (is_async_) { + portal_->runEventsLoop(); + is_running_ = false; + } return 0; } void RTCTransportProtocol::stop() { if (!is_running_) return; - is_running_ = false; - portal_->stopEventsLoop(); + + if (is_async_) { + portal_->stopEventsLoop(); + } } void RTCTransportProtocol::resume() { diff --git a/libtransport/src/utils/CMakeLists.txt b/libtransport/src/utils/CMakeLists.txt index 88d451b6b..1a23459b5 100644 --- a/libtransport/src/utils/CMakeLists.txt +++ b/libtransport/src/utils/CMakeLists.txt @@ -27,7 +27,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/min_filter.h ${CMAKE_CURRENT_SOURCE_DIR}/stream_buffer.h ${CMAKE_CURRENT_SOURCE_DIR}/suffix_strategy.h - ${CMAKE_CURRENT_SOURCE_DIR}/event_thread.h ${CMAKE_CURRENT_SOURCE_DIR}/content_store.h ${CMAKE_CURRENT_SOURCE_DIR}/deadline_timer.h ) diff --git a/libtransport/src/utils/event_thread.h b/libtransport/src/utils/event_thread.h deleted file mode 100644 index e50ae9648..000000000 --- a/libtransport/src/utils/event_thread.h +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include - -namespace utils { - -class EventThread { - private: - // No copies - EventThread(const EventThread&) = delete; // non construction-copyable - EventThread& operator=(const EventThread&) = delete; // non copyable - - public: - explicit EventThread(asio::io_service& io_service) - : internal_io_service_(nullptr), - io_service_(io_service), - work_(io_service_), - thread_(nullptr) { - run(); - } - - explicit EventThread() - : internal_io_service_(std::make_unique()), - io_service_(*internal_io_service_), - work_(io_service_), - thread_(nullptr) { - run(); - } - - ~EventThread() { stop(); } - - void run() { - if (stopped()) { - io_service_.reset(); - } - - thread_ = std::make_unique([this]() { io_service_.run(); }); - } - - std::thread::id getThreadId() const { - if (thread_) { - return thread_->get_id(); - } else { - throw errors::RuntimeException("Event thread is not running."); - } - } - - template - void add(Func&& f) { - // If the function f - // TODO USe post in mac os, asio->post in xenial - io_service_.post(std::forward(f)); - } - - template - void tryRunHandlerNow(Func&& f) { - io_service_.dispatch(std::forward(f)); - } - - void stop() { - io_service_.stop(); - - if (thread_ && thread_->joinable()) { - thread_->join(); - } - - thread_.reset(); - } - - bool stopped() { return io_service_.stopped(); } - - asio::io_service& getIoService() { return io_service_; } - - private: - std::unique_ptr internal_io_service_; - asio::io_service& io_service_; - asio::io_service::work work_; - std::unique_ptr thread_; -}; - -} // namespace utils \ No newline at end of file -- cgit 1.2.3-korg