aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/core/connector.cc3
-rw-r--r--libtransport/src/hicn/transport/core/connector.h14
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.cc36
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.h7
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.cc32
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.h2
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.cc35
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.h4
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.cc35
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.h4
-rw-r--r--libtransport/src/hicn/transport/utils/fd_deadline_timer.h2
11 files changed, 79 insertions, 95 deletions
diff --git a/libtransport/src/hicn/transport/core/connector.cc b/libtransport/src/hicn/transport/core/connector.cc
index e89b98f8a..fc271574c 100644
--- a/libtransport/src/hicn/transport/core/connector.cc
+++ b/libtransport/src/hicn/transport/core/connector.cc
@@ -25,7 +25,8 @@ Connector::Connector(PacketReceivedCallback &&receive_callback,
OnReconnect &&reconnect_callback)
: packet_pool_(),
receive_callback_(std::move(receive_callback)),
- on_reconnect_callback_(std::move(reconnect_callback)) {
+ on_reconnect_callback_(std::move(reconnect_callback)),
+ state_(ConnectorState::CLOSED) {
init();
}
diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h
index 529e97bf9..c790f2bfb 100644
--- a/libtransport/src/hicn/transport/core/connector.h
+++ b/libtransport/src/hicn/transport/core/connector.h
@@ -34,6 +34,13 @@ enum class ConnectorType : uint8_t {
};
class Connector {
+ protected:
+ enum class ConnectorState {
+ CLOSED,
+ CONNECTING,
+ CONNECTED,
+ };
+
public:
static constexpr std::size_t packet_size = 2048;
static constexpr std::size_t queue_size = 4096;
@@ -48,7 +55,7 @@ class Connector {
Connector(PacketReceivedCallback &&receive_callback,
OnReconnect &&reconnect_callback);
- virtual ~Connector() = default;
+ virtual ~Connector(){};
virtual void send(const Packet::MemBufPtr &packet) = 0;
@@ -59,7 +66,7 @@ class Connector {
virtual void enableBurst() = 0;
- virtual void state() = 0;
+ virtual ConnectorState state() { return state_; };
protected:
void increasePoolSize(std::size_t size = packet_pool_size);
@@ -93,6 +100,9 @@ class Connector {
// Connector events
PacketReceivedCallback receive_callback_;
OnReconnect on_reconnect_callback_;
+
+ // Connector state
+ ConnectorState state_;
};
} // end namespace core
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc
index 863e1aa20..f9695800b 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.cc
+++ b/libtransport/src/hicn/transport/core/memif_connector.cc
@@ -61,15 +61,15 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
memif_worker_(nullptr),
timer_set_(false),
send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+ disconnect_timer_(
+ std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
io_service_(io_service),
packet_counter_(0),
memif_connection_(std::make_unique<memif_connection_t>()),
tx_buf_counter_(0),
- is_connecting_(true),
is_reconnection_(false),
data_available_(false),
enable_burst_(false),
- closed_(false),
app_name_(app_name),
socket_filename_("") {
std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
@@ -89,6 +89,7 @@ void MemifConnector::init() {
void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
TRANSPORT_LOGI("Creating memif");
+ state_ = ConnectorState::CONNECTING;
memif_id_ = memif_id;
socket_filename_ = "/run/vpp/memif.sock";
@@ -97,7 +98,7 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
work_ = std::make_unique<asio::io_service::work>(io_service_);
- while (is_connecting_) {
+ while (state_ != ConnectorState::CONNECTED) {
MemifConnector::main_event_reactor_.runOneEvent();
}
@@ -140,7 +141,7 @@ int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) {
args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
args.socket_filename = (uint8_t *)socket_filename_.c_str();
- TRANSPORT_LOGI("Socket filename: %s", args.socket_filename);
+ TRANSPORT_LOGD("Socket filename: %s", args.socket_filename);
args.interface_id = index;
/* last argument for memif_create (void * private_ctx) is used by user
@@ -297,7 +298,7 @@ int MemifConnector::txBurst(uint16_t qid) {
void MemifConnector::sendCallback(const std::error_code &ec) {
timer_set_ = false;
- if (TRANSPORT_EXPECT_TRUE(!ec && !is_connecting_)) {
+ if (TRANSPORT_EXPECT_TRUE(!ec && state_ == ConnectorState::CONNECTED)) {
doSend();
}
}
@@ -315,8 +316,8 @@ void MemifConnector::processInputBuffer() {
int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
TRANSPORT_LOGI("memif connected!\n");
MemifConnector *connector = (MemifConnector *)private_ctx;
+ connector->state_ = ConnectorState::CONNECTED;
memif_refill_queue(conn, 0, -1, 0);
- connector->is_connecting_ = false;
return 0;
}
@@ -326,7 +327,7 @@ int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
TRANSPORT_LOGI("memif disconnected!");
MemifConnector *connector = (MemifConnector *)private_ctx;
- // TRANSPORT_LOGI ("Packet received: %u", connector->packet_counter_);
+ connector->state_ = ConnectorState::CLOSED;
TRANSPORT_LOGI("Packet to process: %u",
connector->memif_connection_->tx_buf_num);
return 0;
@@ -390,10 +391,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
TRANSPORT_LOGD("freed %d buffers. %u/%u alloc/free buffers", rx, rx,
MAX_MEMIF_BUFS - rx);
-
- // if (connector->enable_burst_) {
- // connector->doSend();
- // }
} while (ret_val == MEMIF_ERR_NOBUF);
connector->io_service_.post(
@@ -415,15 +412,17 @@ error:
}
void MemifConnector::close() {
- if (!closed_) {
- closed_ = true;
- event_reactor_.stop();
- work_.reset();
+ if (state_ != ConnectorState::CLOSED) {
+ disconnect_timer_->expiresFromNow(std::chrono::microseconds(50));
+ disconnect_timer_->asyncWait([this](const std::error_code &ec) {
+ deleteMemif();
+ event_reactor_.stop();
+ work_.reset();
+ });
if (memif_worker_ && memif_worker_->joinable()) {
memif_worker_->join();
TRANSPORT_LOGD("Memif worker joined");
- deleteMemif();
} else {
TRANSPORT_LOGD("Memif worker not joined");
}
@@ -496,11 +495,6 @@ int MemifConnector::doSend() {
return 0;
}
-void MemifConnector::state() {
- TRANSPORT_LOGD("Event reactor map: %zu", event_reactor_.mapSize());
- TRANSPORT_LOGD("Output buffer %zu", output_buffer_.size());
-}
-
void MemifConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {}
diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h
index 609571389..057df37e4 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.h
+++ b/libtransport/src/hicn/transport/core/memif_connector.h
@@ -69,8 +69,6 @@ class MemifConnector : public Connector {
void enableBurst() override;
- void state() override;
-
TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
private:
@@ -112,6 +110,7 @@ class MemifConnector : public Connector {
utils::EpollEventReactor event_reactor_;
std::atomic_bool timer_set_;
std::unique_ptr<utils::FdDeadlineTimer> send_timer_;
+ std::unique_ptr<utils::FdDeadlineTimer> disconnect_timer_;
asio::io_service &io_service_;
std::unique_ptr<asio::io_service::work> work_;
uint32_t packet_counter_;
@@ -119,11 +118,9 @@ class MemifConnector : public Connector {
uint16_t tx_buf_counter_;
PacketRing input_buffer_;
- volatile bool is_connecting_;
- volatile bool is_reconnection_;
+ bool is_reconnection_;
bool data_available_;
bool enable_burst_;
- bool closed_;
uint32_t memif_id_;
uint8_t memif_mode_;
std::string app_name_;
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
index 78241b2ea..12cc4e0fa 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
@@ -54,6 +54,7 @@ RawSocketConnector::~RawSocketConnector() {}
void RawSocketConnector::connect(const std::string &interface_name,
const std::string &mac_address_str) {
+ state_ = ConnectorState::CONNECTING;
memset(&ethernet_header_, 0, sizeof(ethernet_header_));
struct ifreq ifr;
struct ifreq if_mac;
@@ -112,8 +113,6 @@ void RawSocketConnector::connect(const std::string &interface_name,
doRecvPacket();
}
-void RawSocketConnector::state() { return; }
-
void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
// asio::async_write(socket_, asio::buffer(packet, len),
@@ -124,30 +123,16 @@ void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
}
void RawSocketConnector::send(const Packet::MemBufPtr &packet) {
- // Packet &p = const_cast<Packet &>(packet);
- // p.setTcpChecksum();
-
- // if (!p.checkIntegrity()) {
- // TRANSPORT_LOGW("Sending message with wrong checksum!!!");
- // }
-
- // std::shared_ptr<const Packet> ptr;
- // try {
- // ptr = packet.shared_from_this();
- // } catch (std::bad_weak_ptr& exc) {
- // TRANSPORT_LOGW("Sending interest which has not been created using a
- // shared PTR! A copy will be made."); ptr =
- // std::shared_ptr<Packet>(packet.clone());
- // }
-
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
- if (!write_in_progress) {
- doSendPacket();
- } else {
- // Tell the handle connect it has data to write
- data_available_ = true;
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
+ if (!write_in_progress) {
+ doSendPacket();
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
}
});
}
@@ -202,6 +187,7 @@ void RawSocketConnector::doRecvPacket() {
}
void RawSocketConnector::doConnect() {
+ state_ = ConnectorState::CONNECTED;
socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_)));
}
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h
index 57a2bc067..a5474f7f8 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h
@@ -54,8 +54,6 @@ class RawSocketConnector : public Connector {
void connect(const std::string &interface_name,
const std::string &mac_address_str);
- void state() override;
-
private:
void doConnect();
diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
index 4c5f90db3..f1fd4bbac 100644
--- a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
@@ -62,10 +62,8 @@ TcpSocketConnector::TcpSocketConnector(
resolver_(io_service_),
timer_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
- is_connecting_(false),
is_reconnection_(false),
data_available_(false),
- is_closed_(false),
app_name_(app_name) {}
TcpSocketConnector::~TcpSocketConnector() {}
@@ -74,11 +72,10 @@ void TcpSocketConnector::connect(std::string ip_address, std::string port) {
endpoint_iterator_ = resolver_.resolve(
{ip_address, port, asio::ip::resolver_query_base::numeric_service});
+ state_ = ConnectorState::CONNECTING;
doConnect();
}
-void TcpSocketConnector::state() { return; }
-
void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
asio::async_write(socket_, asio::buffer(packet, len),
@@ -91,7 +88,7 @@ void TcpSocketConnector::send(const Packet::MemBufPtr &packet) {
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
- if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
if (!write_in_progress) {
doWrite();
}
@@ -103,11 +100,13 @@ void TcpSocketConnector::send(const Packet::MemBufPtr &packet) {
}
void TcpSocketConnector::close() {
- io_service_.dispatch([this]() {
- is_closed_ = true;
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- });
+ if (state_ != ConnectorState::CLOSED) {
+ state_ = ConnectorState::CLOSED;
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ }
}
void TcpSocketConnector::doWrite() {
@@ -208,13 +207,15 @@ void TcpSocketConnector::doReadHeader() {
}
void TcpSocketConnector::tryReconnect() {
- if (!is_connecting_ && !is_closed_) {
+ if (state_ == ConnectorState::CONNECTED) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
- is_connecting_ = true;
+ state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
io_service_.post([this]() {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
startConnectionTimer();
doConnect();
});
@@ -226,7 +227,7 @@ void TcpSocketConnector::doConnect() {
[this](std::error_code ec, tcp::resolver::iterator) {
if (!ec) {
timer_.cancel();
- is_connecting_ = false;
+ state_ = ConnectorState::CONNECTED;
asio::ip::tcp::no_delay noDelayOption(true);
socket_.set_option(noDelayOption);
doReadHeader();
@@ -248,7 +249,9 @@ void TcpSocketConnector::doConnect() {
});
}
-bool TcpSocketConnector::checkConnected() { return !is_connecting_; }
+bool TcpSocketConnector::checkConnected() {
+ return state_ == ConnectorState::CONNECTED;
+}
void TcpSocketConnector::enableBurst() { return; }
diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
index 465eeb912..6df2fedff 100644
--- a/libtransport/src/hicn/transport/core/tcp_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
@@ -51,8 +51,6 @@ class TcpSocketConnector : public Connector {
void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
- void state() override;
-
private:
void doConnect();
@@ -79,10 +77,8 @@ class TcpSocketConnector : public Connector {
utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
- bool is_connecting_;
bool is_reconnection_;
bool data_available_;
- bool is_closed_;
std::string app_name_;
};
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
index f38891e71..54c0eb978 100644
--- a/libtransport/src/hicn/transport/core/udp_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
@@ -38,10 +38,8 @@ UdpSocketConnector::UdpSocketConnector(
connection_timer_(io_service_),
connection_timeout_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
- is_connecting_(false),
is_reconnection_(false),
data_available_(false),
- is_closed_(false),
app_name_(app_name) {}
UdpSocketConnector::~UdpSocketConnector() {}
@@ -50,11 +48,10 @@ void UdpSocketConnector::connect(std::string ip_address, std::string port) {
endpoint_iterator_ = resolver_.resolve(
{ip_address, port, asio::ip::resolver_query_base::numeric_service});
+ state_ = ConnectorState::CONNECTING;
doConnect();
}
-void UdpSocketConnector::state() { return; }
-
void UdpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
socket_.async_send(asio::buffer(packet, len),
@@ -67,7 +64,7 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
- if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
if (!write_in_progress) {
doWrite();
}
@@ -79,11 +76,13 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
}
void UdpSocketConnector::close() {
- io_service_.dispatch([this]() {
- is_closed_ = true;
- socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
- socket_.close();
- });
+ if (state_ != ConnectorState::CLOSED) {
+ state_ = ConnectorState::CLOSED;
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ }
}
void UdpSocketConnector::doWrite() {
@@ -136,15 +135,17 @@ void UdpSocketConnector::doRead() {
}
void UdpSocketConnector::tryReconnect() {
- if (!is_connecting_ && !is_closed_) {
+ if (state_ == ConnectorState::CONNECTED) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
- is_connecting_ = true;
+ state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
connection_timer_.expires_from_now(std::chrono::seconds(1));
connection_timer_.async_wait([this](const std::error_code &ec) {
if (!ec) {
- socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
- socket_.close();
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
startConnectionTimer();
doConnect();
}
@@ -157,7 +158,7 @@ void UdpSocketConnector::doConnect() {
[this](std::error_code ec, udp::resolver::iterator) {
if (!ec) {
connection_timeout_.cancel();
- is_connecting_ = false;
+ state_ = ConnectorState::CONNECTED;
doRead();
if (data_available_) {
@@ -176,7 +177,9 @@ void UdpSocketConnector::doConnect() {
});
}
-bool UdpSocketConnector::checkConnected() { return !is_connecting_; }
+bool UdpSocketConnector::checkConnected() {
+ return state_ == ConnectorState::CONNECTED;
+}
void UdpSocketConnector::enableBurst() { return; }
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h
index 4cde8f2eb..87198efde 100644
--- a/libtransport/src/hicn/transport/core/udp_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h
@@ -49,8 +49,6 @@ class UdpSocketConnector : public Connector {
void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
- void state() override;
-
private:
void doConnect();
@@ -76,10 +74,8 @@ class UdpSocketConnector : public Connector {
utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
- bool is_connecting_;
bool is_reconnection_;
bool data_available_;
- bool is_closed_;
std::string app_name_;
};
diff --git a/libtransport/src/hicn/transport/utils/fd_deadline_timer.h b/libtransport/src/hicn/transport/utils/fd_deadline_timer.h
index 3ed4590bc..6fb823a05 100644
--- a/libtransport/src/hicn/transport/utils/fd_deadline_timer.h
+++ b/libtransport/src/hicn/transport/utils/fd_deadline_timer.h
@@ -53,7 +53,7 @@ class FdDeadlineTimer : public DeadlineTimer<FdDeadlineTimer> {
reactor_.addFileDescriptor(
timer_fd_, events,
- [callback{move(callback)}](const Event &event) -> int {
+ [callback = std::forward<WaitHandler &&>(callback)](const Event &event) -> int {
uint64_t s = 0;
std::error_code ec;