From 6922da2298b07d797029da4d40368adf12546639 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Thu, 7 Mar 2019 18:34:52 +0100 Subject: [HICN-85] Added state to connectors. Change-Id: I26d1b37fec4a2482b97a80fa5648f243745908f7 Signed-off-by: Mauro Sardara --- libtransport/src/hicn/transport/core/connector.cc | 3 +- libtransport/src/hicn/transport/core/connector.h | 14 +++++++-- .../src/hicn/transport/core/memif_connector.cc | 36 +++++++++------------- .../src/hicn/transport/core/memif_connector.h | 7 ++--- .../hicn/transport/core/raw_socket_connector.cc | 32 ++++++------------- .../src/hicn/transport/core/raw_socket_connector.h | 2 -- .../hicn/transport/core/tcp_socket_connector.cc | 35 +++++++++++---------- .../src/hicn/transport/core/tcp_socket_connector.h | 4 --- .../hicn/transport/core/udp_socket_connector.cc | 35 +++++++++++---------- .../src/hicn/transport/core/udp_socket_connector.h | 4 --- .../src/hicn/transport/utils/fd_deadline_timer.h | 2 +- 11 files changed, 79 insertions(+), 95 deletions(-) (limited to 'libtransport/src/hicn') diff --git a/libtransport/src/hicn/transport/core/connector.cc b/libtransport/src/hicn/transport/core/connector.cc index e89b98f8a..fc271574c 100644 --- a/libtransport/src/hicn/transport/core/connector.cc +++ b/libtransport/src/hicn/transport/core/connector.cc @@ -25,7 +25,8 @@ Connector::Connector(PacketReceivedCallback &&receive_callback, OnReconnect &&reconnect_callback) : packet_pool_(), receive_callback_(std::move(receive_callback)), - on_reconnect_callback_(std::move(reconnect_callback)) { + on_reconnect_callback_(std::move(reconnect_callback)), + state_(ConnectorState::CLOSED) { init(); } diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h index 529e97bf9..c790f2bfb 100644 --- a/libtransport/src/hicn/transport/core/connector.h +++ b/libtransport/src/hicn/transport/core/connector.h @@ -34,6 +34,13 @@ enum class ConnectorType : uint8_t { }; class Connector { + protected: + enum class ConnectorState { + CLOSED, + CONNECTING, + CONNECTED, + }; + public: static constexpr std::size_t packet_size = 2048; static constexpr std::size_t queue_size = 4096; @@ -48,7 +55,7 @@ class Connector { Connector(PacketReceivedCallback &&receive_callback, OnReconnect &&reconnect_callback); - virtual ~Connector() = default; + virtual ~Connector(){}; virtual void send(const Packet::MemBufPtr &packet) = 0; @@ -59,7 +66,7 @@ class Connector { virtual void enableBurst() = 0; - virtual void state() = 0; + virtual ConnectorState state() { return state_; }; protected: void increasePoolSize(std::size_t size = packet_pool_size); @@ -93,6 +100,9 @@ class Connector { // Connector events PacketReceivedCallback receive_callback_; OnReconnect on_reconnect_callback_; + + // Connector state + ConnectorState state_; }; } // end namespace core diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index 863e1aa20..f9695800b 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -61,15 +61,15 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, memif_worker_(nullptr), timer_set_(false), send_timer_(std::make_unique(event_reactor_)), + disconnect_timer_( + std::make_unique(event_reactor_)), io_service_(io_service), packet_counter_(0), memif_connection_(std::make_unique()), tx_buf_counter_(0), - is_connecting_(true), is_reconnection_(false), data_available_(false), enable_burst_(false), - closed_(false), app_name_(app_name), socket_filename_("") { std::call_once(MemifConnector::flag_, &MemifConnector::init, this); @@ -89,6 +89,7 @@ void MemifConnector::init() { void MemifConnector::connect(uint32_t memif_id, long memif_mode) { TRANSPORT_LOGI("Creating memif"); + state_ = ConnectorState::CONNECTING; memif_id_ = memif_id; socket_filename_ = "/run/vpp/memif.sock"; @@ -97,7 +98,7 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) { work_ = std::make_unique(io_service_); - while (is_connecting_) { + while (state_ != ConnectorState::CONNECTED) { MemifConnector::main_event_reactor_.runOneEvent(); } @@ -140,7 +141,7 @@ int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) { args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP; args.socket_filename = (uint8_t *)socket_filename_.c_str(); - TRANSPORT_LOGI("Socket filename: %s", args.socket_filename); + TRANSPORT_LOGD("Socket filename: %s", args.socket_filename); args.interface_id = index; /* last argument for memif_create (void * private_ctx) is used by user @@ -297,7 +298,7 @@ int MemifConnector::txBurst(uint16_t qid) { void MemifConnector::sendCallback(const std::error_code &ec) { timer_set_ = false; - if (TRANSPORT_EXPECT_TRUE(!ec && !is_connecting_)) { + if (TRANSPORT_EXPECT_TRUE(!ec && state_ == ConnectorState::CONNECTED)) { doSend(); } } @@ -315,8 +316,8 @@ void MemifConnector::processInputBuffer() { int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) { TRANSPORT_LOGI("memif connected!\n"); MemifConnector *connector = (MemifConnector *)private_ctx; + connector->state_ = ConnectorState::CONNECTED; memif_refill_queue(conn, 0, -1, 0); - connector->is_connecting_ = false; return 0; } @@ -326,7 +327,7 @@ int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) { int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) { TRANSPORT_LOGI("memif disconnected!"); MemifConnector *connector = (MemifConnector *)private_ctx; - // TRANSPORT_LOGI ("Packet received: %u", connector->packet_counter_); + connector->state_ = ConnectorState::CLOSED; TRANSPORT_LOGI("Packet to process: %u", connector->memif_connection_->tx_buf_num); return 0; @@ -390,10 +391,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, TRANSPORT_LOGD("freed %d buffers. %u/%u alloc/free buffers", rx, rx, MAX_MEMIF_BUFS - rx); - - // if (connector->enable_burst_) { - // connector->doSend(); - // } } while (ret_val == MEMIF_ERR_NOBUF); connector->io_service_.post( @@ -415,15 +412,17 @@ error: } void MemifConnector::close() { - if (!closed_) { - closed_ = true; - event_reactor_.stop(); - work_.reset(); + if (state_ != ConnectorState::CLOSED) { + disconnect_timer_->expiresFromNow(std::chrono::microseconds(50)); + disconnect_timer_->asyncWait([this](const std::error_code &ec) { + deleteMemif(); + event_reactor_.stop(); + work_.reset(); + }); if (memif_worker_ && memif_worker_->joinable()) { memif_worker_->join(); TRANSPORT_LOGD("Memif worker joined"); - deleteMemif(); } else { TRANSPORT_LOGD("Memif worker not joined"); } @@ -496,11 +495,6 @@ int MemifConnector::doSend() { return 0; } -void MemifConnector::state() { - TRANSPORT_LOGD("Event reactor map: %zu", event_reactor_.mapSize()); - TRANSPORT_LOGD("Output buffer %zu", output_buffer_.size()); -} - void MemifConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) {} diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h index 609571389..057df37e4 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.h +++ b/libtransport/src/hicn/transport/core/memif_connector.h @@ -69,8 +69,6 @@ class MemifConnector : public Connector { void enableBurst() override; - void state() override; - TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; }; private: @@ -112,6 +110,7 @@ class MemifConnector : public Connector { utils::EpollEventReactor event_reactor_; std::atomic_bool timer_set_; std::unique_ptr send_timer_; + std::unique_ptr disconnect_timer_; asio::io_service &io_service_; std::unique_ptr work_; uint32_t packet_counter_; @@ -119,11 +118,9 @@ class MemifConnector : public Connector { uint16_t tx_buf_counter_; PacketRing input_buffer_; - volatile bool is_connecting_; - volatile bool is_reconnection_; + bool is_reconnection_; bool data_available_; bool enable_burst_; - bool closed_; uint32_t memif_id_; uint8_t memif_mode_; std::string app_name_; diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc index 78241b2ea..12cc4e0fa 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc @@ -54,6 +54,7 @@ RawSocketConnector::~RawSocketConnector() {} void RawSocketConnector::connect(const std::string &interface_name, const std::string &mac_address_str) { + state_ = ConnectorState::CONNECTING; memset(ðernet_header_, 0, sizeof(ethernet_header_)); struct ifreq ifr; struct ifreq if_mac; @@ -112,8 +113,6 @@ void RawSocketConnector::connect(const std::string &interface_name, doRecvPacket(); } -void RawSocketConnector::state() { return; } - void RawSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { // asio::async_write(socket_, asio::buffer(packet, len), @@ -124,30 +123,16 @@ void RawSocketConnector::send(const uint8_t *packet, std::size_t len, } void RawSocketConnector::send(const Packet::MemBufPtr &packet) { - // Packet &p = const_cast(packet); - // p.setTcpChecksum(); - - // if (!p.checkIntegrity()) { - // TRANSPORT_LOGW("Sending message with wrong checksum!!!"); - // } - - // std::shared_ptr ptr; - // try { - // ptr = packet.shared_from_this(); - // } catch (std::bad_weak_ptr& exc) { - // TRANSPORT_LOGW("Sending interest which has not been created using a - // shared PTR! A copy will be made."); ptr = - // std::shared_ptr(packet.clone()); - // } - io_service_.post([this, packet]() { bool write_in_progress = !output_buffer_.empty(); output_buffer_.push_back(std::move(packet)); - if (!write_in_progress) { - doSendPacket(); - } else { - // Tell the handle connect it has data to write - data_available_ = true; + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { + if (!write_in_progress) { + doSendPacket(); + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } } }); } @@ -202,6 +187,7 @@ void RawSocketConnector::doRecvPacket() { } void RawSocketConnector::doConnect() { + state_ = ConnectorState::CONNECTED; socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_))); } diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h index 57a2bc067..a5474f7f8 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.h +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h @@ -54,8 +54,6 @@ class RawSocketConnector : public Connector { void connect(const std::string &interface_name, const std::string &mac_address_str); - void state() override; - private: void doConnect(); diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc index 4c5f90db3..f1fd4bbac 100644 --- a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc @@ -62,10 +62,8 @@ TcpSocketConnector::TcpSocketConnector( resolver_(io_service_), timer_(io_service_), read_msg_(packet_pool_.makePtr(nullptr)), - is_connecting_(false), is_reconnection_(false), data_available_(false), - is_closed_(false), app_name_(app_name) {} TcpSocketConnector::~TcpSocketConnector() {} @@ -74,11 +72,10 @@ void TcpSocketConnector::connect(std::string ip_address, std::string port) { endpoint_iterator_ = resolver_.resolve( {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + state_ = ConnectorState::CONNECTING; doConnect(); } -void TcpSocketConnector::state() { return; } - void TcpSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { asio::async_write(socket_, asio::buffer(packet, len), @@ -91,7 +88,7 @@ void TcpSocketConnector::send(const Packet::MemBufPtr &packet) { io_service_.post([this, packet]() { bool write_in_progress = !output_buffer_.empty(); output_buffer_.push_back(std::move(packet)); - if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { if (!write_in_progress) { doWrite(); } @@ -103,11 +100,13 @@ void TcpSocketConnector::send(const Packet::MemBufPtr &packet) { } void TcpSocketConnector::close() { - io_service_.dispatch([this]() { - is_closed_ = true; - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - }); + if (state_ != ConnectorState::CLOSED) { + state_ = ConnectorState::CLOSED; + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + } } void TcpSocketConnector::doWrite() { @@ -208,13 +207,15 @@ void TcpSocketConnector::doReadHeader() { } void TcpSocketConnector::tryReconnect() { - if (!is_connecting_ && !is_closed_) { + if (state_ == ConnectorState::CONNECTED) { TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); - is_connecting_ = true; + state_ = ConnectorState::CONNECTING; is_reconnection_ = true; io_service_.post([this]() { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } startConnectionTimer(); doConnect(); }); @@ -226,7 +227,7 @@ void TcpSocketConnector::doConnect() { [this](std::error_code ec, tcp::resolver::iterator) { if (!ec) { timer_.cancel(); - is_connecting_ = false; + state_ = ConnectorState::CONNECTED; asio::ip::tcp::no_delay noDelayOption(true); socket_.set_option(noDelayOption); doReadHeader(); @@ -248,7 +249,9 @@ void TcpSocketConnector::doConnect() { }); } -bool TcpSocketConnector::checkConnected() { return !is_connecting_; } +bool TcpSocketConnector::checkConnected() { + return state_ == ConnectorState::CONNECTED; +} void TcpSocketConnector::enableBurst() { return; } diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h index 465eeb912..6df2fedff 100644 --- a/libtransport/src/hicn/transport/core/tcp_socket_connector.h +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h @@ -51,8 +51,6 @@ class TcpSocketConnector : public Connector { void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); - void state() override; - private: void doConnect(); @@ -79,10 +77,8 @@ class TcpSocketConnector : public Connector { utils::ObjectPool::Ptr read_msg_; - bool is_connecting_; bool is_reconnection_; bool data_available_; - bool is_closed_; std::string app_name_; }; diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc index f38891e71..54c0eb978 100644 --- a/libtransport/src/hicn/transport/core/udp_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc @@ -38,10 +38,8 @@ UdpSocketConnector::UdpSocketConnector( connection_timer_(io_service_), connection_timeout_(io_service_), read_msg_(packet_pool_.makePtr(nullptr)), - is_connecting_(false), is_reconnection_(false), data_available_(false), - is_closed_(false), app_name_(app_name) {} UdpSocketConnector::~UdpSocketConnector() {} @@ -50,11 +48,10 @@ void UdpSocketConnector::connect(std::string ip_address, std::string port) { endpoint_iterator_ = resolver_.resolve( {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + state_ = ConnectorState::CONNECTING; doConnect(); } -void UdpSocketConnector::state() { return; } - void UdpSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { socket_.async_send(asio::buffer(packet, len), @@ -67,7 +64,7 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { io_service_.post([this, packet]() { bool write_in_progress = !output_buffer_.empty(); output_buffer_.push_back(std::move(packet)); - if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { if (!write_in_progress) { doWrite(); } @@ -79,11 +76,13 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { } void UdpSocketConnector::close() { - io_service_.dispatch([this]() { - is_closed_ = true; - socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); - socket_.close(); - }); + if (state_ != ConnectorState::CLOSED) { + state_ = ConnectorState::CLOSED; + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + } } void UdpSocketConnector::doWrite() { @@ -136,15 +135,17 @@ void UdpSocketConnector::doRead() { } void UdpSocketConnector::tryReconnect() { - if (!is_connecting_ && !is_closed_) { + if (state_ == ConnectorState::CONNECTED) { TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); - is_connecting_ = true; + state_ = ConnectorState::CONNECTING; is_reconnection_ = true; connection_timer_.expires_from_now(std::chrono::seconds(1)); connection_timer_.async_wait([this](const std::error_code &ec) { if (!ec) { - socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); - socket_.close(); + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } startConnectionTimer(); doConnect(); } @@ -157,7 +158,7 @@ void UdpSocketConnector::doConnect() { [this](std::error_code ec, udp::resolver::iterator) { if (!ec) { connection_timeout_.cancel(); - is_connecting_ = false; + state_ = ConnectorState::CONNECTED; doRead(); if (data_available_) { @@ -176,7 +177,9 @@ void UdpSocketConnector::doConnect() { }); } -bool UdpSocketConnector::checkConnected() { return !is_connecting_; } +bool UdpSocketConnector::checkConnected() { + return state_ == ConnectorState::CONNECTED; +} void UdpSocketConnector::enableBurst() { return; } diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h index 4cde8f2eb..87198efde 100644 --- a/libtransport/src/hicn/transport/core/udp_socket_connector.h +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h @@ -49,8 +49,6 @@ class UdpSocketConnector : public Connector { void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); - void state() override; - private: void doConnect(); @@ -76,10 +74,8 @@ class UdpSocketConnector : public Connector { utils::ObjectPool::Ptr read_msg_; - bool is_connecting_; bool is_reconnection_; bool data_available_; - bool is_closed_; std::string app_name_; }; diff --git a/libtransport/src/hicn/transport/utils/fd_deadline_timer.h b/libtransport/src/hicn/transport/utils/fd_deadline_timer.h index 3ed4590bc..6fb823a05 100644 --- a/libtransport/src/hicn/transport/utils/fd_deadline_timer.h +++ b/libtransport/src/hicn/transport/utils/fd_deadline_timer.h @@ -53,7 +53,7 @@ class FdDeadlineTimer : public DeadlineTimer { reactor_.addFileDescriptor( timer_fd_, events, - [callback{move(callback)}](const Event &event) -> int { + [callback = std::forward(callback)](const Event &event) -> int { uint64_t s = 0; std::error_code ec; -- cgit 1.2.3-korg