aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/core/udp_socket_connector.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/core/udp_socket_connector.cc')
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.cc100
1 files changed, 56 insertions, 44 deletions
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
index 54c0eb978..38945e755 100644
--- a/libtransport/src/hicn/transport/core/udp_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
@@ -21,6 +21,7 @@
#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/object_pool.h>
+#include <thread>
#include <vector>
namespace transport {
@@ -36,7 +37,6 @@ UdpSocketConnector::UdpSocketConnector(
socket_(io_service_),
resolver_(io_service_),
connection_timer_(io_service_),
- connection_timeout_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
is_reconnection_(false),
data_available_(false),
@@ -54,10 +54,17 @@ void UdpSocketConnector::connect(std::string ip_address, std::string port) {
void UdpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
- socket_.async_send(asio::buffer(packet, len),
- [packet_sent](std::error_code ec, std::size_t /*length*/) {
- packet_sent();
- });
+ if (packet_sent != 0) {
+ socket_.async_send(
+ asio::buffer(packet, len),
+ [packet_sent](std::error_code ec, std::size_t /*length*/) {
+ packet_sent();
+ });
+ } else {
+ if (state_ == ConnectorState::CONNECTED) {
+ socket_.send(asio::buffer(packet, len));
+ }
+ }
}
void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
@@ -76,6 +83,14 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
}
void UdpSocketConnector::close() {
+ if (io_service_.stopped()) {
+ doClose();
+ } else {
+ io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this));
+ }
+}
+
+void UdpSocketConnector::doClose() {
if (state_ != ConnectorState::CLOSED) {
state_ = ConnectorState::CLOSED;
if (socket_.is_open()) {
@@ -86,8 +101,6 @@ void UdpSocketConnector::close() {
}
void UdpSocketConnector::doWrite() {
- // TODO improve this piece of code for sending many buffers togethers
- // if list contains more than one packet
auto packet = output_buffer_.front().get();
auto array = std::vector<asio::const_buffer>();
@@ -97,8 +110,8 @@ void UdpSocketConnector::doWrite() {
current = current->next();
} while (current != packet);
- socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec,
- std::size_t length) {
+ socket_.async_send(std::move(array), [this](std::error_code ec,
+ std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
output_buffer_.pop_front();
if (!output_buffer_.empty()) {
@@ -139,54 +152,53 @@ void UdpSocketConnector::tryReconnect() {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
- connection_timer_.expires_from_now(std::chrono::seconds(1));
- connection_timer_.async_wait([this](const std::error_code &ec) {
- if (!ec) {
- if (socket_.is_open()) {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- }
- startConnectionTimer();
- doConnect();
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
}
+
+ doConnect();
+ startConnectionTimer();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
});
}
}
void UdpSocketConnector::doConnect() {
- asio::async_connect(socket_, endpoint_iterator_,
- [this](std::error_code ec, udp::resolver::iterator) {
- if (!ec) {
- connection_timeout_.cancel();
- state_ = ConnectorState::CONNECTED;
- doRead();
-
- if (data_available_) {
- data_available_ = false;
- doWrite();
- }
-
- if (is_reconnection_) {
- is_reconnection_ = false;
- on_reconnect_callback_();
- }
- } else {
- sleep(1);
- doConnect();
- }
- });
+ asio::async_connect(
+ socket_, endpoint_iterator_,
+ [this](std::error_code ec, udp::resolver::iterator) {
+ if (!ec) {
+ connection_timer_.cancel();
+ state_ = ConnectorState::CONNECTED;
+ doRead();
+
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ }
+
+ on_reconnect_callback_();
+ } else {
+ doConnect();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ });
}
bool UdpSocketConnector::checkConnected() {
return state_ == ConnectorState::CONNECTED;
}
-void UdpSocketConnector::enableBurst() { return; }
-
void UdpSocketConnector::startConnectionTimer() {
- connection_timeout_.expires_from_now(std::chrono::seconds(60));
- connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
- this, std::placeholders::_1));
+ connection_timer_.expires_from_now(std::chrono::seconds(60));
+ connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
+ this, std::placeholders::_1));
}
void UdpSocketConnector::handleDeadline(const std::error_code &ec) {