diff options
author | Luca Muscariello <muscariello@ieee.org> | 2021-04-15 09:05:46 +0200 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2021-04-15 16:36:16 +0200 |
commit | e92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch) | |
tree | 9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/io_modules/udp | |
parent | 3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff) |
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary.
A summary of the different components that underwent major modifications is
reported below.
- Transport protocol updates
The hierarchy of classes has been optimized to have common transport services
across different transport protocols. This can allow to customize a transport
protocol with new features.
- A new real-time communication protocol
The RTC protocol has been optimized in terms of algorithms to reduce
consumer-producer synchronization latency.
- A novel socket API
The API has been reworked to be easier to consumer but also to have a more
efficient integration in L4 proxies.
- Several performance improvements
A large number of performance improvements have been included in
particular to make the entire stack zero-copy and optimize cache miss.
- New memory buffer framework
Memory management has been reworked entirely to provide a more efficient infra
with a richer API. Buffers are now allocated in blocks and a single buffer
holds the memory for (1) the shared_ptr control block, (2) the metadata of the
packet (e.g. name, pointer to other buffers if buffer is chained and relevant
offsets), and (3) the packet itself, as it is sent/received over the network.
- A new slab allocator
Dynamic memory allocation is now managed by a novel slab allocator that is
optimised for packet processing and connection management. Memory is organized
in pools of blocks all of the same size which are used during the processing of
outgoing/incoming packets. When a memory block Is allocated is always taken
from a global pool and when it is deallocated is returned to the pool, thus
avoiding the cost of any heap allocation in the data path.
- New transport connectors
Consumer and producer end-points can communication either using an hicn packet
forwarder or with direct connector based on shared memories or sockets.
The usage of transport connectors typically for unit and funcitonal
testing but may have additional usage.
- Support for FEC/ECC for transport services
FEC/ECC via reed solomon is supported by default and made available to
transport services as a modular component. Reed solomon block codes is a
default FEC model that can be replaced in a modular way by many other
codes including RLNC not avaiable in this distribution.
The current FEC framework support variable size padding and efficiently
makes use of the infra memory buffers to avoid additiona copies.
- Secure transport framework for signature computation and verification
Crypto support is nativelty used in hICN for integrity and authenticity.
Novel support that includes RTC has been implemented and made modular
and reusable acrosso different transport protocols.
- TLS - Transport layer security over hicn
Point to point confidentiality is provided by integrating TLS on top of
hICN reliable and non-reliable transport. The integration is common and
makes a different use of the TLS record.
- MLS - Messaging layer security over hicn
MLS integration on top of hICN is made by using the MLSPP implemetation
open sourced by Cisco. We have included instrumentation tools to deploy
performance and functional tests of groups of end-points.
- Android support
The overall code has been heavily tested in Android environments and
has received heavy lifting to better run natively in recent Android OS.
Co-authored-by: Mauro Sardara <msardara@cisco.com>
Co-authored-by: Michele Papalini <micpapal@cisco.com>
Co-authored-by: Olivier Roques <oroques+fdio@cisco.com>
Co-authored-by: Giulio Grassi <gigrassi@cisco.com>
Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69
Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/io_modules/udp')
5 files changed, 614 insertions, 0 deletions
diff --git a/libtransport/src/io_modules/udp/CMakeLists.txt b/libtransport/src/io_modules/udp/CMakeLists.txt new file mode 100644 index 000000000..1a43492dc --- /dev/null +++ b/libtransport/src/io_modules/udp/CMakeLists.txt @@ -0,0 +1,47 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + + +list(APPEND MODULE_HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h +) + +list(APPEND MODULE_SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc +) + +# add_executable(hicnlight_module MACOSX_BUNDLE ${MODULE_SOURCE_FILES}) +# target_include_directories(hicnlight_module PRIVATE ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}) +# set_target_properties(hicnlight_module PROPERTIES +# BUNDLE True +# MACOSX_BUNDLE_GUI_IDENTIFIER my.domain.style.identifier.hicnlight_module +# MACOSX_BUNDLE_BUNDLE_NAME hicnlight_module +# MACOSX_BUNDLE_BUNDLE_VERSION "0.1" +# MACOSX_BUNDLE_SHORT_VERSION_STRING "0.1" +# # MACOSX_BUNDLE_INFO_PLIST ${CMAKE_SOURCE_DIR}/cmake/customtemplate.plist.in +# ) + +build_module(hicnlight_module + SHARED + SOURCES ${MODULE_SOURCE_FILES} + DEPENDS ${DEPENDENCIES} + COMPONENT lib${LIBTRANSPORT} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} + # LIBRARY_ROOT_DIR "vpp_plugins" + DEFINITIONS ${COMPILER_DEFINITIONS} + COMPILE_OPTIONS ${COMPILE_FLAGS} +) diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.cc b/libtransport/src/io_modules/udp/hicn_forwarder_module.cc new file mode 100644 index 000000000..ba08dd8c0 --- /dev/null +++ b/libtransport/src/io_modules/udp/hicn_forwarder_module.cc @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <io_modules/udp/hicn_forwarder_module.h> +#include <io_modules/udp/udp_socket_connector.h> + +union AddressLight { + uint32_t ipv4; + struct in6_addr ipv6; +}; + +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; +} CommandHeader; + +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; + char symbolic_or_connid[16]; + union AddressLight address; + uint16_t cost; + uint8_t address_type; + uint8_t len; +} RouteToSelfCommand; + +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; + char symbolic_or_connid[16]; +} DeleteSelfConnectionCommand; + +namespace { +static constexpr uint8_t addr_inet = 1; +static constexpr uint8_t addr_inet6 = 2; +static constexpr uint8_t add_route_command = 3; +static constexpr uint8_t delete_connection_command = 5; +static constexpr uint8_t request_light = 0xc0; +static constexpr char identifier[] = "SELF"; + +void fillCommandHeader(CommandHeader *header) { + // Allocate and fill the header + header->message_type = request_light; + header->length = 1; +} + +RouteToSelfCommand createCommandRoute(std::unique_ptr<sockaddr> &&addr, + uint8_t prefix_length) { + RouteToSelfCommand command = {0}; + + // check and set IP address + if (addr->sa_family == AF_INET) { + command.address_type = addr_inet; + command.address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; + } else if (addr->sa_family == AF_INET6) { + command.address_type = addr_inet6; + command.address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; + } + + // Fill remaining payload fields +#ifndef _WIN32 + strcpy(command.symbolic_or_connid, identifier); +#else + strcpy_s(command.symbolic_or_connid, 16, identifier); +#endif + command.cost = 1; + command.len = (uint8_t)prefix_length; + + // Allocate and fill the header + command.command_id = add_route_command; + fillCommandHeader((CommandHeader *)&command); + + return command; +} + +DeleteSelfConnectionCommand createCommandDeleteConnection() { + DeleteSelfConnectionCommand command = {0}; + fillCommandHeader((CommandHeader *)&command); + command.command_id = delete_connection_command; + +#ifndef _WIN32 + strcpy(command.symbolic_or_connid, identifier); +#else + strcpy_s(command.symbolic_or_connid, 16, identifier); +#endif + + return command; +} + +} // namespace + +namespace transport { + +namespace core { + +HicnForwarderModule::HicnForwarderModule() : IoModule(), connector_(nullptr) {} + +HicnForwarderModule::~HicnForwarderModule() {} + +void HicnForwarderModule::connect(bool is_consumer) { + connector_->connect(); + connector_->setRole(is_consumer ? Connector::Role::CONSUMER + : Connector::Role::PRODUCER); +} + +bool HicnForwarderModule::isConnected() { return connector_->isConnected(); } + +void HicnForwarderModule::send(Packet &packet) { + IoModule::send(packet); + packet.setChecksum(); + connector_->send(packet); +} + +void HicnForwarderModule::send(const uint8_t *packet, std::size_t len) { + counters_.tx_packets++; + counters_.tx_bytes += len; + + // Perfect forwarding + connector_->send(packet, len); +} + +void HicnForwarderModule::registerRoute(const Prefix &prefix) { + auto command = createCommandRoute(prefix.toSockaddr(), + (uint8_t)prefix.getPrefixLength()); + send((uint8_t *)&command, sizeof(RouteToSelfCommand)); +} + +void HicnForwarderModule::closeConnection() { + auto command = createCommandDeleteConnection(); + send((uint8_t *)&command, sizeof(DeleteSelfConnectionCommand)); + connector_->close(); +} + +void HicnForwarderModule::init( + Connector::PacketReceivedCallback &&receive_callback, + Connector::OnReconnectCallback &&reconnect_callback, + asio::io_service &io_service, const std::string &app_name) { + if (!connector_) { + connector_ = new UdpSocketConnector(std::move(receive_callback), nullptr, + nullptr, std::move(reconnect_callback), + io_service, app_name); + } +} + +void HicnForwarderModule::processControlMessageReply( + utils::MemBuf &packet_buffer) { + if (packet_buffer.data()[0] == nack_code) { + throw errors::RuntimeException( + "Received Nack message from hicn light forwarder."); + } +} + +std::uint32_t HicnForwarderModule::getMtu() { return interface_mtu; } + +bool HicnForwarderModule::isControlMessage(const uint8_t *message) { + return message[0] == ack_code || message[0] == nack_code; +} + +extern "C" IoModule *create_module(void) { return new HicnForwarderModule(); } + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.h b/libtransport/src/io_modules/udp/hicn_forwarder_module.h new file mode 100644 index 000000000..845db73bf --- /dev/null +++ b/libtransport/src/io_modules/udp/hicn_forwarder_module.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/io_module.h> +#include <hicn/transport/core/prefix.h> + +namespace transport { + +namespace core { + +class UdpSocketConnector; + +class HicnForwarderModule : public IoModule { + static constexpr uint8_t ack_code = 0xc2; + static constexpr uint8_t nack_code = 0xc3; + static constexpr std::uint16_t interface_mtu = 1500; + + public: + union addressLight { + uint32_t ipv4; + struct in6_addr ipv6; + }; + + struct route_to_self_command { + uint8_t messageType; + uint8_t commandID; + uint16_t length; + uint32_t seqNum; + char symbolicOrConnid[16]; + union addressLight address; + uint16_t cost; + uint8_t addressType; + uint8_t len; + }; + + using route_to_self_command = struct route_to_self_command; + + HicnForwarderModule(); + + ~HicnForwarderModule(); + + void connect(bool is_consumer) override; + + void send(Packet &packet) override; + void send(const uint8_t *packet, std::size_t len) override; + + bool isConnected() override; + + void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::OnReconnectCallback &&reconnect_callback, + asio::io_service &io_service, + const std::string &app_name = "Libtransport") override; + + void registerRoute(const Prefix &prefix) override; + + std::uint32_t getMtu() override; + + bool isControlMessage(const uint8_t *message) override; + + void processControlMessageReply(utils::MemBuf &packet_buffer) override; + + void closeConnection() override; + + private: + UdpSocketConnector *connector_; +}; + +extern "C" IoModule *create_module(void); + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.cc b/libtransport/src/io_modules/udp/udp_socket_connector.cc new file mode 100644 index 000000000..456886a54 --- /dev/null +++ b/libtransport/src/io_modules/udp/udp_socket_connector.cc @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include <hicn/transport/portability/win_portability.h> +#endif + +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/object_pool.h> +#include <io_modules/udp/udp_socket_connector.h> + +#include <thread> +#include <vector> + +namespace transport { + +namespace core { + +UdpSocketConnector::UdpSocketConnector( + PacketReceivedCallback &&receive_callback, PacketSentCallback &&packet_sent, + OnCloseCallback &&close_callback, OnReconnectCallback &&on_reconnect, + asio::io_service &io_service, std::string app_name) + : Connector(std::move(receive_callback), std::move(packet_sent), + std::move(close_callback), std::move(on_reconnect)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + connection_timer_(io_service_), + read_msg_(std::make_pair(nullptr, 0)), + is_reconnection_(false), + data_available_(false), + app_name_(app_name) {} + +UdpSocketConnector::~UdpSocketConnector() {} + +void UdpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + state_ = Connector::State::CONNECTING; + doConnect(); +} + +void UdpSocketConnector::send(const uint8_t *packet, std::size_t len) { + socket_.async_send(asio::buffer(packet, len), + [this](std::error_code ec, std::size_t /*length*/) { + if (sent_callback_) { + sent_callback_(this, ec); + } + }); +} + +void UdpSocketConnector::send(Packet &packet) { + io_service_.post([this, _packet{packet.shared_from_this()}]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(_packet)); + if (TRANSPORT_EXPECT_TRUE(state_ == Connector::State::CONNECTED)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void UdpSocketConnector::close() { + if (io_service_.stopped()) { + doClose(); + } else { + io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this)); + } +} + +void UdpSocketConnector::doClose() { + if (state_ != Connector::State::CLOSED) { + state_ = Connector::State::CLOSED; + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + } +} + +void UdpSocketConnector::doWrite() { + auto packet = output_buffer_.front().get(); + auto array = std::vector<asio::const_buffer>(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_.async_send(std::move(array), [this](std::error_code ec, + std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::doRead() { + read_msg_ = getRawBuffer(); + socket_.async_receive( + asio::buffer(read_msg_.first, read_msg_.second), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + auto packet = getPacketFromBuffer(read_msg_.first, length); + receive_callback_(this, *packet, std::make_error_code(std::errc(0))); + doRead(); + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::tryReconnect() { + if (state_ == Connector::State::CONNECTED) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + state_ = Connector::State::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + + doConnect(); + startConnectionTimer(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + }); + } +} + +void UdpSocketConnector::doConnect() { + asio::async_connect( + socket_, endpoint_iterator_, + [this](std::error_code ec, udp::resolver::iterator) { + if (!ec) { + connection_timer_.cancel(); + state_ = Connector::State::CONNECTED; + doRead(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + } + + on_reconnect_callback_(this); + } else { + doConnect(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + }); +} + +bool UdpSocketConnector::checkConnected() { + return state_ == Connector::State::CONNECTED; +} + +void UdpSocketConnector::startConnectionTimer() { + connection_timer_.expires_from_now(std::chrono::seconds(60)); + connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, + this, std::placeholders::_1)); +} + +void UdpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.h b/libtransport/src/io_modules/udp/udp_socket_connector.h new file mode 100644 index 000000000..8ab08e17a --- /dev/null +++ b/libtransport/src/io_modules/udp/udp_socket_connector.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/core/connector.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/core/name.h> +#include <hicn/transport/core/packet.h> +#include <hicn/transport/utils/branch_prediction.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <deque> + +namespace transport { +namespace core { + +using asio::ip::udp; + +class UdpSocketConnector : public Connector { + public: + UdpSocketConnector(PacketReceivedCallback &&receive_callback, + PacketSentCallback &&packet_sent, + OnCloseCallback &&close_callback, + OnReconnectCallback &&on_reconnect, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~UdpSocketConnector() override; + + void send(Packet &packet) override; + + void send(const uint8_t *packet, std::size_t len) override; + + void close() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + private: + void doConnect(); + + void doRead(); + + void doWrite(); + + void doClose(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::udp::socket socket_; + asio::ip::udp::resolver resolver_; + asio::ip::udp::resolver::iterator endpoint_iterator_; + asio::steady_timer connection_timer_; + + std::pair<uint8_t *, std::size_t> read_msg_; + + bool is_reconnection_; + bool data_available_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport |