aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.cc5
-rw-r--r--libtransport/src/hicn/transport/core/portal.h16
-rw-r--r--libtransport/src/hicn/transport/core/socket_connector.cc19
-rw-r--r--libtransport/src/hicn/transport/core/socket_connector.h1
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc4
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_consumer.h3
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_producer.cc6
-rwxr-xr-xutils/src/ping_client.cc13
8 files changed, 42 insertions, 25 deletions
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc
index b3785e5c3..6c5f2ff5f 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.cc
+++ b/libtransport/src/hicn/transport/core/memif_connector.cc
@@ -38,7 +38,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
timer_set_(false),
send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
io_service_(io_service),
- work_(std::make_unique<asio::io_service::work>(io_service_)),
packet_counter_(0),
memif_connection_({}),
tx_buf_counter_(0),
@@ -74,6 +73,8 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
createMemif(memif_id, memif_mode, nullptr);
+ work_ = std::make_unique<asio::io_service::work>(io_service_);
+
while (is_connecting_) {
MemifConnector::main_event_reactor_.runOneEvent();
}
@@ -402,7 +403,7 @@ void MemifConnector::close() {
if (!closed_) {
closed_ = true;
event_reactor_.stop();
- io_service_.stop();
+ work_.reset();
if (memif_worker_ && memif_worker_->joinable()) {
memif_worker_->join();
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 52a721a35..58406bbde 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -97,9 +97,7 @@ class Portal {
virtual void onInterest(Interest::Ptr &&i) = 0;
};
- Portal() : Portal(internal_io_service_) {
- internal_work_ = std::make_unique<asio::io_service::work>(io_service_);
- }
+ Portal() : Portal(internal_io_service_) {}
Portal(asio::io_service &io_service)
: io_service_(io_service),
@@ -130,8 +128,7 @@ class Portal {
}
~Portal() {
- connector_.close();
- stopEventsLoop();
+ stopEventsLoop(true);
}
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
@@ -227,15 +224,17 @@ class Portal {
forwarder_interface_.send(content_object);
}
- TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
- internal_work_.reset();
-
+ TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) {
for (auto &pend_interest : pending_interest_hash_table_) {
pend_interest.second->cancelTimer();
}
clear();
+ if(kill_connection) {
+ connector_.close();
+ }
+
io_service_.post([this]() { io_service_.stop(); });
}
@@ -340,7 +339,6 @@ class Portal {
private:
asio::io_service &io_service_;
asio::io_service internal_io_service_;
- std::unique_ptr<asio::io_service::work> internal_work_;
std::string app_name_;
diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/socket_connector.cc
index 332b87ec7..704d3badb 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/socket_connector.cc
@@ -62,6 +62,7 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback,
is_connecting_(false),
is_reconnection_(false),
data_available_(false),
+ is_closed_(false),
receive_callback_(receive_callback),
on_reconnect_callback_(on_reconnect_callback),
app_name_(app_name) {}
@@ -102,7 +103,11 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) {
}
void SocketConnector::close() {
- io_service_.post([this]() { socket_.close(); });
+ io_service_.dispatch([this]() {
+ is_closed_ = true;
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ });
}
void SocketConnector::doWrite() {
@@ -125,6 +130,9 @@ void SocketConnector::doWrite() {
if (!output_buffer_.empty()) {
doWrite();
}
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
@@ -141,6 +149,9 @@ void SocketConnector::doReadBody(std::size_t body_length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
receive_callback_(std::move(read_msg_));
doReadHeader();
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
@@ -165,6 +176,9 @@ void SocketConnector::doReadHeader() {
} else {
TRANSPORT_LOGE("Decoding error. Ignoring packet.");
}
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
@@ -173,11 +187,12 @@ void SocketConnector::doReadHeader() {
}
void SocketConnector::tryReconnect() {
- if (!is_connecting_) {
+ if (!is_connecting_ && !is_closed_) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
is_connecting_ = true;
is_reconnection_ = true;
io_service_.post([this]() {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
startConnectionTimer();
doConnect();
diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/socket_connector.h
index b1757e59a..e014111e2 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.h
+++ b/libtransport/src/hicn/transport/core/socket_connector.h
@@ -79,6 +79,7 @@ class SocketConnector : public Connector {
bool is_connecting_;
bool is_reconnection_;
bool data_available_;
+ bool is_closed_;
PacketReceivedCallback receive_callback_;
OnReconnect on_reconnect_callback_;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index 27ed4e65f..89411e92c 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -99,7 +99,7 @@ int ConsumerSocket::consume(const Name &name,
transport_protocol_->start(receive_buffer);
- return CONSUMER_READY;
+ return CONSUMER_FINISHED;
}
int ConsumerSocket::asyncConsume(
@@ -115,7 +115,7 @@ int ConsumerSocket::asyncConsume(
});
}
- return CONSUMER_READY;
+ return CONSUMER_RUNNING;
}
void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 9e309aae8..536d2fde3 100755
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -26,8 +26,9 @@
#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/sharable_vector.h>
-#define CONSUMER_READY 0
+#define CONSUMER_FINISHED 0
#define CONSUMER_BUSY 1
+#define CONSUMER_RUNNING 2
namespace transport {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 69adc2b3f..d9204f111 100755
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -54,9 +54,9 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
}
ProducerSocket::~ProducerSocket() {
- TRANSPORT_LOGI("Destroying the ProducerSocket");
+
processing_thread_stop_ = true;
- portal_->stopEventsLoop();
+ portal_->stopEventsLoop(true);
if (processing_thread_.joinable()) {
processing_thread_.join();
@@ -79,8 +79,6 @@ void ProducerSocket::serveForever() {
}
void ProducerSocket::stop() {
- TRANSPORT_LOGI("Calling stop for ProducerSocket");
- portal_->killConnection();
portal_->stopEventsLoop();
}
diff --git a/utils/src/ping_client.cc b/utils/src/ping_client.cc
index e98a8b422..24f7bd7c9 100755
--- a/utils/src/ping_client.cc
+++ b/utils/src/ping_client.cc
@@ -76,13 +76,15 @@ class Configuration {
class Client : interface::BasePortal::ConsumerCallback {
public:
- Client(Configuration *c) : portal_() {
+ Client(Configuration *c)
+ : portal_(),
+ signals_(portal_.getIoService(), SIGINT, SIGQUIT) {
// Let the main thread to catch SIGINT and SIGQUIT
- // asio::signal_set signals(io_service, SIGINT, SIGQUIT);
- // signals.async_wait(std::bind(&Client::afterSignal, this));
-
portal_.connect();
portal_.setConsumerCallback(this);
+
+ signals_.async_wait(std::bind(&Client::afterSignal, this));
+
timer_.reset(new asio::steady_timer(portal_.getIoService()));
config_ = c;
sequence_number_ = config_->first_suffix_;
@@ -272,7 +274,7 @@ class Client : interface::BasePortal::ConsumerCallback {
std::cout << "Stop ping" << std::endl;
std::cout << "Sent: " << sent_ << " Received: " << received_
<< " Timeouts: " << timedout_ << std::endl;
- portal_.stopEventsLoop();
+ portal_.stopEventsLoop(true);
}
void reset() {
@@ -289,6 +291,7 @@ class Client : interface::BasePortal::ConsumerCallback {
private:
SendTimeMap send_timestamps_;
interface::BasePortal portal_;
+ asio::signal_set signals_;
uint64_t sequence_number_;
uint64_t last_jump_;
uint64_t processed_;