diff options
Diffstat (limited to 'libtransport')
19 files changed, 533 insertions, 136 deletions
diff --git a/libtransport/src/hicn/transport/core/CMakeLists.txt b/libtransport/src/hicn/transport/core/CMakeLists.txt index dff93adeb..0e674fcac 100644 --- a/libtransport/src/hicn/transport/core/CMakeLists.txt +++ b/libtransport/src/hicn/transport/core/CMakeLists.txt @@ -17,7 +17,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.h ${CMAKE_CURRENT_SOURCE_DIR}/facade.h ${CMAKE_CURRENT_SOURCE_DIR}/interest.h - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h @@ -29,7 +28,8 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/portal.h ${CMAKE_CURRENT_SOURCE_DIR}/prefix.h ${CMAKE_CURRENT_SOURCE_DIR}/connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h @@ -39,12 +39,12 @@ list(APPEND HEADER_FILES list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc ${CMAKE_CURRENT_SOURCE_DIR}/interest.cc - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc ${CMAKE_CURRENT_SOURCE_DIR}/packet.cc ${CMAKE_CURRENT_SOURCE_DIR}/name.cc ${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.cc ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.cc ${CMAKE_CURRENT_SOURCE_DIR}/connector.cc diff --git a/libtransport/src/hicn/transport/core/connector.cc b/libtransport/src/hicn/transport/core/connector.cc index ff567d78a..e89b98f8a 100644 --- a/libtransport/src/hicn/transport/core/connector.cc +++ b/libtransport/src/hicn/transport/core/connector.cc @@ -21,7 +21,13 @@ namespace core { std::once_flag Connector::init_flag_; -Connector::Connector() : packet_pool_() { init(); } +Connector::Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback) + : packet_pool_(), + receive_callback_(std::move(receive_callback)), + on_reconnect_callback_(std::move(reconnect_callback)) { + init(); +} void Connector::init() { increasePoolSize(); } diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h index ae82ee973..529e97bf9 100644 --- a/libtransport/src/hicn/transport/core/connector.h +++ b/libtransport/src/hicn/transport/core/connector.h @@ -33,19 +33,20 @@ enum class ConnectorType : uint8_t { VPP_CONNECTOR, }; -static constexpr std::size_t packet_size = 2048; -static constexpr std::size_t queue_size = 4096; -static constexpr std::size_t packet_pool_size = 4096; - -using PacketRing = utils::CircularFifo<Packet::MemBufPtr, queue_size>; -using PacketQueue = std::deque<Packet::MemBufPtr>; -using PacketReceivedCallback = std::function<void(Packet::MemBufPtr &&)>; -using OnReconnect = std::function<void()>; -using PacketSentCallback = std::function<void()>; - class Connector { public: - Connector(); + static constexpr std::size_t packet_size = 2048; + static constexpr std::size_t queue_size = 4096; + static constexpr std::size_t packet_pool_size = 4096; + + using PacketRing = utils::CircularFifo<Packet::MemBufPtr, queue_size>; + using PacketQueue = std::deque<Packet::MemBufPtr>; + using PacketReceivedCallback = std::function<void(Packet::MemBufPtr &&)>; + using OnReconnect = std::function<void()>; + using PacketSentCallback = std::function<void()>; + + Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback); virtual ~Connector() = default; @@ -88,6 +89,10 @@ class Connector { static std::once_flag init_flag_; utils::ObjectPool<utils::MemBuf> packet_pool_; PacketQueue output_buffer_; + + // Connector events + PacketReceivedCallback receive_callback_; + OnReconnect on_reconnect_callback_; }; } // end namespace core diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h index e7b6fb1a6..de9f3b568 100644 --- a/libtransport/src/hicn/transport/core/forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/forwarder_interface.h @@ -16,7 +16,7 @@ #pragma once #include <hicn/transport/core/prefix.h> -#include <hicn/transport/core/socket_connector.h> +#include <hicn/transport/core/udp_socket_connector.h> #include <hicn/transport/portability/portability.h> #include <deque> @@ -54,8 +54,6 @@ class ForwarderInterface { } public: - static constexpr uint8_t ack_code = 102; - virtual ~ForwarderInterface() {} TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { @@ -70,6 +68,20 @@ class ForwarderInterface { return static_cast<Implementation &>(*this).getMtu(); } + TRANSPORT_ALWAYS_INLINE static bool isControlMessage(const uint8_t *message) { + return Implementation::isControlMessageImpl(message); + } + + template <typename R> + TRANSPORT_ALWAYS_INLINE void processControlMessageReply(R &&packet_buffer) { + return static_cast<Implementation &>(*this).processControlMessageReplyImpl( + std::forward<R &&>(packet_buffer)); + } + + TRANSPORT_ALWAYS_INLINE void closeConnection() { + return static_cast<Implementation &>(*this).closeConnection(); + } + template < typename R, typename = std::enable_if_t< @@ -97,7 +109,7 @@ class ForwarderInterface { counters_.tx_bytes += len; // Perfect forwarding - connector_.send(packet, len, std::forward<Handler>(packet_sent)); + connector_.send(packet, len, std::forward<Handler &&>(packet_sent)); } TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); } diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc index 9dc3b63bb..1c8060906 100644 --- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc @@ -15,11 +15,6 @@ #include <hicn/transport/core/hicn_forwarder_interface.h> -#define ADDR_INET 1 -#define ADDR_INET6 2 -#define ADD_ROUTE 3 -#define REQUEST_LIGHT 100 - union AddressLight { uint32_t ipv4; struct in6_addr ipv6; @@ -30,6 +25,13 @@ typedef struct { uint8_t command_id; uint16_t length; uint32_t seq_num; +} CommandHeader; + +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; char symbolic_or_connid[16]; union AddressLight address; uint16_t cost; @@ -37,51 +39,104 @@ typedef struct { uint8_t len; } RouteToSelfCommand; -namespace transport { - -namespace core { - -HicnForwarderInterface::HicnForwarderInterface(SocketConnector &connector) - : ForwarderInterface<HicnForwarderInterface, SocketConnector>(connector) {} - -HicnForwarderInterface::~HicnForwarderInterface() {} +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; + char symbolic_or_connid[16]; +} DeleteSelfConnectionCommand; -void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } +namespace { +static constexpr uint8_t addr_inet = 1; +static constexpr uint8_t addr_inet6 = 2; +static constexpr uint8_t add_route_command = 3; +static constexpr uint8_t delete_connection_command = 5; +static constexpr uint8_t request_light = 0xc0; +static constexpr char identifier[] = "SELF"; -void HicnForwarderInterface::registerRoute(Prefix &prefix) { - auto addr = prefix.toSockaddr(); - const char *identifier = {"SELF_ROUTE"}; +void fillCommandHeader(CommandHeader *header) { + // Allocate and fill the header + header->message_type = request_light; + header->length = 1; +} - // allocate command payload - RouteToSelfCommand *route_to_self = new RouteToSelfCommand(); +std::unique_ptr<RouteToSelfCommand> createCommandRoute( + std::unique_ptr<sockaddr> &&addr, uint8_t prefix_length) { + auto command = std::make_unique<RouteToSelfCommand>(); // check and set IP address if (addr->sa_family == AF_INET) { - route_to_self->address_type = ADDR_INET; - route_to_self->address.ipv4 = ((Sockaddr4 *)addr.get())->sin_addr.s_addr; + command->address_type = addr_inet; + command->address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; } else if (addr->sa_family == AF_INET6) { - route_to_self->address_type = ADDR_INET6; - route_to_self->address.ipv6 = ((Sockaddr6 *)addr.get())->sin6_addr; + command->address_type = addr_inet6; + command->address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; } // Fill remaining payload fields #ifndef _WIN32 - strcpy(route_to_self->symbolic_or_connid, identifier); + strcpy(command->symbolic_or_connid, identifier); #else - strcpy_s(route_to_self->symbolic_or_connid, strlen(route_to_self->symbolic_or_connid), identifier); + strcpy_s(route_to_self->symbolic_or_connid, + strlen(route_to_self->symbolic_or_connid), identifier); #endif - route_to_self->cost = 1; - route_to_self->len = (uint8_t) prefix.getPrefixLength(); + command->cost = 1; + command->len = (uint8_t)prefix_length; // Allocate and fill the header - route_to_self->command_id = ADD_ROUTE; - route_to_self->message_type = REQUEST_LIGHT; - route_to_self->length = 1; + command->command_id = add_route_command; + fillCommandHeader((CommandHeader *)command.get()); + + return command; +} + +std::unique_ptr<DeleteSelfConnectionCommand> createCommandDeleteConnection() { + auto command = std::make_unique<DeleteSelfConnectionCommand>(); + fillCommandHeader((CommandHeader *)command.get()); + command->command_id = delete_connection_command; + +#ifndef _WIN32 + strcpy(command->symbolic_or_connid, identifier); +#else + strcpy_s(route_to_self->symbolic_or_connid, + strlen(route_to_self->symbolic_or_connid), identifier); +#endif + + return command; +} + +} // namespace + +namespace transport { + +namespace core { + +HicnForwarderInterface::HicnForwarderInterface(UdpSocketConnector &connector) + : ForwarderInterface<HicnForwarderInterface, UdpSocketConnector>( + connector) {} + +HicnForwarderInterface::~HicnForwarderInterface() {} + +void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } + +void HicnForwarderInterface::registerRoute(Prefix &prefix) { + auto command = + createCommandRoute(prefix.toSockaddr(), prefix.getPrefixLength()) + .release(); + send((uint8_t *)command, sizeof(RouteToSelfCommand), + [command]() { delete command; }); +} - send((uint8_t *)route_to_self, sizeof(RouteToSelfCommand), - [route_to_self]() { delete route_to_self; }); +void HicnForwarderInterface::closeConnection() { + auto command = createCommandDeleteConnection().release(); + send((uint8_t *)command, sizeof(DeleteSelfConnectionCommand), + [this, command]() { + delete command; + connector_.close(); + }); } } // namespace core -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h index e57fae105..b11841b69 100644 --- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h @@ -17,7 +17,7 @@ #include <hicn/transport/core/forwarder_interface.h> #include <hicn/transport/core/prefix.h> -#include <hicn/transport/core/socket_connector.h> +#include <hicn/transport/core/udp_socket_connector.h> #include <deque> @@ -26,7 +26,10 @@ namespace transport { namespace core { class HicnForwarderInterface - : public ForwarderInterface<HicnForwarderInterface, SocketConnector> { + : public ForwarderInterface<HicnForwarderInterface, UdpSocketConnector> { + static constexpr uint8_t ack_code = 0xc2; + static constexpr uint8_t nack_code = 0xc3; + public: union addressLight { uint32_t ipv4; @@ -46,9 +49,9 @@ class HicnForwarderInterface }; using route_to_self_command = struct route_to_self_command; - using ConnectorType = SocketConnector; + using ConnectorType = UdpSocketConnector; - HicnForwarderInterface(SocketConnector &connector); + HicnForwarderInterface(UdpSocketConnector &connector); ~HicnForwarderInterface(); @@ -58,6 +61,21 @@ class HicnForwarderInterface std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return message[0] == ack_code || message[0] == nack_code; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) { + if (packet_buffer->data()[0] == nack_code) { + throw errors::RuntimeException( + "Received Nack message from hicn light forwarder."); + } + } + + void closeConnection(); + private: static constexpr std::uint16_t interface_mtu = 1500; }; diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index 38b2a2a98..c69a87fb7 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -57,7 +57,7 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, OnReconnect &&on_reconnect_callback, asio::io_service &io_service, std::string app_name) - : Connector(), + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), memif_worker_(nullptr), timer_set_(false), send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), @@ -71,8 +71,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, enable_burst_(false), closed_(false), app_name_(app_name), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), socket_filename_("") { std::call_once(MemifConnector::flag_, &MemifConnector::init, this); } @@ -372,7 +370,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, packet->append(packet_length); if (!connector->input_buffer_.push(std::move(packet))) { - TRANSPORT_LOGI("Error pushing packet. Ring buffer full."); // TODO Here we should consider the possibility to signal the congestion diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h index 3d2e8411d..06a8fd73e 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.h +++ b/libtransport/src/hicn/transport/core/memif_connector.h @@ -128,8 +128,6 @@ class MemifConnector : public Connector { uint8_t memif_mode_; std::string app_name_; uint16_t transmission_index_; - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; utils::SpinLock write_msgs_lock_; std::string socket_filename_; diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 0932b56c6..7efbc2009 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -22,7 +22,7 @@ #include <hicn/transport/core/name.h> #include <hicn/transport/core/pending_interest.h> #include <hicn/transport/core/prefix.h> -#include <hicn/transport/core/socket_connector.h> +#include <hicn/transport/core/udp_socket_connector.h> #include <hicn/transport/errors/errors.h> #include <hicn/transport/portability/portability.h> #include <hicn/transport/utils/log.h> @@ -222,22 +222,25 @@ class Portal { } TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) { - for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); - } - - clear(); - if (kill_connection) { - connector_.close(); + forwarder_interface_.closeConnection(); } - io_service_.post([this]() { io_service_.stop(); }); + io_service_.post([this]() { + clear(); + io_service_.stop(); + }); } TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); } - TRANSPORT_ALWAYS_INLINE void clear() { pending_interest_hash_table_.clear(); } + TRANSPORT_ALWAYS_INLINE void clear() { + for (auto &pend_interest : pending_interest_hash_table_) { + pend_interest.second->cancelTimer(); + } + + pending_interest_hash_table_.clear(); + } TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { return io_service_; @@ -260,17 +263,16 @@ class Portal { return; } - if (packet_buffer->data()[0] == ForwarderInt::ack_code) { - // Hicn forwarder message + if (TRANSPORT_EXPECT_FALSE( + ForwarderInt::isControlMessage(packet_buffer->data()))) { processControlMessage(std::move(packet_buffer)); return; } - bool is_interest = Packet::isInterest(packet_buffer->data()); Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data()); if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (!is_interest) { + if (!Packet::isInterest(packet_buffer->data())) { processContentObject( ContentObject::Ptr(new ContentObject(std::move(packet_buffer)))); } else { @@ -329,8 +331,7 @@ class Portal { TRANSPORT_ALWAYS_INLINE void processControlMessage( Packet::MemBufPtr &&packet_buffer) { - // Control message as response to the route set by a producer. - // Do nothing + forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); } private: diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc index 5cfff39fb..fe16d2132 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc @@ -39,15 +39,13 @@ RawSocketConnector::RawSocketConnector( PacketReceivedCallback &&receive_callback, OnReconnect &&on_reconnect_callback, asio::io_service &io_service, std::string app_name) - : Connector(), + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), io_service_(io_service), socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)), // resolver_(io_service_), timer_(io_service_), read_msg_(packet_pool_.makePtr(nullptr)), data_available_(false), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), app_name_(app_name) { memset(&link_layer_address_, 0, sizeof(link_layer_address_)); } diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h index 5e39efa0e..bb24d9d54 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.h +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h @@ -74,9 +74,6 @@ class RawSocketConnector : public Connector { utils::ObjectPool<utils::MemBuf>::Ptr read_msg_; bool data_available_; - - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; std::string app_name_; }; diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.h b/libtransport/src/hicn/transport/core/raw_socket_interface.h index c030af662..ac48e5874 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_interface.h +++ b/libtransport/src/hicn/transport/core/raw_socket_interface.h @@ -41,6 +41,16 @@ class RawSocketInterface std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + TRANSPORT_ALWAYS_INLINE void closeConnection(){}; + private: static constexpr std::uint16_t interface_mtu = 1500; std::string remote_mac_address_; diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc index 7bf0570ad..ade0f2611 100644 --- a/libtransport/src/hicn/transport/core/socket_connector.cc +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc @@ -16,7 +16,7 @@ #ifdef _WIN32 #include <hicn/transport/portability/win_portability.h> #endif -#include <hicn/transport/core/socket_connector.h> +#include <hicn/transport/core/tcp_socket_connector.h> #include <hicn/transport/errors/errors.h> #include <hicn/transport/utils/log.h> #include <hicn/transport/utils/object_pool.h> @@ -52,11 +52,11 @@ class NetworkMessage { }; } // namespace -SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&on_reconnect_callback, - asio::io_service &io_service, - std::string app_name) - : Connector(), +TcpSocketConnector::TcpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), io_service_(io_service), socket_(io_service_), resolver_(io_service_), @@ -66,30 +66,28 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback, is_reconnection_(false), data_available_(false), is_closed_(false), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), app_name_(app_name) {} -SocketConnector::~SocketConnector() {} +TcpSocketConnector::~TcpSocketConnector() {} -void SocketConnector::connect(std::string ip_address, std::string port) { +void TcpSocketConnector::connect(std::string ip_address, std::string port) { endpoint_iterator_ = resolver_.resolve( {ip_address, port, asio::ip::resolver_query_base::numeric_service}); doConnect(); } -void SocketConnector::state() { return; } +void TcpSocketConnector::state() { return; } -void SocketConnector::send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent) { +void TcpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { asio::async_write(socket_, asio::buffer(packet, len), [packet_sent](std::error_code ec, std::size_t /*length*/) { packet_sent(); }); } -void SocketConnector::send(const Packet::MemBufPtr &packet) { +void TcpSocketConnector::send(const Packet::MemBufPtr &packet) { io_service_.post([this, packet]() { bool write_in_progress = !output_buffer_.empty(); output_buffer_.push_back(std::move(packet)); @@ -104,7 +102,7 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) { }); } -void SocketConnector::close() { +void TcpSocketConnector::close() { io_service_.dispatch([this]() { is_closed_ = true; socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); @@ -112,7 +110,7 @@ void SocketConnector::close() { }); } -void SocketConnector::doWrite() { +void TcpSocketConnector::doWrite() { // TODO improve this piece of code for sending many buffers togethers // if list contains more than one packet auto packet = output_buffer_.front().get(); @@ -143,7 +141,7 @@ void SocketConnector::doWrite() { }); } -void SocketConnector::doReadBody(std::size_t body_length) { +void TcpSocketConnector::doReadBody(std::size_t body_length) { asio::async_read( socket_, asio::buffer(read_msg_->writableTail(), body_length), asio::transfer_exactly(body_length), @@ -163,7 +161,7 @@ void SocketConnector::doReadBody(std::size_t body_length) { }); } -void SocketConnector::doReadHeader() { +void TcpSocketConnector::doReadHeader() { read_msg_ = getPacket(); asio::async_read( socket_, @@ -191,7 +189,7 @@ void SocketConnector::doReadHeader() { }); } -void SocketConnector::tryReconnect() { +void TcpSocketConnector::tryReconnect() { if (!is_connecting_ && !is_closed_) { TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); is_connecting_ = true; @@ -205,7 +203,7 @@ void SocketConnector::tryReconnect() { } } -void SocketConnector::doConnect() { +void TcpSocketConnector::doConnect() { asio::async_connect(socket_, endpoint_iterator_, [this](std::error_code ec, tcp::resolver::iterator) { if (!ec) { @@ -232,17 +230,17 @@ void SocketConnector::doConnect() { }); } -bool SocketConnector::checkConnected() { return !is_connecting_; } +bool TcpSocketConnector::checkConnected() { return !is_connecting_; } -void SocketConnector::enableBurst() { return; } +void TcpSocketConnector::enableBurst() { return; } -void SocketConnector::startConnectionTimer() { +void TcpSocketConnector::startConnectionTimer() { timer_.expires_from_now(std::chrono::seconds(60)); - timer_.async_wait( - std::bind(&SocketConnector::handleDeadline, this, std::placeholders::_1)); + timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this, + std::placeholders::_1)); } -void SocketConnector::handleDeadline(const std::error_code &ec) { +void TcpSocketConnector::handleDeadline(const std::error_code &ec) { if (!ec) { io_service_.post([this]() { socket_.close(); diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h index 6eff1aff5..8dfda4eb8 100644 --- a/libtransport/src/hicn/transport/core/socket_connector.h +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h @@ -28,14 +28,14 @@ namespace core { using asio::ip::tcp; -class SocketConnector : public Connector { +class TcpSocketConnector : public Connector { public: - SocketConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&reconnect_callback, - asio::io_service &io_service, - std::string app_name = "Libtransport"); + TcpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); - ~SocketConnector() override; + ~TcpSocketConnector() override; void send(const Packet::MemBufPtr &packet) override; @@ -81,8 +81,6 @@ class SocketConnector : public Connector { bool data_available_; bool is_closed_; - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; std::string app_name_; }; diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc new file mode 100644 index 000000000..f38891e71 --- /dev/null +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include <hicn/transport/portability/win_portability.h> +#endif +#include <hicn/transport/core/udp_socket_connector.h> +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/object_pool.h> + +#include <vector> + +namespace transport { + +namespace core { + +UdpSocketConnector::UdpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + connection_timer_(io_service_), + connection_timeout_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + is_connecting_(false), + is_reconnection_(false), + data_available_(false), + is_closed_(false), + app_name_(app_name) {} + +UdpSocketConnector::~UdpSocketConnector() {} + +void UdpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + doConnect(); +} + +void UdpSocketConnector::state() { return; } + +void UdpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + socket_.async_send(asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); +} + +void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void UdpSocketConnector::close() { + io_service_.dispatch([this]() { + is_closed_ = true; + socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); + socket_.close(); + }); +} + +void UdpSocketConnector::doWrite() { + // TODO improve this piece of code for sending many buffers togethers + // if list contains more than one packet + auto packet = output_buffer_.front().get(); + auto array = std::vector<asio::const_buffer>(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec, + std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::doRead() { + read_msg_ = getPacket(); + socket_.async_receive( + asio::buffer(read_msg_->writableData(), Connector::packet_size), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + read_msg_->append(length); + receive_callback_(std::move(read_msg_)); + doRead(); + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::tryReconnect() { + if (!is_connecting_ && !is_closed_) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + is_connecting_ = true; + is_reconnection_ = true; + connection_timer_.expires_from_now(std::chrono::seconds(1)); + connection_timer_.async_wait([this](const std::error_code &ec) { + if (!ec) { + socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); + socket_.close(); + startConnectionTimer(); + doConnect(); + } + }); + } +} + +void UdpSocketConnector::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, udp::resolver::iterator) { + if (!ec) { + connection_timeout_.cancel(); + is_connecting_ = false; + doRead(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + on_reconnect_callback_(); + } + } else { + sleep(1); + doConnect(); + } + }); +} + +bool UdpSocketConnector::checkConnected() { return !is_connecting_; } + +void UdpSocketConnector::enableBurst() { return; } + +void UdpSocketConnector::startConnectionTimer() { + connection_timeout_.expires_from_now(std::chrono::seconds(60)); + connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, + this, std::placeholders::_1)); +} + +void UdpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + io_service_.stop(); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h new file mode 100644 index 000000000..4704fa50b --- /dev/null +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/connector.h> +#include <hicn/transport/core/name.h> +#include <hicn/transport/utils/branch_prediction.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <deque> + +namespace transport { +namespace core { + +using asio::ip::udp; + +class UdpSocketConnector : public Connector { + public: + UdpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~UdpSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void enableBurst() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + void state() override; + + private: + void doConnect(); + + void doRead(); + + void doWrite(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::udp::socket socket_; + asio::ip::udp::resolver resolver_; + asio::ip::udp::resolver::iterator endpoint_iterator_; + asio::steady_timer connection_timer_; + asio::steady_timer connection_timeout_; + + utils::ObjectPool<utils::MemBuf>::Ptr read_msg_; + + bool is_connecting_; + bool is_reconnection_; + bool data_available_; + bool is_closed_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc index 69b18c0d9..8dc607295 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc @@ -47,20 +47,7 @@ VPPForwarderInterface::VPPForwarderInterface(MemifConnector &connector) sw_if_index_(~0), face_id_(~0) {} -VPPForwarderInterface::~VPPForwarderInterface() { - if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) { - int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, - sw_if_index_); - - if (ret < 0) { - TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); - } - } - - if (VPPForwarderInterface::api_) { - vpp_binary_api_destroy(VPPForwarderInterface::api_); - } -} +VPPForwarderInterface::~VPPForwarderInterface() {} /** * @brief Create a memif interface in the local VPP forwarder. @@ -220,6 +207,23 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { } } +void VPPForwarderInterface::closeConnection() { + if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) { + int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, + sw_if_index_); + + if (ret < 0) { + TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); + } + } + + if (VPPForwarderInterface::api_) { + vpp_binary_api_destroy(VPPForwarderInterface::api_); + } + + connector_.close(); +} + } // namespace core } // namespace transport diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h index 322cd1f8b..62af8bc3b 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h @@ -31,6 +31,8 @@ namespace core { class VPPForwarderInterface : public ForwarderInterface<VPPForwarderInterface, MemifConnector> { + static constexpr std::uint16_t interface_mtu = 1500; + public: VPPForwarderInterface(MemifConnector &connector); @@ -44,6 +46,16 @@ class VPPForwarderInterface TRANSPORT_ALWAYS_INLINE std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + void closeConnection(); + private: uint32_t getMemifConfiguration(); @@ -58,7 +70,6 @@ class VPPForwarderInterface uint32_t sw_if_index_; uint32_t face_id_; static std::mutex global_lock_; - static constexpr std::uint16_t interface_mtu = 1500; }; } // namespace core diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index dbf1c1bd1..1356ad566 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -614,10 +614,10 @@ void RTCTransportProtocol::processRtcpHeader(uint8_t *offset) { uint8_t pkt_type = (*(offset + 1)); switch (pkt_type) { case HICN_RTCP_RR: // Receiver report - TRANSPORT_LOGI("got RR packet\n"); + TRANSPORT_LOGD("got RR packet\n"); break; case HICN_RTCP_SR: // Sender report - TRANSPORT_LOGI("got SR packet\n"); + TRANSPORT_LOGD("got SR packet\n"); break; case HICN_RTCP_SDES: // Description processSDES(offset); |