diff options
author | Mauro Sardara <msardara@cisco.com> | 2019-04-16 18:38:07 +0200 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2019-04-18 16:56:04 +0000 |
commit | b375370d0f11163da8cb752c4a3f992a89ef80ee (patch) | |
tree | b34e8d92fb616261209a09b89e3538ea94b4c3c3 /libtransport/src/hicn/transport/core/udp_socket_connector.cc | |
parent | 564dfea33b993c3ff6572894ef35f91ba37d23ed (diff) |
[HICN-178] Sync send of control messages.v19.04
Change-Id: I9a07c6c806ceba10f80a5f67337dce2eee76120d
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/core/udp_socket_connector.cc')
-rw-r--r-- | libtransport/src/hicn/transport/core/udp_socket_connector.cc | 100 |
1 files changed, 56 insertions, 44 deletions
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc index 54c0eb978..38945e755 100644 --- a/libtransport/src/hicn/transport/core/udp_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc @@ -21,6 +21,7 @@ #include <hicn/transport/utils/log.h> #include <hicn/transport/utils/object_pool.h> +#include <thread> #include <vector> namespace transport { @@ -36,7 +37,6 @@ UdpSocketConnector::UdpSocketConnector( socket_(io_service_), resolver_(io_service_), connection_timer_(io_service_), - connection_timeout_(io_service_), read_msg_(packet_pool_.makePtr(nullptr)), is_reconnection_(false), data_available_(false), @@ -54,10 +54,17 @@ void UdpSocketConnector::connect(std::string ip_address, std::string port) { void UdpSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { - socket_.async_send(asio::buffer(packet, len), - [packet_sent](std::error_code ec, std::size_t /*length*/) { - packet_sent(); - }); + if (packet_sent != 0) { + socket_.async_send( + asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); + } else { + if (state_ == ConnectorState::CONNECTED) { + socket_.send(asio::buffer(packet, len)); + } + } } void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { @@ -76,6 +83,14 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { } void UdpSocketConnector::close() { + if (io_service_.stopped()) { + doClose(); + } else { + io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this)); + } +} + +void UdpSocketConnector::doClose() { if (state_ != ConnectorState::CLOSED) { state_ = ConnectorState::CLOSED; if (socket_.is_open()) { @@ -86,8 +101,6 @@ void UdpSocketConnector::close() { } void UdpSocketConnector::doWrite() { - // TODO improve this piece of code for sending many buffers togethers - // if list contains more than one packet auto packet = output_buffer_.front().get(); auto array = std::vector<asio::const_buffer>(); @@ -97,8 +110,8 @@ void UdpSocketConnector::doWrite() { current = current->next(); } while (current != packet); - socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec, - std::size_t length) { + socket_.async_send(std::move(array), [this](std::error_code ec, + std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { output_buffer_.pop_front(); if (!output_buffer_.empty()) { @@ -139,54 +152,53 @@ void UdpSocketConnector::tryReconnect() { TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); state_ = ConnectorState::CONNECTING; is_reconnection_ = true; - connection_timer_.expires_from_now(std::chrono::seconds(1)); - connection_timer_.async_wait([this](const std::error_code &ec) { - if (!ec) { - if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - startConnectionTimer(); - doConnect(); + io_service_.post([this]() { + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); } + + doConnect(); + startConnectionTimer(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); }); } } void UdpSocketConnector::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, udp::resolver::iterator) { - if (!ec) { - connection_timeout_.cancel(); - state_ = ConnectorState::CONNECTED; - doRead(); - - if (data_available_) { - data_available_ = false; - doWrite(); - } - - if (is_reconnection_) { - is_reconnection_ = false; - on_reconnect_callback_(); - } - } else { - sleep(1); - doConnect(); - } - }); + asio::async_connect( + socket_, endpoint_iterator_, + [this](std::error_code ec, udp::resolver::iterator) { + if (!ec) { + connection_timer_.cancel(); + state_ = ConnectorState::CONNECTED; + doRead(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + } + + on_reconnect_callback_(); + } else { + doConnect(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + }); } bool UdpSocketConnector::checkConnected() { return state_ == ConnectorState::CONNECTED; } -void UdpSocketConnector::enableBurst() { return; } - void UdpSocketConnector::startConnectionTimer() { - connection_timeout_.expires_from_now(std::chrono::seconds(60)); - connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, - this, std::placeholders::_1)); + connection_timer_.expires_from_now(std::chrono::seconds(60)); + connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, + this, std::placeholders::_1)); } void UdpSocketConnector::handleDeadline(const std::error_code &ec) { |