From 79e0d4f89c4d532189aae06cc5dfbc14e3269703 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Mon, 11 Feb 2019 10:44:29 +0100 Subject: [HICN-50] Added udp application connector. Change-Id: I0c5afad4b404ec485f50b1342b81e70ef85a5163 Signed-off-by: Mauro Sardara Signed-off-by: michele papalini --- .../src/hicn/transport/core/CMakeLists.txt | 8 +- libtransport/src/hicn/transport/core/connector.cc | 8 +- libtransport/src/hicn/transport/core/connector.h | 27 ++- .../src/hicn/transport/core/forwarder_interface.h | 20 +- .../transport/core/hicn_forwarder_interface.cc | 121 +++++++--- .../hicn/transport/core/hicn_forwarder_interface.h | 26 ++- .../src/hicn/transport/core/memif_connector.cc | 5 +- .../src/hicn/transport/core/memif_connector.h | 2 - libtransport/src/hicn/transport/core/portal.h | 33 +-- .../hicn/transport/core/raw_socket_connector.cc | 4 +- .../src/hicn/transport/core/raw_socket_connector.h | 3 - .../src/hicn/transport/core/raw_socket_interface.h | 10 + .../src/hicn/transport/core/socket_connector.cc | 257 --------------------- .../src/hicn/transport/core/socket_connector.h | 91 -------- .../hicn/transport/core/tcp_socket_connector.cc | 255 ++++++++++++++++++++ .../src/hicn/transport/core/tcp_socket_connector.h | 89 +++++++ .../hicn/transport/core/udp_socket_connector.cc | 201 ++++++++++++++++ .../src/hicn/transport/core/udp_socket_connector.h | 88 +++++++ .../hicn/transport/core/vpp_forwarder_interface.cc | 32 +-- .../hicn/transport/core/vpp_forwarder_interface.h | 13 +- libtransport/src/hicn/transport/protocols/rtc.cc | 4 +- 21 files changed, 847 insertions(+), 450 deletions(-) delete mode 100644 libtransport/src/hicn/transport/core/socket_connector.cc delete mode 100644 libtransport/src/hicn/transport/core/socket_connector.h create mode 100644 libtransport/src/hicn/transport/core/tcp_socket_connector.cc create mode 100644 libtransport/src/hicn/transport/core/tcp_socket_connector.h create mode 100644 libtransport/src/hicn/transport/core/udp_socket_connector.cc create mode 100644 libtransport/src/hicn/transport/core/udp_socket_connector.h (limited to 'libtransport/src/hicn') diff --git a/libtransport/src/hicn/transport/core/CMakeLists.txt b/libtransport/src/hicn/transport/core/CMakeLists.txt index dff93adeb..0e674fcac 100644 --- a/libtransport/src/hicn/transport/core/CMakeLists.txt +++ b/libtransport/src/hicn/transport/core/CMakeLists.txt @@ -17,7 +17,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.h ${CMAKE_CURRENT_SOURCE_DIR}/facade.h ${CMAKE_CURRENT_SOURCE_DIR}/interest.h - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h @@ -29,7 +28,8 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/portal.h ${CMAKE_CURRENT_SOURCE_DIR}/prefix.h ${CMAKE_CURRENT_SOURCE_DIR}/connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h @@ -39,12 +39,12 @@ list(APPEND HEADER_FILES list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc ${CMAKE_CURRENT_SOURCE_DIR}/interest.cc - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc ${CMAKE_CURRENT_SOURCE_DIR}/packet.cc ${CMAKE_CURRENT_SOURCE_DIR}/name.cc ${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.cc ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.cc ${CMAKE_CURRENT_SOURCE_DIR}/connector.cc diff --git a/libtransport/src/hicn/transport/core/connector.cc b/libtransport/src/hicn/transport/core/connector.cc index ff567d78a..e89b98f8a 100644 --- a/libtransport/src/hicn/transport/core/connector.cc +++ b/libtransport/src/hicn/transport/core/connector.cc @@ -21,7 +21,13 @@ namespace core { std::once_flag Connector::init_flag_; -Connector::Connector() : packet_pool_() { init(); } +Connector::Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback) + : packet_pool_(), + receive_callback_(std::move(receive_callback)), + on_reconnect_callback_(std::move(reconnect_callback)) { + init(); +} void Connector::init() { increasePoolSize(); } diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h index ae82ee973..529e97bf9 100644 --- a/libtransport/src/hicn/transport/core/connector.h +++ b/libtransport/src/hicn/transport/core/connector.h @@ -33,19 +33,20 @@ enum class ConnectorType : uint8_t { VPP_CONNECTOR, }; -static constexpr std::size_t packet_size = 2048; -static constexpr std::size_t queue_size = 4096; -static constexpr std::size_t packet_pool_size = 4096; - -using PacketRing = utils::CircularFifo; -using PacketQueue = std::deque; -using PacketReceivedCallback = std::function; -using OnReconnect = std::function; -using PacketSentCallback = std::function; - class Connector { public: - Connector(); + static constexpr std::size_t packet_size = 2048; + static constexpr std::size_t queue_size = 4096; + static constexpr std::size_t packet_pool_size = 4096; + + using PacketRing = utils::CircularFifo; + using PacketQueue = std::deque; + using PacketReceivedCallback = std::function; + using OnReconnect = std::function; + using PacketSentCallback = std::function; + + Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback); virtual ~Connector() = default; @@ -88,6 +89,10 @@ class Connector { static std::once_flag init_flag_; utils::ObjectPool packet_pool_; PacketQueue output_buffer_; + + // Connector events + PacketReceivedCallback receive_callback_; + OnReconnect on_reconnect_callback_; }; } // end namespace core diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h index e7b6fb1a6..de9f3b568 100644 --- a/libtransport/src/hicn/transport/core/forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/forwarder_interface.h @@ -16,7 +16,7 @@ #pragma once #include -#include +#include #include #include @@ -54,8 +54,6 @@ class ForwarderInterface { } public: - static constexpr uint8_t ack_code = 102; - virtual ~ForwarderInterface() {} TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { @@ -70,6 +68,20 @@ class ForwarderInterface { return static_cast(*this).getMtu(); } + TRANSPORT_ALWAYS_INLINE static bool isControlMessage(const uint8_t *message) { + return Implementation::isControlMessageImpl(message); + } + + template + TRANSPORT_ALWAYS_INLINE void processControlMessageReply(R &&packet_buffer) { + return static_cast(*this).processControlMessageReplyImpl( + std::forward(packet_buffer)); + } + + TRANSPORT_ALWAYS_INLINE void closeConnection() { + return static_cast(*this).closeConnection(); + } + template < typename R, typename = std::enable_if_t< @@ -97,7 +109,7 @@ class ForwarderInterface { counters_.tx_bytes += len; // Perfect forwarding - connector_.send(packet, len, std::forward(packet_sent)); + connector_.send(packet, len, std::forward(packet_sent)); } TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); } diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc index 9dc3b63bb..1c8060906 100644 --- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc @@ -15,16 +15,18 @@ #include -#define ADDR_INET 1 -#define ADDR_INET6 2 -#define ADD_ROUTE 3 -#define REQUEST_LIGHT 100 - union AddressLight { uint32_t ipv4; struct in6_addr ipv6; }; +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; +} CommandHeader; + typedef struct { uint8_t message_type; uint8_t command_id; @@ -37,51 +39,104 @@ typedef struct { uint8_t len; } RouteToSelfCommand; -namespace transport { - -namespace core { - -HicnForwarderInterface::HicnForwarderInterface(SocketConnector &connector) - : ForwarderInterface(connector) {} - -HicnForwarderInterface::~HicnForwarderInterface() {} +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; + char symbolic_or_connid[16]; +} DeleteSelfConnectionCommand; -void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } +namespace { +static constexpr uint8_t addr_inet = 1; +static constexpr uint8_t addr_inet6 = 2; +static constexpr uint8_t add_route_command = 3; +static constexpr uint8_t delete_connection_command = 5; +static constexpr uint8_t request_light = 0xc0; +static constexpr char identifier[] = "SELF"; -void HicnForwarderInterface::registerRoute(Prefix &prefix) { - auto addr = prefix.toSockaddr(); - const char *identifier = {"SELF_ROUTE"}; +void fillCommandHeader(CommandHeader *header) { + // Allocate and fill the header + header->message_type = request_light; + header->length = 1; +} - // allocate command payload - RouteToSelfCommand *route_to_self = new RouteToSelfCommand(); +std::unique_ptr createCommandRoute( + std::unique_ptr &&addr, uint8_t prefix_length) { + auto command = std::make_unique(); // check and set IP address if (addr->sa_family == AF_INET) { - route_to_self->address_type = ADDR_INET; - route_to_self->address.ipv4 = ((Sockaddr4 *)addr.get())->sin_addr.s_addr; + command->address_type = addr_inet; + command->address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; } else if (addr->sa_family == AF_INET6) { - route_to_self->address_type = ADDR_INET6; - route_to_self->address.ipv6 = ((Sockaddr6 *)addr.get())->sin6_addr; + command->address_type = addr_inet6; + command->address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; } // Fill remaining payload fields #ifndef _WIN32 - strcpy(route_to_self->symbolic_or_connid, identifier); + strcpy(command->symbolic_or_connid, identifier); #else - strcpy_s(route_to_self->symbolic_or_connid, strlen(route_to_self->symbolic_or_connid), identifier); + strcpy_s(route_to_self->symbolic_or_connid, + strlen(route_to_self->symbolic_or_connid), identifier); #endif - route_to_self->cost = 1; - route_to_self->len = (uint8_t) prefix.getPrefixLength(); + command->cost = 1; + command->len = (uint8_t)prefix_length; // Allocate and fill the header - route_to_self->command_id = ADD_ROUTE; - route_to_self->message_type = REQUEST_LIGHT; - route_to_self->length = 1; + command->command_id = add_route_command; + fillCommandHeader((CommandHeader *)command.get()); + + return command; +} + +std::unique_ptr createCommandDeleteConnection() { + auto command = std::make_unique(); + fillCommandHeader((CommandHeader *)command.get()); + command->command_id = delete_connection_command; + +#ifndef _WIN32 + strcpy(command->symbolic_or_connid, identifier); +#else + strcpy_s(route_to_self->symbolic_or_connid, + strlen(route_to_self->symbolic_or_connid), identifier); +#endif + + return command; +} + +} // namespace + +namespace transport { + +namespace core { + +HicnForwarderInterface::HicnForwarderInterface(UdpSocketConnector &connector) + : ForwarderInterface( + connector) {} + +HicnForwarderInterface::~HicnForwarderInterface() {} + +void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } + +void HicnForwarderInterface::registerRoute(Prefix &prefix) { + auto command = + createCommandRoute(prefix.toSockaddr(), prefix.getPrefixLength()) + .release(); + send((uint8_t *)command, sizeof(RouteToSelfCommand), + [command]() { delete command; }); +} - send((uint8_t *)route_to_self, sizeof(RouteToSelfCommand), - [route_to_self]() { delete route_to_self; }); +void HicnForwarderInterface::closeConnection() { + auto command = createCommandDeleteConnection().release(); + send((uint8_t *)command, sizeof(DeleteSelfConnectionCommand), + [this, command]() { + delete command; + connector_.close(); + }); } } // namespace core -} // namespace transport \ No newline at end of file +} // namespace transport diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h index e57fae105..b11841b69 100644 --- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h @@ -17,7 +17,7 @@ #include #include -#include +#include #include @@ -26,7 +26,10 @@ namespace transport { namespace core { class HicnForwarderInterface - : public ForwarderInterface { + : public ForwarderInterface { + static constexpr uint8_t ack_code = 0xc2; + static constexpr uint8_t nack_code = 0xc3; + public: union addressLight { uint32_t ipv4; @@ -46,9 +49,9 @@ class HicnForwarderInterface }; using route_to_self_command = struct route_to_self_command; - using ConnectorType = SocketConnector; + using ConnectorType = UdpSocketConnector; - HicnForwarderInterface(SocketConnector &connector); + HicnForwarderInterface(UdpSocketConnector &connector); ~HicnForwarderInterface(); @@ -58,6 +61,21 @@ class HicnForwarderInterface std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return message[0] == ack_code || message[0] == nack_code; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) { + if (packet_buffer->data()[0] == nack_code) { + throw errors::RuntimeException( + "Received Nack message from hicn light forwarder."); + } + } + + void closeConnection(); + private: static constexpr std::uint16_t interface_mtu = 1500; }; diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index 38b2a2a98..c69a87fb7 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -57,7 +57,7 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, OnReconnect &&on_reconnect_callback, asio::io_service &io_service, std::string app_name) - : Connector(), + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), memif_worker_(nullptr), timer_set_(false), send_timer_(std::make_unique(event_reactor_)), @@ -71,8 +71,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, enable_burst_(false), closed_(false), app_name_(app_name), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), socket_filename_("") { std::call_once(MemifConnector::flag_, &MemifConnector::init, this); } @@ -372,7 +370,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, packet->append(packet_length); if (!connector->input_buffer_.push(std::move(packet))) { - TRANSPORT_LOGI("Error pushing packet. Ring buffer full."); // TODO Here we should consider the possibility to signal the congestion diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h index 3d2e8411d..06a8fd73e 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.h +++ b/libtransport/src/hicn/transport/core/memif_connector.h @@ -128,8 +128,6 @@ class MemifConnector : public Connector { uint8_t memif_mode_; std::string app_name_; uint16_t transmission_index_; - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; utils::SpinLock write_msgs_lock_; std::string socket_filename_; diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 0932b56c6..7efbc2009 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include #include @@ -222,22 +222,25 @@ class Portal { } TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) { - for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); - } - - clear(); - if (kill_connection) { - connector_.close(); + forwarder_interface_.closeConnection(); } - io_service_.post([this]() { io_service_.stop(); }); + io_service_.post([this]() { + clear(); + io_service_.stop(); + }); } TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); } - TRANSPORT_ALWAYS_INLINE void clear() { pending_interest_hash_table_.clear(); } + TRANSPORT_ALWAYS_INLINE void clear() { + for (auto &pend_interest : pending_interest_hash_table_) { + pend_interest.second->cancelTimer(); + } + + pending_interest_hash_table_.clear(); + } TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { return io_service_; @@ -260,17 +263,16 @@ class Portal { return; } - if (packet_buffer->data()[0] == ForwarderInt::ack_code) { - // Hicn forwarder message + if (TRANSPORT_EXPECT_FALSE( + ForwarderInt::isControlMessage(packet_buffer->data()))) { processControlMessage(std::move(packet_buffer)); return; } - bool is_interest = Packet::isInterest(packet_buffer->data()); Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data()); if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (!is_interest) { + if (!Packet::isInterest(packet_buffer->data())) { processContentObject( ContentObject::Ptr(new ContentObject(std::move(packet_buffer)))); } else { @@ -329,8 +331,7 @@ class Portal { TRANSPORT_ALWAYS_INLINE void processControlMessage( Packet::MemBufPtr &&packet_buffer) { - // Control message as response to the route set by a producer. - // Do nothing + forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); } private: diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc index 5cfff39fb..fe16d2132 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc @@ -39,15 +39,13 @@ RawSocketConnector::RawSocketConnector( PacketReceivedCallback &&receive_callback, OnReconnect &&on_reconnect_callback, asio::io_service &io_service, std::string app_name) - : Connector(), + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), io_service_(io_service), socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)), // resolver_(io_service_), timer_(io_service_), read_msg_(packet_pool_.makePtr(nullptr)), data_available_(false), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), app_name_(app_name) { memset(&link_layer_address_, 0, sizeof(link_layer_address_)); } diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h index 5e39efa0e..bb24d9d54 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.h +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h @@ -74,9 +74,6 @@ class RawSocketConnector : public Connector { utils::ObjectPool::Ptr read_msg_; bool data_available_; - - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; std::string app_name_; }; diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.h b/libtransport/src/hicn/transport/core/raw_socket_interface.h index c030af662..ac48e5874 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_interface.h +++ b/libtransport/src/hicn/transport/core/raw_socket_interface.h @@ -41,6 +41,16 @@ class RawSocketInterface std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + TRANSPORT_ALWAYS_INLINE void closeConnection(){}; + private: static constexpr std::uint16_t interface_mtu = 1500; std::string remote_mac_address_; diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/socket_connector.cc deleted file mode 100644 index 7bf0570ad..000000000 --- a/libtransport/src/hicn/transport/core/socket_connector.cc +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifdef _WIN32 -#include -#endif -#include -#include -#include -#include - -#include - -namespace transport { - -namespace core { - -namespace { -class NetworkMessage { - public: - static constexpr std::size_t fixed_header_length = 10; - - static std::size_t decodeHeader(const uint8_t *packet) { - // General checks - // CCNX Control packet format - uint8_t first_byte = packet[0]; - uint8_t ip_format = (packet[0] & 0xf0) >> 4; - - if (TRANSPORT_EXPECT_FALSE(first_byte == 102)) { - // Get packet length - return 44; - } else if (TRANSPORT_EXPECT_TRUE(ip_format == 6 || ip_format == 4)) { - Packet::Format format = Packet::getFormatFromBuffer(packet); - return Packet::getHeaderSizeFromBuffer(format, packet) + - Packet::getPayloadSizeFromBuffer(format, packet); - } - - return 0; - } -}; -} // namespace - -SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&on_reconnect_callback, - asio::io_service &io_service, - std::string app_name) - : Connector(), - io_service_(io_service), - socket_(io_service_), - resolver_(io_service_), - timer_(io_service_), - read_msg_(packet_pool_.makePtr(nullptr)), - is_connecting_(false), - is_reconnection_(false), - data_available_(false), - is_closed_(false), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), - app_name_(app_name) {} - -SocketConnector::~SocketConnector() {} - -void SocketConnector::connect(std::string ip_address, std::string port) { - endpoint_iterator_ = resolver_.resolve( - {ip_address, port, asio::ip::resolver_query_base::numeric_service}); - - doConnect(); -} - -void SocketConnector::state() { return; } - -void SocketConnector::send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent) { - asio::async_write(socket_, asio::buffer(packet, len), - [packet_sent](std::error_code ec, std::size_t /*length*/) { - packet_sent(); - }); -} - -void SocketConnector::send(const Packet::MemBufPtr &packet) { - io_service_.post([this, packet]() { - bool write_in_progress = !output_buffer_.empty(); - output_buffer_.push_back(std::move(packet)); - if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { - if (!write_in_progress) { - doWrite(); - } - } else { - // Tell the handle connect it has data to write - data_available_ = true; - } - }); -} - -void SocketConnector::close() { - io_service_.dispatch([this]() { - is_closed_ = true; - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - }); -} - -void SocketConnector::doWrite() { - // TODO improve this piece of code for sending many buffers togethers - // if list contains more than one packet - auto packet = output_buffer_.front().get(); - auto array = std::vector(); - - const utils::MemBuf *current = packet; - do { - array.push_back(asio::const_buffer(current->data(), current->length())); - current = current->next(); - } while (current != packet); - - asio::async_write( - socket_, std::move(array), - [this /*, packet*/](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - output_buffer_.pop_front(); - if (!output_buffer_.empty()) { - doWrite(); - } - } else if (ec.value() == - static_cast(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); - tryReconnect(); - } - }); -} - -void SocketConnector::doReadBody(std::size_t body_length) { - asio::async_read( - socket_, asio::buffer(read_msg_->writableTail(), body_length), - asio::transfer_exactly(body_length), - [this](std::error_code ec, std::size_t length) { - read_msg_->append(length); - if (TRANSPORT_EXPECT_TRUE(!ec)) { - receive_callback_(std::move(read_msg_)); - doReadHeader(); - } else if (ec.value() == - static_cast(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); - tryReconnect(); - } - }); -} - -void SocketConnector::doReadHeader() { - read_msg_ = getPacket(); - asio::async_read( - socket_, - asio::buffer(read_msg_->writableData(), - NetworkMessage::fixed_header_length), - asio::transfer_exactly(NetworkMessage::fixed_header_length), - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - read_msg_->append(NetworkMessage::fixed_header_length); - std::size_t body_length = 0; - if ((body_length = NetworkMessage::decodeHeader(read_msg_->data())) > - 0) { - doReadBody(body_length - length); - } else { - TRANSPORT_LOGE("Decoding error. Ignoring packet."); - } - } else if (ec.value() == - static_cast(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); - tryReconnect(); - } - }); -} - -void SocketConnector::tryReconnect() { - if (!is_connecting_ && !is_closed_) { - TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); - is_connecting_ = true; - is_reconnection_ = true; - io_service_.post([this]() { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - startConnectionTimer(); - doConnect(); - }); - } -} - -void SocketConnector::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { - if (!ec) { - timer_.cancel(); - is_connecting_ = false; - asio::ip::tcp::no_delay noDelayOption(true); - socket_.set_option(noDelayOption); - doReadHeader(); - - if (data_available_) { - data_available_ = false; - doWrite(); - } - - if (is_reconnection_) { - is_reconnection_ = false; - TRANSPORT_LOGI("Connection recovered!\n"); - on_reconnect_callback_(); - } - } else { - sleep(1); - doConnect(); - } - }); -} - -bool SocketConnector::checkConnected() { return !is_connecting_; } - -void SocketConnector::enableBurst() { return; } - -void SocketConnector::startConnectionTimer() { - timer_.expires_from_now(std::chrono::seconds(60)); - timer_.async_wait( - std::bind(&SocketConnector::handleDeadline, this, std::placeholders::_1)); -} - -void SocketConnector::handleDeadline(const std::error_code &ec) { - if (!ec) { - io_service_.post([this]() { - socket_.close(); - TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); - io_service_.stop(); - }); - } -} - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/socket_connector.h deleted file mode 100644 index 6eff1aff5..000000000 --- a/libtransport/src/hicn/transport/core/socket_connector.h +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include -#include -#include - -namespace transport { -namespace core { - -using asio::ip::tcp; - -class SocketConnector : public Connector { - public: - SocketConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&reconnect_callback, - asio::io_service &io_service, - std::string app_name = "Libtransport"); - - ~SocketConnector() override; - - void send(const Packet::MemBufPtr &packet) override; - - void send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent = 0) override; - - void close() override; - - void enableBurst() override; - - void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); - - void state() override; - - private: - void doConnect(); - - void doReadHeader(); - - void doReadBody(std::size_t body_length); - - void doWrite(); - - bool checkConnected(); - - private: - void handleDeadline(const std::error_code &ec); - - void startConnectionTimer(); - - void tryReconnect(); - - asio::io_service &io_service_; - asio::ip::tcp::socket socket_; - asio::ip::tcp::resolver resolver_; - asio::ip::tcp::resolver::iterator endpoint_iterator_; - asio::steady_timer timer_; - - utils::ObjectPool::Ptr read_msg_; - - bool is_connecting_; - bool is_reconnection_; - bool data_available_; - bool is_closed_; - - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; - std::string app_name_; -}; - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc new file mode 100644 index 000000000..ade0f2611 --- /dev/null +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include +#endif +#include +#include +#include +#include + +#include + +namespace transport { + +namespace core { + +namespace { +class NetworkMessage { + public: + static constexpr std::size_t fixed_header_length = 10; + + static std::size_t decodeHeader(const uint8_t *packet) { + // General checks + // CCNX Control packet format + uint8_t first_byte = packet[0]; + uint8_t ip_format = (packet[0] & 0xf0) >> 4; + + if (TRANSPORT_EXPECT_FALSE(first_byte == 102)) { + // Get packet length + return 44; + } else if (TRANSPORT_EXPECT_TRUE(ip_format == 6 || ip_format == 4)) { + Packet::Format format = Packet::getFormatFromBuffer(packet); + return Packet::getHeaderSizeFromBuffer(format, packet) + + Packet::getPayloadSizeFromBuffer(format, packet); + } + + return 0; + } +}; +} // namespace + +TcpSocketConnector::TcpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + timer_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + is_connecting_(false), + is_reconnection_(false), + data_available_(false), + is_closed_(false), + app_name_(app_name) {} + +TcpSocketConnector::~TcpSocketConnector() {} + +void TcpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + doConnect(); +} + +void TcpSocketConnector::state() { return; } + +void TcpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + asio::async_write(socket_, asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); +} + +void TcpSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void TcpSocketConnector::close() { + io_service_.dispatch([this]() { + is_closed_ = true; + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + }); +} + +void TcpSocketConnector::doWrite() { + // TODO improve this piece of code for sending many buffers togethers + // if list contains more than one packet + auto packet = output_buffer_.front().get(); + auto array = std::vector(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + asio::async_write( + socket_, std::move(array), + [this /*, packet*/](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::doReadBody(std::size_t body_length) { + asio::async_read( + socket_, asio::buffer(read_msg_->writableTail(), body_length), + asio::transfer_exactly(body_length), + [this](std::error_code ec, std::size_t length) { + read_msg_->append(length); + if (TRANSPORT_EXPECT_TRUE(!ec)) { + receive_callback_(std::move(read_msg_)); + doReadHeader(); + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::doReadHeader() { + read_msg_ = getPacket(); + asio::async_read( + socket_, + asio::buffer(read_msg_->writableData(), + NetworkMessage::fixed_header_length), + asio::transfer_exactly(NetworkMessage::fixed_header_length), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + read_msg_->append(NetworkMessage::fixed_header_length); + std::size_t body_length = 0; + if ((body_length = NetworkMessage::decodeHeader(read_msg_->data())) > + 0) { + doReadBody(body_length - length); + } else { + TRANSPORT_LOGE("Decoding error. Ignoring packet."); + } + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::tryReconnect() { + if (!is_connecting_ && !is_closed_) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + is_connecting_ = true; + is_reconnection_ = true; + io_service_.post([this]() { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + startConnectionTimer(); + doConnect(); + }); + } +} + +void TcpSocketConnector::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + is_connecting_ = false; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + doReadHeader(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOGI("Connection recovered!\n"); + on_reconnect_callback_(); + } + } else { + sleep(1); + doConnect(); + } + }); +} + +bool TcpSocketConnector::checkConnected() { return !is_connecting_; } + +void TcpSocketConnector::enableBurst() { return; } + +void TcpSocketConnector::startConnectionTimer() { + timer_.expires_from_now(std::chrono::seconds(60)); + timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this, + std::placeholders::_1)); +} + +void TcpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + io_service_.stop(); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h new file mode 100644 index 000000000..8dfda4eb8 --- /dev/null +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace transport { +namespace core { + +using asio::ip::tcp; + +class TcpSocketConnector : public Connector { + public: + TcpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~TcpSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void enableBurst() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + void state() override; + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(std::size_t body_length); + + void doWrite(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::tcp::socket socket_; + asio::ip::tcp::resolver resolver_; + asio::ip::tcp::resolver::iterator endpoint_iterator_; + asio::steady_timer timer_; + + utils::ObjectPool::Ptr read_msg_; + + bool is_connecting_; + bool is_reconnection_; + bool data_available_; + bool is_closed_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc new file mode 100644 index 000000000..f38891e71 --- /dev/null +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include +#endif +#include +#include +#include +#include + +#include + +namespace transport { + +namespace core { + +UdpSocketConnector::UdpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + connection_timer_(io_service_), + connection_timeout_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + is_connecting_(false), + is_reconnection_(false), + data_available_(false), + is_closed_(false), + app_name_(app_name) {} + +UdpSocketConnector::~UdpSocketConnector() {} + +void UdpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + doConnect(); +} + +void UdpSocketConnector::state() { return; } + +void UdpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + socket_.async_send(asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); +} + +void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void UdpSocketConnector::close() { + io_service_.dispatch([this]() { + is_closed_ = true; + socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); + socket_.close(); + }); +} + +void UdpSocketConnector::doWrite() { + // TODO improve this piece of code for sending many buffers togethers + // if list contains more than one packet + auto packet = output_buffer_.front().get(); + auto array = std::vector(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec, + std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::doRead() { + read_msg_ = getPacket(); + socket_.async_receive( + asio::buffer(read_msg_->writableData(), Connector::packet_size), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + read_msg_->append(length); + receive_callback_(std::move(read_msg_)); + doRead(); + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::tryReconnect() { + if (!is_connecting_ && !is_closed_) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + is_connecting_ = true; + is_reconnection_ = true; + connection_timer_.expires_from_now(std::chrono::seconds(1)); + connection_timer_.async_wait([this](const std::error_code &ec) { + if (!ec) { + socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); + socket_.close(); + startConnectionTimer(); + doConnect(); + } + }); + } +} + +void UdpSocketConnector::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, udp::resolver::iterator) { + if (!ec) { + connection_timeout_.cancel(); + is_connecting_ = false; + doRead(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + on_reconnect_callback_(); + } + } else { + sleep(1); + doConnect(); + } + }); +} + +bool UdpSocketConnector::checkConnected() { return !is_connecting_; } + +void UdpSocketConnector::enableBurst() { return; } + +void UdpSocketConnector::startConnectionTimer() { + connection_timeout_.expires_from_now(std::chrono::seconds(60)); + connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, + this, std::placeholders::_1)); +} + +void UdpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + io_service_.stop(); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h new file mode 100644 index 000000000..4704fa50b --- /dev/null +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace transport { +namespace core { + +using asio::ip::udp; + +class UdpSocketConnector : public Connector { + public: + UdpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~UdpSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void enableBurst() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + void state() override; + + private: + void doConnect(); + + void doRead(); + + void doWrite(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::udp::socket socket_; + asio::ip::udp::resolver resolver_; + asio::ip::udp::resolver::iterator endpoint_iterator_; + asio::steady_timer connection_timer_; + asio::steady_timer connection_timeout_; + + utils::ObjectPool::Ptr read_msg_; + + bool is_connecting_; + bool is_reconnection_; + bool data_available_; + bool is_closed_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc index 69b18c0d9..8dc607295 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc @@ -47,20 +47,7 @@ VPPForwarderInterface::VPPForwarderInterface(MemifConnector &connector) sw_if_index_(~0), face_id_(~0) {} -VPPForwarderInterface::~VPPForwarderInterface() { - if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) { - int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, - sw_if_index_); - - if (ret < 0) { - TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); - } - } - - if (VPPForwarderInterface::api_) { - vpp_binary_api_destroy(VPPForwarderInterface::api_); - } -} +VPPForwarderInterface::~VPPForwarderInterface() {} /** * @brief Create a memif interface in the local VPP forwarder. @@ -220,6 +207,23 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { } } +void VPPForwarderInterface::closeConnection() { + if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) { + int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, + sw_if_index_); + + if (ret < 0) { + TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); + } + } + + if (VPPForwarderInterface::api_) { + vpp_binary_api_destroy(VPPForwarderInterface::api_); + } + + connector_.close(); +} + } // namespace core } // namespace transport diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h index 322cd1f8b..62af8bc3b 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h @@ -31,6 +31,8 @@ namespace core { class VPPForwarderInterface : public ForwarderInterface { + static constexpr std::uint16_t interface_mtu = 1500; + public: VPPForwarderInterface(MemifConnector &connector); @@ -44,6 +46,16 @@ class VPPForwarderInterface TRANSPORT_ALWAYS_INLINE std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + void closeConnection(); + private: uint32_t getMemifConfiguration(); @@ -58,7 +70,6 @@ class VPPForwarderInterface uint32_t sw_if_index_; uint32_t face_id_; static std::mutex global_lock_; - static constexpr std::uint16_t interface_mtu = 1500; }; } // namespace core diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index dbf1c1bd1..1356ad566 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -614,10 +614,10 @@ void RTCTransportProtocol::processRtcpHeader(uint8_t *offset) { uint8_t pkt_type = (*(offset + 1)); switch (pkt_type) { case HICN_RTCP_RR: // Receiver report - TRANSPORT_LOGI("got RR packet\n"); + TRANSPORT_LOGD("got RR packet\n"); break; case HICN_RTCP_SR: // Sender report - TRANSPORT_LOGI("got SR packet\n"); + TRANSPORT_LOGD("got SR packet\n"); break; case HICN_RTCP_SDES: // Description processSDES(offset); -- cgit 1.2.3-korg