diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-05-14 20:21:02 +0200 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2020-05-20 10:45:53 +0200 |
commit | 81fb39606b069fbece973995572fa7f90ea1950a (patch) | |
tree | 10c1534707c725eb654741e5b4d280a17ef0c0dc /libtransport/src | |
parent | 67b86555b33c641de14d3c1d0864e571370a71e6 (diff) |
[HICN-613] Add io_service to ConsumerSocket constructor.
Change-Id: Ic1952388e1d2b1e7457c71ae8a959d97aa0cd2d6
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src')
-rw-r--r-- | libtransport/src/core/memif_connector.cc | 4 | ||||
-rw-r--r-- | libtransport/src/core/portal.h | 9 | ||||
-rw-r--r-- | libtransport/src/http/request.cc | 57 | ||||
-rw-r--r-- | libtransport/src/http/response.cc | 14 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 27 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 20 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_consumer.cc | 6 | ||||
-rw-r--r-- | libtransport/src/interfaces/socket_consumer.cc | 6 | ||||
-rw-r--r-- | libtransport/src/protocols/byte_stream_reassembly.cc | 12 | ||||
-rw-r--r-- | libtransport/src/protocols/byte_stream_reassembly.h | 2 | ||||
-rw-r--r-- | libtransport/src/protocols/cbr.cc | 1 | ||||
-rw-r--r-- | libtransport/src/protocols/protocol.cc | 23 | ||||
-rw-r--r-- | libtransport/src/protocols/protocol.h | 7 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm.cc | 9 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm.h | 3 | ||||
-rw-r--r-- | libtransport/src/protocols/rtc.cc | 24 | ||||
-rw-r--r-- | libtransport/src/utils/CMakeLists.txt | 1 | ||||
-rw-r--r-- | libtransport/src/utils/event_thread.h | 99 |
18 files changed, 154 insertions, 170 deletions
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc index 49f262ec8..553aab42a 100644 --- a/libtransport/src/core/memif_connector.cc +++ b/libtransport/src/core/memif_connector.cc @@ -13,13 +13,13 @@ * limitations under the License. */ -#include <hicn/transport/errors/not_implemented_exception.h> - #include <core/memif_connector.h> +#include <hicn/transport/errors/not_implemented_exception.h> #ifdef __vpp__ #include <sys/epoll.h> + #include <cstdlib> extern "C" { diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index 05715543a..364a36577 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -15,6 +15,9 @@ #pragma once +#include <core/forwarder_interface.h> +#include <core/pending_interest.h> +#include <core/udp_socket_connector.h> #include <hicn/transport/config.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/interest.h> @@ -23,12 +26,8 @@ #include <hicn/transport/errors/errors.h> #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> -#include <hicn/transport/utils/log.h> #include <hicn/transport/utils/fixed_block_allocator.h> - -#include <core/forwarder_interface.h> -#include <core/pending_interest.h> -#include <core/udp_socket_connector.h> +#include <hicn/transport/utils/log.h> #ifdef __vpp__ #include <core/memif_connector.h> diff --git a/libtransport/src/http/request.cc b/libtransport/src/http/request.cc index 09f709642..87e499cc6 100644 --- a/libtransport/src/http/request.cc +++ b/libtransport/src/http/request.cc @@ -69,6 +69,63 @@ std::string HTTPRequest::getQueryString() const { return query_string_; } std::string HTTPRequest::getRequestString() const { return request_string_; } +std::size_t HTTPRequest::parseHeaders(const uint8_t *buffer, std::size_t size, + HTTPHeaders &headers, + std::string &http_version, + std::string &method, std::string &url) { + const char *crlf2 = "\r\n\r\n"; + const char *begin = (const char *)buffer; + const char *end = begin + size; + const char *begincrlf2 = (const char *)crlf2; + const char *endcrlf2 = begincrlf2 + strlen(crlf2); + auto it = std::search(begin, end, begincrlf2, endcrlf2); + + if (it != end) { + std::stringstream ss; + ss.str(std::string(begin, it + 2)); + + std::string line; + getline(ss, line); + std::istringstream line_s(line); + std::string _http_version; + + line_s >> method; + line_s >> url; + line_s >> _http_version; + std::size_t separator; + if ((separator = _http_version.find('/')) != std::string::npos) { + if (_http_version.substr(0, separator) != "HTTP") { + return 0; + } + http_version = + line.substr(separator + 1, _http_version.length() - separator - 1); + } else { + return 0; + } + + std::size_t param_end; + std::size_t value_start; + while (getline(ss, line)) { + if ((param_end = line.find(':')) != std::string::npos) { + value_start = param_end + 1; + if ((value_start) < line.size()) { + if (line[value_start] == ' ') { + value_start++; + } + if (value_start < line.size()) { + headers[line.substr(0, param_end)] = + line.substr(value_start, line.size() - value_start - 1); + } + } + } else { + return 0; + } + } + } + + return it + strlen(crlf2) - begin; +} + } // namespace http } // namespace transport
\ No newline at end of file diff --git a/libtransport/src/http/response.cc b/libtransport/src/http/response.cc index 409992835..79550898b 100644 --- a/libtransport/src/http/response.cc +++ b/libtransport/src/http/response.cc @@ -17,9 +17,8 @@ #include <hicn/transport/http/response.h> #include <algorithm> -#include <functional> - #include <cstring> +#include <functional> namespace transport { @@ -67,7 +66,7 @@ std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size, auto it = std::search(begin, end, begincrlf2, endcrlf2); if (it != end) { std::stringstream ss; - ss.str(std::string(begin, it)); + ss.str(std::string(begin, it + 2)); std::string line; getline(ss, line); @@ -86,15 +85,8 @@ std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size, return 0; } - std::string _status_string; - line_s >> status_code; - line_s >> _status_string; - - auto _it = std::search(line.begin(), line.end(), status_string.begin(), - status_string.end()); - - status_string = std::string(_it, line.end() - 1); + line_s >> status_string; std::size_t param_end; std::size_t value_start; diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 488f238ba..175678209 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -17,12 +17,11 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/security/verifier.h> - +#include <hicn/transport/utils/event_thread.h> #include <protocols/cbr.h> #include <protocols/protocol.h> #include <protocols/raaqm.h> #include <protocols/rtc.h> -#include <utils/event_thread.h> namespace transport { namespace implementation { @@ -32,10 +31,11 @@ using namespace interface; using ReadCallback = interface::ConsumerSocket::ReadCallback; class ConsumerSocket : public Socket<BasePortal> { - public: - ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + private: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + std::shared_ptr<Portal> &&portal) : consumer_interface_(consumer), - portal_(std::make_shared<Portal>()), + portal_(portal), async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), @@ -83,6 +83,17 @@ class ConsumerSocket : public Socket<BasePortal> { } } + public: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {} + + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + asio::io_service &io_service) + : ConsumerSocket(consumer, protocol, + std::make_shared<Portal>(io_service)) { + is_async_ = true; + } + ~ConsumerSocket() { stop(); async_downloader_.stop(); @@ -110,7 +121,7 @@ class ConsumerSocket : public Socket<BasePortal> { transport_protocol_->start(); - return CONSUMER_FINISHED; + return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; } virtual int asyncConsume(const Name &name) { @@ -632,6 +643,10 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = key_content_; break; + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; + break; + default: return SOCKET_OPTION_NOT_GET; } diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 8c5c453fc..a6f0f969e 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -16,11 +16,9 @@ #pragma once #include <hicn/transport/security/signer.h> - +#include <hicn/transport/utils/event_thread.h> #include <implementation/socket.h> - #include <utils/content_store.h> -#include <utils/event_thread.h> #include <utils/suffix_strategy.h> #include <atomic> @@ -206,12 +204,15 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); + TRANSPORT_LOGD( + "Send content %s", + content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } @@ -268,7 +269,8 @@ class ProducerSocket : public Socket<BasePortal>, signer->sign(*content_object); } passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str()); + TRANSPORT_LOGD("Send content %s", + content_object->getName().toString().c_str()); } } @@ -283,11 +285,13 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); + TRANSPORT_LOGD("Send content %s", + content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } } diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 7cf653848..1be6f41a7 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -14,7 +14,6 @@ */ #include <implementation/tls_socket_consumer.h> - #include <openssl/bio.h> #include <openssl/ssl.h> #include <openssl/tls1.h> @@ -304,10 +303,7 @@ int TLSConsumerSocket::asyncConsume(const Name &name) { } if (!async_downloader_tls_.stopped()) { - async_downloader_tls_.add([this, name]() { - is_async_ = true; - download_content(name); - }); + async_downloader_tls_.add([this, name]() { download_content(name); }); } return CONSUMER_RUNNING; diff --git a/libtransport/src/interfaces/socket_consumer.cc b/libtransport/src/interfaces/socket_consumer.cc index b4be16ade..ea0606347 100644 --- a/libtransport/src/interfaces/socket_consumer.cc +++ b/libtransport/src/interfaces/socket_consumer.cc @@ -14,7 +14,6 @@ */ #include <hicn/transport/interfaces/socket_consumer.h> - #include <implementation/socket_consumer.h> namespace transport { @@ -24,6 +23,11 @@ ConsumerSocket::ConsumerSocket(int protocol) { socket_ = std::make_unique<implementation::ConsumerSocket>(this, protocol); } +ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) { + socket_ = std::make_unique<implementation::ConsumerSocket>(this, protocol, + io_service); +} + ConsumerSocket::ConsumerSocket() {} ConsumerSocket::~ConsumerSocket() { socket_->stop(); } diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index 12631637e..6662bec3f 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -16,7 +16,6 @@ #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/utils/array.h> #include <hicn/transport/utils/membuf.h> - #include <implementation/socket_consumer.h> #include <protocols/byte_stream_reassembly.h> #include <protocols/errors.h> @@ -67,7 +66,9 @@ void ByteStreamReassembly::assembleContent() { while (it != received_packets_.end()) { // Check if valid packet if (it->second) { - copyContent(*it->second); + if (TRANSPORT_EXPECT_FALSE(copyContent(*it->second))) { + return; + } } received_packets_.erase(it); @@ -80,7 +81,9 @@ void ByteStreamReassembly::assembleContent() { } } -void ByteStreamReassembly::copyContent(const ContentObject &content_object) { +bool ByteStreamReassembly::copyContent(const ContentObject &content_object) { + bool ret = false; + auto payload = content_object.getPayloadReference(); auto payload_length = payload.second; auto write_size = std::min(payload_length, read_buffer_->tailroom()); @@ -102,10 +105,13 @@ void ByteStreamReassembly::copyContent(const ContentObject &content_object) { index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); if (TRANSPORT_EXPECT_FALSE(download_complete_)) { + ret = download_complete_; notifyApplication(); transport_protocol_->onContentReassembled( make_error_code(protocol_error::success)); } + + return ret; } void ByteStreamReassembly::reInitialize() { diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h index 5e5c9ec6b..e4f62b3a8 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.h +++ b/libtransport/src/protocols/byte_stream_reassembly.h @@ -32,7 +32,7 @@ class ByteStreamReassembly : public Reassembly { virtual void reassemble( std::unique_ptr<core::ContentObjectManifest> &&manifest) override; - virtual void copyContent(const core::ContentObject &content_object); + bool copyContent(const core::ContentObject &content_object); virtual void reInitialize() override; diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc index 5df55bd5c..0bffd7d18 100644 --- a/libtransport/src/protocols/cbr.cc +++ b/libtransport/src/protocols/cbr.cc @@ -14,7 +14,6 @@ */ #include <implementation/socket_consumer.h> - #include <protocols/cbr.h> namespace transport { diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc index 8463f84f9..d1bd566a0 100644 --- a/libtransport/src/protocols/protocol.cc +++ b/libtransport/src/protocols/protocol.cc @@ -14,7 +14,6 @@ */ #include <hicn/transport/interfaces/socket_consumer.h> - #include <implementation/socket_consumer.h> #include <protocols/protocol.h> @@ -74,6 +73,7 @@ int TransportProtocol::start() { &verification_failed_callback_); socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload_); + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); // Schedule next interests scheduleNextInterests(); @@ -83,18 +83,25 @@ int TransportProtocol::start() { // Set the protocol as running is_running_ = true; - // Start Event loop - portal_->runEventsLoop(); + if (!is_async_) { + // Start Event loop + portal_->runEventsLoop(); - // Not running anymore - is_running_ = false; + // Not running anymore + is_running_ = false; + } return 0; } void TransportProtocol::stop() { is_running_ = false; - portal_->stopEventsLoop(); + + if (!is_async_) { + portal_->stopEventsLoop(); + } else { + portal_->clear(); + } } void TransportProtocol::resume() { @@ -110,6 +117,8 @@ void TransportProtocol::resume() { } void TransportProtocol::onContentReassembled(std::error_code ec) { + stop(); + if (!on_payload_) { throw errors::RuntimeException( "The read callback must be installed in the transport before " @@ -122,8 +131,6 @@ void TransportProtocol::onContentReassembled(std::error_code ec) { } else { on_payload_->readError(ec); } - - stop(); } } // end namespace protocol diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h index db4524133..73a0a2c64 100644 --- a/libtransport/src/protocols/protocol.h +++ b/libtransport/src/protocols/protocol.h @@ -15,19 +15,18 @@ #pragma once -#include <atomic> - #include <hicn/transport/interfaces/callbacks.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/utils/object_pool.h> - #include <implementation/socket.h> #include <protocols/data_processing_events.h> #include <protocols/indexer.h> #include <protocols/packet_manager.h> #include <protocols/reassembly.h> +#include <atomic> + namespace transport { namespace protocol { @@ -107,6 +106,8 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback, interface::ConsumerContentObjectVerificationFailedCallback *verification_failed_callback_; ReadCallback *on_payload_; + + bool is_async_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index f8da69ceb..8f9ccc4f0 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -14,7 +14,6 @@ */ #include <hicn/transport/interfaces/socket_consumer.h> - #include <implementation/socket_consumer.h> #include <protocols/errors.h> #include <protocols/indexer.h> @@ -36,7 +35,8 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( interests_in_flight_(0), cur_path_(nullptr), t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr) { + rate_estimator_(nullptr), + schedule_interests_(true) { init(); } @@ -451,7 +451,9 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + bool cancel = (!is_running_ && !is_first_) || !schedule_interests_; + if (TRANSPORT_EXPECT_FALSE(cancel)) { + schedule_interests_ = true; return; } @@ -522,6 +524,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); + schedule_interests_ = false; } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index ecc466755..fce4194d4 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -16,7 +16,6 @@ #pragma once #include <hicn/transport/utils/chrono_typedefs.h> - #include <protocols/byte_stream_reassembly.h> #include <protocols/congestion_window_protocol.h> #include <protocols/protocol.h> @@ -135,6 +134,8 @@ class RaaqmTransportProtocol : public TransportProtocol, double drop_lte_; unsigned int wifi_delay_; unsigned int lte_delay_; + + bool schedule_interests_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc index 72abb599a..4fb352623 100644 --- a/libtransport/src/protocols/rtc.cc +++ b/libtransport/src/protocols/rtc.cc @@ -13,12 +13,11 @@ * limitations under the License. */ -#include <protocols/rtc.h> - #include <hicn/transport/interfaces/socket_consumer.h> #include <implementation/socket_consumer.h> - #include <math.h> +#include <protocols/rtc.h> + #include <random> namespace transport { @@ -42,11 +41,7 @@ RTCTransportProtocol::RTCTransportProtocol( reset(); } -RTCTransportProtocol::~RTCTransportProtocol() { - if (is_running_) { - stop(); - } -} +RTCTransportProtocol::~RTCTransportProtocol() {} int RTCTransportProtocol::start() { if (is_running_) return -1; @@ -61,17 +56,22 @@ int RTCTransportProtocol::start() { is_first_ = false; is_running_ = true; - portal_->runEventsLoop(); - is_running_ = false; + + if (is_async_) { + portal_->runEventsLoop(); + is_running_ = false; + } return 0; } void RTCTransportProtocol::stop() { if (!is_running_) return; - is_running_ = false; - portal_->stopEventsLoop(); + + if (is_async_) { + portal_->stopEventsLoop(); + } } void RTCTransportProtocol::resume() { diff --git a/libtransport/src/utils/CMakeLists.txt b/libtransport/src/utils/CMakeLists.txt index 88d451b6b..1a23459b5 100644 --- a/libtransport/src/utils/CMakeLists.txt +++ b/libtransport/src/utils/CMakeLists.txt @@ -27,7 +27,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/min_filter.h ${CMAKE_CURRENT_SOURCE_DIR}/stream_buffer.h ${CMAKE_CURRENT_SOURCE_DIR}/suffix_strategy.h - ${CMAKE_CURRENT_SOURCE_DIR}/event_thread.h ${CMAKE_CURRENT_SOURCE_DIR}/content_store.h ${CMAKE_CURRENT_SOURCE_DIR}/deadline_timer.h ) diff --git a/libtransport/src/utils/event_thread.h b/libtransport/src/utils/event_thread.h deleted file mode 100644 index e50ae9648..000000000 --- a/libtransport/src/utils/event_thread.h +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/errors/runtime_exception.h> -#include <memory> - -#include <asio.hpp> - -namespace utils { - -class EventThread { - private: - // No copies - EventThread(const EventThread&) = delete; // non construction-copyable - EventThread& operator=(const EventThread&) = delete; // non copyable - - public: - explicit EventThread(asio::io_service& io_service) - : internal_io_service_(nullptr), - io_service_(io_service), - work_(io_service_), - thread_(nullptr) { - run(); - } - - explicit EventThread() - : internal_io_service_(std::make_unique<asio::io_service>()), - io_service_(*internal_io_service_), - work_(io_service_), - thread_(nullptr) { - run(); - } - - ~EventThread() { stop(); } - - void run() { - if (stopped()) { - io_service_.reset(); - } - - thread_ = std::make_unique<std::thread>([this]() { io_service_.run(); }); - } - - std::thread::id getThreadId() const { - if (thread_) { - return thread_->get_id(); - } else { - throw errors::RuntimeException("Event thread is not running."); - } - } - - template <typename Func> - void add(Func&& f) { - // If the function f - // TODO USe post in mac os, asio->post in xenial - io_service_.post(std::forward<Func&&>(f)); - } - - template <typename Func> - void tryRunHandlerNow(Func&& f) { - io_service_.dispatch(std::forward<Func&&>(f)); - } - - void stop() { - io_service_.stop(); - - if (thread_ && thread_->joinable()) { - thread_->join(); - } - - thread_.reset(); - } - - bool stopped() { return io_service_.stopped(); } - - asio::io_service& getIoService() { return io_service_; } - - private: - std::unique_ptr<asio::io_service> internal_io_service_; - asio::io_service& io_service_; - asio::io_service::work work_; - std::unique_ptr<std::thread> thread_; -}; - -} // namespace utils
\ No newline at end of file |