diff options
8 files changed, 42 insertions, 25 deletions
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index b3785e5c3..6c5f2ff5f 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -38,7 +38,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, timer_set_(false), send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), io_service_(io_service), - work_(std::make_unique<asio::io_service::work>(io_service_)), packet_counter_(0), memif_connection_({}), tx_buf_counter_(0), @@ -74,6 +73,8 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) { createMemif(memif_id, memif_mode, nullptr); + work_ = std::make_unique<asio::io_service::work>(io_service_); + while (is_connecting_) { MemifConnector::main_event_reactor_.runOneEvent(); } @@ -402,7 +403,7 @@ void MemifConnector::close() { if (!closed_) { closed_ = true; event_reactor_.stop(); - io_service_.stop(); + work_.reset(); if (memif_worker_ && memif_worker_->joinable()) { memif_worker_->join(); diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 52a721a35..58406bbde 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -97,9 +97,7 @@ class Portal { virtual void onInterest(Interest::Ptr &&i) = 0; }; - Portal() : Portal(internal_io_service_) { - internal_work_ = std::make_unique<asio::io_service::work>(io_service_); - } + Portal() : Portal(internal_io_service_) {} Portal(asio::io_service &io_service) : io_service_(io_service), @@ -130,8 +128,7 @@ class Portal { } ~Portal() { - connector_.close(); - stopEventsLoop(); + stopEventsLoop(true); } TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { @@ -227,15 +224,17 @@ class Portal { forwarder_interface_.send(content_object); } - TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { - internal_work_.reset(); - + TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) { for (auto &pend_interest : pending_interest_hash_table_) { pend_interest.second->cancelTimer(); } clear(); + if(kill_connection) { + connector_.close(); + } + io_service_.post([this]() { io_service_.stop(); }); } @@ -340,7 +339,6 @@ class Portal { private: asio::io_service &io_service_; asio::io_service internal_io_service_; - std::unique_ptr<asio::io_service::work> internal_work_; std::string app_name_; diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/socket_connector.cc index 332b87ec7..704d3badb 100644 --- a/libtransport/src/hicn/transport/core/socket_connector.cc +++ b/libtransport/src/hicn/transport/core/socket_connector.cc @@ -62,6 +62,7 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback, is_connecting_(false), is_reconnection_(false), data_available_(false), + is_closed_(false), receive_callback_(receive_callback), on_reconnect_callback_(on_reconnect_callback), app_name_(app_name) {} @@ -102,7 +103,11 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) { } void SocketConnector::close() { - io_service_.post([this]() { socket_.close(); }); + io_service_.dispatch([this]() { + is_closed_ = true; + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + }); } void SocketConnector::doWrite() { @@ -125,6 +130,9 @@ void SocketConnector::doWrite() { if (!output_buffer_.empty()) { doWrite(); } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; } else { TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); tryReconnect(); @@ -141,6 +149,9 @@ void SocketConnector::doReadBody(std::size_t body_length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { receive_callback_(std::move(read_msg_)); doReadHeader(); + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; } else { TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); tryReconnect(); @@ -165,6 +176,9 @@ void SocketConnector::doReadHeader() { } else { TRANSPORT_LOGE("Decoding error. Ignoring packet."); } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; } else { TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); tryReconnect(); @@ -173,11 +187,12 @@ void SocketConnector::doReadHeader() { } void SocketConnector::tryReconnect() { - if (!is_connecting_) { + if (!is_connecting_ && !is_closed_) { TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); is_connecting_ = true; is_reconnection_ = true; io_service_.post([this]() { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); startConnectionTimer(); doConnect(); diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/socket_connector.h index b1757e59a..e014111e2 100644 --- a/libtransport/src/hicn/transport/core/socket_connector.h +++ b/libtransport/src/hicn/transport/core/socket_connector.h @@ -79,6 +79,7 @@ class SocketConnector : public Connector { bool is_connecting_; bool is_reconnection_; bool data_available_; + bool is_closed_; PacketReceivedCallback receive_callback_; OnReconnect on_reconnect_callback_; diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index 27ed4e65f..89411e92c 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -99,7 +99,7 @@ int ConsumerSocket::consume(const Name &name, transport_protocol_->start(receive_buffer); - return CONSUMER_READY; + return CONSUMER_FINISHED; } int ConsumerSocket::asyncConsume( @@ -115,7 +115,7 @@ int ConsumerSocket::asyncConsume( }); } - return CONSUMER_READY; + return CONSUMER_RUNNING; } void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 9e309aae8..536d2fde3 100755 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -26,8 +26,9 @@ #include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/sharable_vector.h> -#define CONSUMER_READY 0 +#define CONSUMER_FINISHED 0 #define CONSUMER_BUSY 1 +#define CONSUMER_RUNNING 2 namespace transport { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 69adc2b3f..d9204f111 100755 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -54,9 +54,9 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) } ProducerSocket::~ProducerSocket() { - TRANSPORT_LOGI("Destroying the ProducerSocket"); + processing_thread_stop_ = true; - portal_->stopEventsLoop(); + portal_->stopEventsLoop(true); if (processing_thread_.joinable()) { processing_thread_.join(); @@ -79,8 +79,6 @@ void ProducerSocket::serveForever() { } void ProducerSocket::stop() { - TRANSPORT_LOGI("Calling stop for ProducerSocket"); - portal_->killConnection(); portal_->stopEventsLoop(); } diff --git a/utils/src/ping_client.cc b/utils/src/ping_client.cc index e98a8b422..24f7bd7c9 100755 --- a/utils/src/ping_client.cc +++ b/utils/src/ping_client.cc @@ -76,13 +76,15 @@ class Configuration { class Client : interface::BasePortal::ConsumerCallback { public: - Client(Configuration *c) : portal_() { + Client(Configuration *c) + : portal_(), + signals_(portal_.getIoService(), SIGINT, SIGQUIT) { // Let the main thread to catch SIGINT and SIGQUIT - // asio::signal_set signals(io_service, SIGINT, SIGQUIT); - // signals.async_wait(std::bind(&Client::afterSignal, this)); - portal_.connect(); portal_.setConsumerCallback(this); + + signals_.async_wait(std::bind(&Client::afterSignal, this)); + timer_.reset(new asio::steady_timer(portal_.getIoService())); config_ = c; sequence_number_ = config_->first_suffix_; @@ -272,7 +274,7 @@ class Client : interface::BasePortal::ConsumerCallback { std::cout << "Stop ping" << std::endl; std::cout << "Sent: " << sent_ << " Received: " << received_ << " Timeouts: " << timedout_ << std::endl; - portal_.stopEventsLoop(); + portal_.stopEventsLoop(true); } void reset() { @@ -289,6 +291,7 @@ class Client : interface::BasePortal::ConsumerCallback { private: SendTimeMap send_timestamps_; interface::BasePortal portal_; + asio::signal_set signals_; uint64_t sequence_number_; uint64_t last_jump_; uint64_t processed_; |