diff options
Diffstat (limited to 'libtransport')
16 files changed, 155 insertions, 59 deletions
diff --git a/libtransport/cmake/Modules/Packaging.cmake b/libtransport/cmake/Modules/Packaging.cmake index 6dc0eb457..64d1dd325 100755 --- a/libtransport/cmake/Modules/Packaging.cmake +++ b/libtransport/cmake/Modules/Packaging.cmake @@ -32,48 +32,44 @@ set(lib${LIBTRANSPORT}-devel_DESCRIPTION ${lib${LIBTRANSPORT}_DESCRIPTION} if ((BUILD_MEMIF_CONNECTOR OR BUILD_VPP_PLUGIN) AND "${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") set(lib${LIBTRANSPORT}_DEB_DEPENDENCIES - "libhicn (>= 1.0), libparc (>= 1.0), vpp-lib (== 19.01-release)" + "libhicn (>= stable_version), libparc (>= 1.0), vpp-lib (== stable_version-release)" CACHE STRING "Dependencies for deb/rpm package." ) set(lib${LIBTRANSPORT}_RPM_DEPENDENCIES - "libhicn >= 1.0, libparc >= 1.0, vpp-lib = 19.01-release" + "libhicn >= stable_version, libparc >= 1.0, vpp-lib = stable_version-release" CACHE STRING "Dependencies for deb/rpm package." ) set(lib${LIBTRANSPORT}-dev_DEB_DEPENDENCIES - "libtransport (>= 1.0), libasio-dev (>= 1.10), libhicn-dev (>= 1.0), libparc-dev (>= 1.0), vpp-dev (== 19.01-release)" + "lib${LIBTRANSPORT} (>= stable_version), libasio-dev (>= 1.10), libhicn-dev (>= stable_version), libparc-dev (>= 1.0), vpp-dev (== stable_version-release)" CACHE STRING "Dependencies for deb/rpm package." ) - set(lib${LIBTRANSPORT}-devel_RPM_DEPENDENCIES - "libtransport >= 1.0, asio-devel >= 1.10, libhicn-devel >= 1.0, libparc-devel >= 1.0, vpp-devel = 19.01-release" + set(lib${LIBTRANSPORT}-dev_RPM_DEPENDENCIES + "lib${LIBTRANSPORT} >= stable_version, asio-devel >= 1.10, libhicn-devel >= stable_version, libparc-devel >= 1.0, vpp-devel = stable_version-release" CACHE STRING "Dependencies for deb/rpm package." ) else() -<<<<<<< HEAD - -======= ->>>>>>> 691dfde... [HICN-3] First version of packaging system based on cmake. set(lib${LIBTRANSPORT}_DEB_DEPENDENCIES - "libhicn (>= 1.0), libparc (>= 1.0)" + "libhicn (>= stable_version), libparc (>= 1.0)" CACHE STRING "Dependencies for deb/rpm package." ) set(lib${LIBTRANSPORT}_RPM_DEPENDENCIES - "libhicn >= 1.0, libparc >= 1.0" + "libhicn >= stable_version, libparc >= 1.0" CACHE STRING "Dependencies for deb/rpm package." ) set(lib${LIBTRANSPORT}-dev_DEB_DEPENDENCIES - "libtransport (>= 1.0), libasio-dev (>= 1.10), libhicn-dev (>= 1.0), libparc-dev (>= 1.0)" + "lib${LIBTRANSPORT} (>= stable_version), libasio-dev (>= 1.10), libhicn-dev (>= stable_version), libparc-dev (>= 1.0)" CACHE STRING "Dependencies for deb/rpm package." ) - set(lib${LIBTRANSPORT}-devel_RPM_DEPENDENCIES - "libtransport >= 1.0, asio-devel >= 1.10, libhicn-devel >= 1.0, libparc-devel >= 1.0" + set(lib${LIBTRANSPORT}-dev_RPM_DEPENDENCIES + "lib${LIBTRANSPORT} >= stable_version, asio-devel >= 1.10, libhicn-devel >= stable_version, libparc-devel >= 1.0" CACHE STRING "Dependencies for deb/rpm package." ) diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index b3785e5c3..6c5f2ff5f 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -38,7 +38,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, timer_set_(false), send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), io_service_(io_service), - work_(std::make_unique<asio::io_service::work>(io_service_)), packet_counter_(0), memif_connection_({}), tx_buf_counter_(0), @@ -74,6 +73,8 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) { createMemif(memif_id, memif_mode, nullptr); + work_ = std::make_unique<asio::io_service::work>(io_service_); + while (is_connecting_) { MemifConnector::main_event_reactor_.runOneEvent(); } @@ -402,7 +403,7 @@ void MemifConnector::close() { if (!closed_) { closed_ = true; event_reactor_.stop(); - io_service_.stop(); + work_.reset(); if (memif_worker_ && memif_worker_->joinable()) { memif_worker_->join(); diff --git a/libtransport/src/hicn/transport/core/pending_interest.cc b/libtransport/src/hicn/transport/core/pending_interest.cc index 8f6de1839..a2df9ba44 100644 --- a/libtransport/src/hicn/transport/core/pending_interest.cc +++ b/libtransport/src/hicn/transport/core/pending_interest.cc @@ -20,12 +20,28 @@ namespace transport { namespace core { PendingInterest::PendingInterest() - : interest_(nullptr, nullptr), timer_(), received_(false) {} + : interest_(nullptr, nullptr), + timer_(), + on_content_object_callback_(), + on_interest_timeout_callback_(), + received_(false) {} PendingInterest::PendingInterest(Interest::Ptr &&interest, std::unique_ptr<asio::steady_timer> &&timer) : interest_(std::move(interest)), timer_(std::move(timer)), + on_content_object_callback_(), + on_interest_timeout_callback_(), + received_(false) {} + +PendingInterest::PendingInterest(Interest::Ptr &&interest, + const OnContentObjectCallback &&on_content_object, + const OnInterestTimeoutCallback &&on_interest_timeout, + std::unique_ptr<asio::steady_timer> &&timer) + : interest_(std::move(interest)), + timer_(std::move(timer)), + on_content_object_callback_(std::move(on_content_object)), + on_interest_timeout_callback_(std::move(on_interest_timeout)), received_(false) {} PendingInterest::~PendingInterest() { @@ -34,16 +50,28 @@ PendingInterest::~PendingInterest() { void PendingInterest::cancelTimer() { timer_->cancel(); } -bool PendingInterest::isReceived() const { return received_; } - void PendingInterest::setReceived() { received_ = true; } +bool PendingInterest::isReceived() const { return received_; } + Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); } -void PendingInterest::setReceived(bool received) { - PendingInterest::received_ = received; +const OnContentObjectCallback &PendingInterest::getOnDataCallback() const { + return on_content_object_callback_; +} + +void PendingInterest::setOnDataCallback(const OnContentObjectCallback &on_content_object) { + PendingInterest::on_content_object_callback_ = on_content_object; +} + +const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const { + return on_interest_timeout_callback_; +} + +void PendingInterest::setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout) { + PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; } } // end namespace core -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/pending_interest.h b/libtransport/src/hicn/transport/core/pending_interest.h index cbcafb5d9..58b51db79 100644 --- a/libtransport/src/hicn/transport/core/pending_interest.h +++ b/libtransport/src/hicn/transport/core/pending_interest.h @@ -34,6 +34,8 @@ class RawSocketInterface; template <typename ForwarderInt> class Portal; +typedef std::function<void(Interest::Ptr&&, ContentObject::Ptr&&)> OnContentObjectCallback; +typedef std::function<void(Interest::Ptr&&)> OnInterestTimeoutCallback; typedef std::function<void(const std::error_code &)> TimerCallback; class PendingInterest { @@ -47,9 +49,12 @@ class PendingInterest { PendingInterest(Interest::Ptr &&interest, std::unique_ptr<asio::steady_timer> &&timer); - ~PendingInterest(); + PendingInterest(Interest::Ptr &&interest, + const OnContentObjectCallback &&on_content_object, + const OnInterestTimeoutCallback &&on_interest_timeout, + std::unique_ptr<asio::steady_timer> &&timer); - bool isReceived() const; + ~PendingInterest(); template <typename Handler> TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) { @@ -62,17 +67,23 @@ class PendingInterest { void setReceived(); + bool isReceived() const; + Interest::Ptr &&getInterest(); - void setReceived(bool received); + const OnContentObjectCallback &getOnDataCallback() const; + + void setOnDataCallback(const OnContentObjectCallback &on_content_object); - bool isValid() const; + const OnInterestTimeoutCallback &getOnTimeoutCallback() const; - void setValid(bool valid); + void setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout); private: Interest::Ptr interest_; std::unique_ptr<asio::steady_timer> timer_; + OnContentObjectCallback on_content_object_callback_; + OnInterestTimeoutCallback on_interest_timeout_callback_; bool received_; }; diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index a79717037..58406bbde 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -97,13 +97,10 @@ class Portal { virtual void onInterest(Interest::Ptr &&i) = 0; }; - Portal() : Portal(internal_io_service_) { - internal_work_ = std::make_unique<asio::io_service::work>(io_service_); - } + Portal() : Portal(internal_io_service_) {} Portal(asio::io_service &io_service) : io_service_(io_service), - is_running_(false), app_name_("libtransport_application"), consumer_callback_(nullptr), producer_callback_(nullptr), @@ -131,8 +128,7 @@ class Portal { } ~Portal() { - connector_.close(); - stopEventsLoop(); + stopEventsLoop(true); } TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { @@ -157,9 +153,30 @@ class Portal { std::placeholders::_1, name)); } + TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest, + const OnContentObjectCallback &&on_content_object_callback, + const OnInterestTimeoutCallback &&on_interest_timeout_callback) { + + const Name name(interest->getName(), true); + + // Send it + forwarder_interface_.send(*interest); + + pending_interest_hash_table_[name] = std::make_unique<PendingInterest>( + std::move(interest), std::move(on_content_object_callback), + std::move(on_interest_timeout_callback), + std::make_unique<asio::steady_timer>(io_service_)); + + pending_interest_hash_table_[name]->startCountdown( + std::bind(&Portal<ForwarderInt>::timerHandler, + this, std::placeholders::_1, name)); + + } + TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, const Name &name) { - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + bool is_stopped = io_service_.stopped(); + if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } @@ -170,7 +187,9 @@ class Portal { std::unique_ptr<PendingInterest> ptr = std::move(it->second); pending_interest_hash_table_.erase(it); - if (consumer_callback_) { + if(ptr->getOnTimeoutCallback() != UNSET_CALLBACK){ + ptr->on_interest_timeout_callback_(std::move(ptr->getInterest())); + }else if (consumer_callback_) { consumer_callback_->onTimeout(std::move(ptr->getInterest())); } } @@ -189,9 +208,7 @@ class Portal { io_service_.reset(); // ensure that run()/poll() will do some work } - is_running_ = true; this->io_service_.run(); - is_running_ = false; } TRANSPORT_ALWAYS_INLINE void runOneEvent() { @@ -199,9 +216,7 @@ class Portal { io_service_.reset(); // ensure that run()/poll() will do some work } - is_running_ = true; this->io_service_.run_one(); - is_running_ = false; } TRANSPORT_ALWAYS_INLINE void sendContentObject( @@ -209,16 +224,17 @@ class Portal { forwarder_interface_.send(content_object); } - TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { - is_running_ = false; - internal_work_.reset(); - + TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) { for (auto &pend_interest : pending_interest_hash_table_) { pend_interest.second->cancelTimer(); } clear(); + if(kill_connection) { + connector_.close(); + } + io_service_.post([this]() { io_service_.stop(); }); } @@ -242,7 +258,8 @@ class Portal { private: TRANSPORT_ALWAYS_INLINE void processIncomingMessages( Packet::MemBufPtr &&packet_buffer) { - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + bool is_stopped = io_service_.stopped(); + if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } @@ -293,7 +310,11 @@ class Portal { interest_ptr->setReceived(); pending_interest_hash_table_.erase(content_object->getName()); - if (consumer_callback_) { + if(interest_ptr->getOnDataCallback() != UNSET_CALLBACK){ + interest_ptr->on_content_object_callback_( + std::move(interest_ptr->getInterest()), + std::move(content_object)); + }else if (consumer_callback_) { consumer_callback_->onContentObject( std::move(interest_ptr->getInterest()), std::move(content_object)); @@ -318,9 +339,6 @@ class Portal { private: asio::io_service &io_service_; asio::io_service internal_io_service_; - std::unique_ptr<asio::io_service::work> internal_work_; - - volatile bool is_running_; std::string app_name_; diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.cc b/libtransport/src/hicn/transport/core/raw_socket_interface.cc index 37aaff7e0..4cf7b2ca6 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_interface.cc +++ b/libtransport/src/hicn/transport/core/raw_socket_interface.cc @@ -41,7 +41,7 @@ void RawSocketInterface::connect(bool is_consumer) { } // Get interface ip address - struct sockaddr_in6 address; + struct sockaddr_in6 address = {0}; utils::retrieveInterfaceAddress(output_interface_, &address); inet6_address_.family = address.sin6_family; diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/socket_connector.cc index 332b87ec7..704d3badb 100644 --- a/libtransport/src/hicn/transport/core/socket_connector.cc +++ b/libtransport/src/hicn/transport/core/socket_connector.cc @@ -62,6 +62,7 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback, is_connecting_(false), is_reconnection_(false), data_available_(false), + is_closed_(false), receive_callback_(receive_callback), on_reconnect_callback_(on_reconnect_callback), app_name_(app_name) {} @@ -102,7 +103,11 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) { } void SocketConnector::close() { - io_service_.post([this]() { socket_.close(); }); + io_service_.dispatch([this]() { + is_closed_ = true; + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + }); } void SocketConnector::doWrite() { @@ -125,6 +130,9 @@ void SocketConnector::doWrite() { if (!output_buffer_.empty()) { doWrite(); } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; } else { TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); tryReconnect(); @@ -141,6 +149,9 @@ void SocketConnector::doReadBody(std::size_t body_length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { receive_callback_(std::move(read_msg_)); doReadHeader(); + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; } else { TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); tryReconnect(); @@ -165,6 +176,9 @@ void SocketConnector::doReadHeader() { } else { TRANSPORT_LOGE("Decoding error. Ignoring packet."); } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; } else { TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); tryReconnect(); @@ -173,11 +187,12 @@ void SocketConnector::doReadHeader() { } void SocketConnector::tryReconnect() { - if (!is_connecting_) { + if (!is_connecting_ && !is_closed_) { TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); is_connecting_ = true; is_reconnection_ = true; io_service_.post([this]() { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); startConnectionTimer(); doConnect(); diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/socket_connector.h index d7a05aab4..e014111e2 100644 --- a/libtransport/src/hicn/transport/core/socket_connector.h +++ b/libtransport/src/hicn/transport/core/socket_connector.h @@ -19,6 +19,7 @@ #include <hicn/transport/core/name.h> #include <hicn/transport/utils/branch_prediction.h> +#include <asio/steady_timer.hpp> #include <asio.hpp> #include <deque> @@ -78,6 +79,7 @@ class SocketConnector : public Connector { bool is_connecting_; bool is_reconnection_; bool data_available_; + bool is_closed_; PacketReceivedCallback receive_callback_; OnReconnect on_reconnect_callback_; diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc index 37e1d7b3e..e06858cc3 100644 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc @@ -397,7 +397,7 @@ void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s, return; } - TRANSPORT_LOGI("Received content with size %lu", size); + TRANSPORT_LOGI("Received content with size %zu", size); if (!ec) { read_callback_->readBufferAvailable(std::move(*receive_buffer_)); } else { diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc index de3e84417..cc4f478af 100755 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc @@ -22,6 +22,9 @@ namespace interface { RTCConsumerSocket::RTCConsumerSocket(int protocol, asio::io_service &io_service) : ConsumerSocket(protocol, io_service) {} +RTCConsumerSocket::RTCConsumerSocket(int protocol) + : ConsumerSocket(protocol) {} + RTCConsumerSocket::~RTCConsumerSocket() {} void RTCConsumerSocket::handleRTCPPacket(uint8_t *packet, size_t len) { diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h index 86ccf6e22..cfde3128d 100755 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h @@ -25,6 +25,8 @@ class RTCConsumerSocket : public ConsumerSocket { public: explicit RTCConsumerSocket(int protocol, asio::io_service &io_service); + explicit RTCConsumerSocket(int protocol); + ~RTCConsumerSocket(); void handleRTCPPacket(uint8_t *packet, size_t len); diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 7b39e7ac9..f19502dee 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -55,6 +55,24 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) prodLabel_ = ((rand() % 255) << 24UL); } +RTCProducerSocket::RTCProducerSocket() + : ProducerSocket(), + currentSeg_(1), + nack_(std::make_shared<ContentObject>()), + producedBytes_(0), + producedPackets_(0), + bytesProductionRate_(0), + packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), + perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); + nack_payload->append(NACK_HEADER_SIZE); + nack_->appendPayload(std::move(nack_payload)); + lastStats_ = std::chrono::steady_clock::now(); + srand(time(NULL)); + prodLabel_ = ((rand() % 255) << 24UL); +} + + RTCProducerSocket::~RTCProducerSocket() {} void RTCProducerSocket::registerName(Prefix &producer_namespace) { diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 1a42bdc56..f1bcaa9e8 100755 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -28,6 +28,9 @@ namespace interface { class RTCProducerSocket : public ProducerSocket { public: RTCProducerSocket(asio::io_service &io_service); + + RTCProducerSocket(); + ~RTCProducerSocket(); void registerName(Prefix &producer_namespace); diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index 27ed4e65f..89411e92c 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -99,7 +99,7 @@ int ConsumerSocket::consume(const Name &name, transport_protocol_->start(receive_buffer); - return CONSUMER_READY; + return CONSUMER_FINISHED; } int ConsumerSocket::asyncConsume( @@ -115,7 +115,7 @@ int ConsumerSocket::asyncConsume( }); } - return CONSUMER_READY; + return CONSUMER_RUNNING; } void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 9e309aae8..536d2fde3 100755 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -26,8 +26,9 @@ #include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/sharable_vector.h> -#define CONSUMER_READY 0 +#define CONSUMER_FINISHED 0 #define CONSUMER_BUSY 1 +#define CONSUMER_RUNNING 2 namespace transport { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 69adc2b3f..d9204f111 100755 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -54,9 +54,9 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) } ProducerSocket::~ProducerSocket() { - TRANSPORT_LOGI("Destroying the ProducerSocket"); + processing_thread_stop_ = true; - portal_->stopEventsLoop(); + portal_->stopEventsLoop(true); if (processing_thread_.joinable()) { processing_thread_.join(); @@ -79,8 +79,6 @@ void ProducerSocket::serveForever() { } void ProducerSocket::stop() { - TRANSPORT_LOGI("Calling stop for ProducerSocket"); - portal_->killConnection(); portal_->stopEventsLoop(); } |