aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/io_modules
diff options
context:
space:
mode:
authorLuca Muscariello <muscariello@ieee.org>2021-04-15 09:05:46 +0200
committerMauro Sardara <msardara@cisco.com>2021-04-15 16:36:16 +0200
commite92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch)
tree9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/io_modules
parent3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff)
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary. A summary of the different components that underwent major modifications is reported below. - Transport protocol updates The hierarchy of classes has been optimized to have common transport services across different transport protocols. This can allow to customize a transport protocol with new features. - A new real-time communication protocol The RTC protocol has been optimized in terms of algorithms to reduce consumer-producer synchronization latency. - A novel socket API The API has been reworked to be easier to consumer but also to have a more efficient integration in L4 proxies. - Several performance improvements A large number of performance improvements have been included in particular to make the entire stack zero-copy and optimize cache miss. - New memory buffer framework Memory management has been reworked entirely to provide a more efficient infra with a richer API. Buffers are now allocated in blocks and a single buffer holds the memory for (1) the shared_ptr control block, (2) the metadata of the packet (e.g. name, pointer to other buffers if buffer is chained and relevant offsets), and (3) the packet itself, as it is sent/received over the network. - A new slab allocator Dynamic memory allocation is now managed by a novel slab allocator that is optimised for packet processing and connection management. Memory is organized in pools of blocks all of the same size which are used during the processing of outgoing/incoming packets. When a memory block Is allocated is always taken from a global pool and when it is deallocated is returned to the pool, thus avoiding the cost of any heap allocation in the data path. - New transport connectors Consumer and producer end-points can communication either using an hicn packet forwarder or with direct connector based on shared memories or sockets. The usage of transport connectors typically for unit and funcitonal testing but may have additional usage. - Support for FEC/ECC for transport services FEC/ECC via reed solomon is supported by default and made available to transport services as a modular component. Reed solomon block codes is a default FEC model that can be replaced in a modular way by many other codes including RLNC not avaiable in this distribution. The current FEC framework support variable size padding and efficiently makes use of the infra memory buffers to avoid additiona copies. - Secure transport framework for signature computation and verification Crypto support is nativelty used in hICN for integrity and authenticity. Novel support that includes RTC has been implemented and made modular and reusable acrosso different transport protocols. - TLS - Transport layer security over hicn Point to point confidentiality is provided by integrating TLS on top of hICN reliable and non-reliable transport. The integration is common and makes a different use of the TLS record. - MLS - Messaging layer security over hicn MLS integration on top of hICN is made by using the MLSPP implemetation open sourced by Cisco. We have included instrumentation tools to deploy performance and functional tests of groups of end-points. - Android support The overall code has been heavily tested in Android environments and has received heavy lifting to better run natively in recent Android OS. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/io_modules')
-rw-r--r--libtransport/src/io_modules/CMakeLists.txt37
-rw-r--r--libtransport/src/io_modules/forwarder/CMakeLists.txt44
-rw-r--r--libtransport/src/io_modules/forwarder/configuration.h89
-rw-r--r--libtransport/src/io_modules/forwarder/errors.cc52
-rw-r--r--libtransport/src/io_modules/forwarder/errors.h91
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.cc296
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.h90
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.cc87
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.h70
-rw-r--r--libtransport/src/io_modules/forwarder/global_id_counter.h54
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel.cc288
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel.h147
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc177
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel_listener.h110
-rw-r--r--libtransport/src/io_modules/loopback/CMakeLists.txt34
-rw-r--r--libtransport/src/io_modules/loopback/local_face.cc69
-rw-r--r--libtransport/src/io_modules/loopback/local_face.h54
-rw-r--r--libtransport/src/io_modules/loopback/loopback_module.cc84
-rw-r--r--libtransport/src/io_modules/loopback/loopback_module.h70
-rw-r--r--libtransport/src/io_modules/memif/CMakeLists.txt56
-rw-r--r--libtransport/src/io_modules/memif/hicn_vapi.c229
-rw-r--r--libtransport/src/io_modules/memif/hicn_vapi.h82
-rw-r--r--libtransport/src/io_modules/memif/memif_connector.cc493
-rw-r--r--libtransport/src/io_modules/memif/memif_connector.h130
-rw-r--r--libtransport/src/io_modules/memif/memif_vapi.c127
-rw-r--r--libtransport/src/io_modules/memif/memif_vapi.h54
-rw-r--r--libtransport/src/io_modules/memif/vpp_forwarder_module.cc263
-rw-r--r--libtransport/src/io_modules/memif/vpp_forwarder_module.h83
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_connector.cc201
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_connector.h80
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_interface.cc56
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_interface.h61
-rw-r--r--libtransport/src/io_modules/udp/CMakeLists.txt47
-rw-r--r--libtransport/src/io_modules/udp/hicn_forwarder_module.cc181
-rw-r--r--libtransport/src/io_modules/udp/hicn_forwarder_module.h86
-rw-r--r--libtransport/src/io_modules/udp/udp_socket_connector.cc211
-rw-r--r--libtransport/src/io_modules/udp/udp_socket_connector.h89
37 files changed, 4472 insertions, 0 deletions
diff --git a/libtransport/src/io_modules/CMakeLists.txt b/libtransport/src/io_modules/CMakeLists.txt
new file mode 100644
index 000000000..6553b9a2b
--- /dev/null
+++ b/libtransport/src/io_modules/CMakeLists.txt
@@ -0,0 +1,37 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+if (${CMAKE_SYSTEM_NAME} MATCHES Android)
+ list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/hicn_forwarder_module.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/udp_socket_connector.cc
+ )
+
+ list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/hicn_forwarder_module.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/udp_socket_connector.h
+ )
+
+ set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+ set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
+else()
+ add_subdirectory(udp)
+ add_subdirectory(loopback)
+ add_subdirectory(forwarder)
+
+ if (__vpp__)
+ add_subdirectory(memif)
+ endif()
+endif() \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt
new file mode 100644
index 000000000..92662bc4c
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt
@@ -0,0 +1,44 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/endpoint.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/global_counter.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.cc
+)
+
+build_module(forwarder_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/forwarder/configuration.h b/libtransport/src/io_modules/forwarder/configuration.h
new file mode 100644
index 000000000..fcaa5530d
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/configuration.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace transport {
+namespace core {
+
+struct ListenerConfig {
+ std::string address;
+ std::uint16_t port;
+ std::string name;
+};
+
+struct ConnectorConfig {
+ std::string local_address;
+ std::uint16_t local_port;
+ std::string remote_address;
+ std::uint16_t remote_port;
+ std::string name;
+};
+
+struct RouteConfig {
+ std::string prefix;
+ uint16_t weight;
+ std::string connector;
+ std::string name;
+};
+
+class Configuration {
+ public:
+ Configuration() : n_threads_(1) {}
+
+ bool empty() {
+ return listeners_.empty() && connectors_.empty() && routes_.empty();
+ }
+
+ Configuration& setThreadNumber(std::size_t threads) {
+ n_threads_ = threads;
+ return *this;
+ }
+
+ std::size_t getThreadNumber() { return n_threads_; }
+
+ template <typename... Args>
+ Configuration& addListener(Args&&... args) {
+ listeners_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ Configuration& addConnector(Args&&... args) {
+ connectors_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ Configuration& addRoute(Args&&... args) {
+ routes_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ std::vector<ListenerConfig>& getListeners() { return listeners_; }
+
+ std::vector<ConnectorConfig>& getConnectors() { return connectors_; }
+
+ std::vector<RouteConfig>& getRoutes() { return routes_; }
+
+ private:
+ std::vector<ListenerConfig> listeners_;
+ std::vector<ConnectorConfig> connectors_;
+ std::vector<RouteConfig> routes_;
+ std::size_t n_threads_;
+};
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/errors.cc b/libtransport/src/io_modules/forwarder/errors.cc
new file mode 100644
index 000000000..b5f131499
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/errors.cc
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ */
+
+#include <io_modules/forwarder/errors.h>
+
+namespace transport {
+namespace core {
+
+const std::error_category& forwarder_category() {
+ static forwarder_category_impl instance;
+
+ return instance;
+}
+
+const char* forwarder_category_impl::name() const throw() {
+ return "proxy::connector::error";
+}
+
+std::string forwarder_category_impl::message(int ev) const {
+ switch (static_cast<forwarder_error>(ev)) {
+ case forwarder_error::success: {
+ return "Success";
+ }
+ case forwarder_error::disconnected: {
+ return "Connector is disconnected";
+ }
+ case forwarder_error::receive_failed: {
+ return "Packet reception failed";
+ }
+ case forwarder_error::send_failed: {
+ return "Packet send failed";
+ }
+ case forwarder_error::memory_allocation_error: {
+ return "Impossible to allocate memory for packet pool";
+ }
+ case forwarder_error::invalid_connector_type: {
+ return "Invalid type specified for connector.";
+ }
+ case forwarder_error::invalid_connector: {
+ return "Created connector was invalid.";
+ }
+ case forwarder_error::interest_cache_miss: {
+ return "interest cache miss.";
+ }
+ default: {
+ return "Unknown connector error";
+ }
+ }
+}
+} // namespace core
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/errors.h b/libtransport/src/io_modules/forwarder/errors.h
new file mode 100644
index 000000000..dd5cc8fe7
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/errors.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <system_error>
+
+namespace transport {
+namespace core {
+/**
+ * @brief Get the default server error category.
+ * @return The default server error category instance.
+ *
+ * @warning The first call to this function is thread-safe only starting with
+ * C++11.
+ */
+const std::error_category& forwarder_category();
+
+/**
+ * The list of errors.
+ */
+enum class forwarder_error {
+ success = 0,
+ send_failed,
+ receive_failed,
+ disconnected,
+ memory_allocation_error,
+ invalid_connector_type,
+ invalid_connector,
+ interest_cache_miss
+};
+
+/**
+ * @brief Create an error_code instance for the given error.
+ * @param error The error.
+ * @return The error_code instance.
+ */
+inline std::error_code make_error_code(forwarder_error error) {
+ return std::error_code(static_cast<int>(error), forwarder_category());
+}
+
+/**
+ * @brief Create an error_condition instance for the given error.
+ * @param error The error.
+ * @return The error_condition instance.
+ */
+inline std::error_condition make_error_condition(forwarder_error error) {
+ return std::error_condition(static_cast<int>(error), forwarder_category());
+}
+
+/**
+ * @brief A server error category.
+ */
+class forwarder_category_impl : public std::error_category {
+ public:
+ /**
+ * @brief Get the name of the category.
+ * @return The name of the category.
+ */
+ virtual const char* name() const throw();
+
+ /**
+ * @brief Get the error message for a given error.
+ * @param ev The error numeric value.
+ * @return The message associated to the error.
+ */
+ virtual std::string message(int ev) const;
+};
+} // namespace core
+} // namespace transport
+
+namespace std {
+// namespace system {
+template <>
+struct is_error_code_enum<::transport::core::forwarder_error>
+ : public std::true_type {};
+// } // namespace system
+} // namespace std
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc
new file mode 100644
index 000000000..7e89e2f9f
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder.cc
@@ -0,0 +1,296 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/global_configuration.h>
+#include <core/local_connector.h>
+#include <io_modules/forwarder/forwarder.h>
+#include <io_modules/forwarder/global_id_counter.h>
+#include <io_modules/forwarder/udp_tunnel.h>
+#include <io_modules/forwarder/udp_tunnel_listener.h>
+
+namespace transport {
+
+namespace core {
+
+constexpr char Forwarder::forwarder_config_section[];
+
+Forwarder::Forwarder() : config_() {
+ using namespace std::placeholders;
+ GlobalConfiguration::getInstance().registerConfigurationParser(
+ forwarder_config_section,
+ std::bind(&Forwarder::parseForwarderConfiguration, this, _1, _2));
+
+ if (!config_.empty()) {
+ initThreads();
+ initListeners();
+ initConnectors();
+ }
+}
+
+Forwarder::~Forwarder() {
+ for (auto &l : listeners_) {
+ l->close();
+ }
+
+ for (auto &c : remote_connectors_) {
+ c.second->close();
+ }
+
+ GlobalConfiguration::getInstance().unregisterConfigurationParser(
+ forwarder_config_section);
+}
+
+void Forwarder::initThreads() {
+ for (unsigned i = 0; i < config_.getThreadNumber(); i++) {
+ thread_pool_.emplace_back(io_service_, /* detached */ false);
+ }
+}
+
+void Forwarder::initListeners() {
+ using namespace std::placeholders;
+ for (auto &l : config_.getListeners()) {
+ listeners_.emplace_back(std::make_shared<UdpTunnelListener>(
+ io_service_,
+ std::bind(&Forwarder::onPacketFromListener, this, _1, _2, _3),
+ asio::ip::udp::endpoint(asio::ip::address::from_string(l.address),
+ l.port)));
+ }
+}
+
+void Forwarder::initConnectors() {
+ using namespace std::placeholders;
+ for (auto &c : config_.getConnectors()) {
+ auto id = GlobalCounter<Connector::Id>::getInstance().getNext();
+ auto conn = new UdpTunnelConnector(
+ io_service_, std::bind(&Forwarder::onPacketReceived, this, _1, _2, _3),
+ std::bind(&Forwarder::onPacketSent, this, _1, _2),
+ std::bind(&Forwarder::onConnectorClosed, this, _1),
+ std::bind(&Forwarder::onConnectorReconnected, this, _1));
+ conn->setConnectorId(id);
+ remote_connectors_.emplace(id, conn);
+ conn->connect(c.remote_address, c.remote_port, c.local_address,
+ c.local_port);
+ }
+}
+
+Connector::Id Forwarder::registerLocalConnector(
+ asio::io_service &io_service,
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback) {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ auto id = GlobalCounter<Connector::Id>::getInstance().getNext();
+ auto connector = std::make_shared<LocalConnector>(
+ io_service, receive_callback, nullptr, nullptr, reconnect_callback);
+ connector->setConnectorId(id);
+ local_connectors_.emplace(id, std::move(connector));
+ return id;
+}
+
+Forwarder &Forwarder::deleteConnector(Connector::Id id) {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ auto it = local_connectors_.find(id);
+ if (it != local_connectors_.end()) {
+ it->second->close();
+ local_connectors_.erase(it);
+ }
+
+ return *this;
+}
+
+Connector::Ptr Forwarder::getConnector(Connector::Id id) {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ auto it = local_connectors_.find(id);
+ if (it != local_connectors_.end()) {
+ return it->second;
+ }
+
+ return nullptr;
+}
+
+void Forwarder::onPacketFromListener(Connector *connector,
+ utils::MemBuf &packet_buffer,
+ const std::error_code &ec) {
+ // Create connector
+ connector->setReceiveCallback(
+ std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+
+ TRANSPORT_LOGD("Packet received from listener.");
+
+ {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ remote_connectors_.emplace(connector->getConnectorId(),
+ connector->shared_from_this());
+ }
+ // TODO Check if control packet or not. For the moment it is not.
+ onPacketReceived(connector, packet_buffer, ec);
+}
+
+void Forwarder::onPacketReceived(Connector *connector,
+ utils::MemBuf &packet_buffer,
+ const std::error_code &ec) {
+ // Figure out the type of packet we received
+ bool is_interest = Packet::isInterest(packet_buffer.data());
+
+ Packet *packet = nullptr;
+ if (is_interest) {
+ packet = static_cast<Interest *>(&packet_buffer);
+ } else {
+ packet = static_cast<ContentObject *>(&packet_buffer);
+ }
+
+ for (auto &c : local_connectors_) {
+ auto role = c.second->getRole();
+ auto is_producer = role == Connector::Role::PRODUCER;
+ if ((is_producer && is_interest) || (!is_producer && !is_interest)) {
+ c.second->send(*packet);
+ } else {
+ TRANSPORT_LOGD(
+ "Error sending packet to local connector. is_interest = %d - "
+ "is_producer = %d",
+ (int)is_interest, (int)is_producer);
+ }
+ }
+
+ // PCS Lookup + FIB lookup. Skip for now
+
+ // Forward packet to local connectors
+}
+
+void Forwarder::send(Packet &packet) {
+ // TODo Here a nice PIT/CS / FIB would be required:)
+ // For now let's just forward the packet on the remote connector we get
+ if (remote_connectors_.begin() == remote_connectors_.end()) {
+ return;
+ }
+
+ auto remote_endpoint =
+ remote_connectors_.begin()->second->getRemoteEndpoint();
+ TRANSPORT_LOGD("Sending packet to: %s:%u",
+ remote_endpoint.getAddress().to_string().c_str(),
+ remote_endpoint.getPort());
+ remote_connectors_.begin()->second->send(packet);
+}
+
+void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {}
+
+void Forwarder::onConnectorClosed(Connector *connector) {}
+
+void Forwarder::onConnectorReconnected(Connector *connector) {}
+
+void Forwarder::parseForwarderConfiguration(
+ const libconfig::Setting &forwarder_config, std::error_code &ec) {
+ using namespace libconfig;
+
+ // n_thread
+ if (forwarder_config.exists("n_threads")) {
+ // Get number of threads
+ int n_threads = 1;
+ forwarder_config.lookupValue("n_threads", n_threads);
+ TRANSPORT_LOGD("Forwarder threads from config file: %u", n_threads);
+ config_.setThreadNumber(n_threads);
+ }
+
+ // listeners
+ if (forwarder_config.exists("listeners")) {
+ // get path where looking for modules
+ const Setting &listeners = forwarder_config.lookup("listeners");
+ auto count = listeners.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &listener = listeners[i];
+ ListenerConfig list;
+ unsigned port;
+
+ list.name = listener.getName();
+ listener.lookupValue("local_address", list.address);
+ listener.lookupValue("local_port", port);
+ list.port = (uint16_t)(port);
+
+ TRANSPORT_LOGD("Adding listener %s, (%s:%u)", list.name.c_str(),
+ list.address.c_str(), list.port);
+ config_.addListener(std::move(list));
+ }
+ }
+
+ // connectors
+ if (forwarder_config.exists("connectors")) {
+ // get path where looking for modules
+ const Setting &connectors = forwarder_config.lookup("connectors");
+ auto count = connectors.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &connector = connectors[i];
+ ConnectorConfig conn;
+
+ conn.name = connector.getName();
+ unsigned port = 0;
+
+ if (!connector.lookupValue("local_address", conn.local_address)) {
+ conn.local_address = "";
+ }
+
+ if (!connector.lookupValue("local_port", port)) {
+ port = 0;
+ }
+
+ conn.local_port = (uint16_t)(port);
+
+ if (!connector.lookupValue("remote_address", conn.remote_address)) {
+ throw errors::RuntimeException(
+ "Error in configuration file: remote_address is a mandatory field "
+ "of Connectors.");
+ }
+
+ if (!connector.lookupValue("remote_port", port)) {
+ throw errors::RuntimeException(
+ "Error in configuration file: remote_port is a mandatory field "
+ "of Connectors.");
+ }
+
+ conn.remote_port = (uint16_t)(port);
+
+ TRANSPORT_LOGD("Adding connector %s, (%s:%u %s:%u)", conn.name.c_str(),
+ conn.local_address.c_str(), conn.local_port,
+ conn.remote_address.c_str(), conn.remote_port);
+ config_.addConnector(std::move(conn));
+ }
+ }
+
+ // Routes
+ if (forwarder_config.exists("routes")) {
+ const Setting &routes = forwarder_config.lookup("routes");
+ auto count = routes.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &route = routes[i];
+ RouteConfig r;
+ unsigned weight;
+
+ r.name = route.getName();
+ route.lookupValue("prefix", r.prefix);
+ route.lookupValue("weight", weight);
+ route.lookupValue("connector", r.connector);
+ r.weight = (uint16_t)(weight);
+
+ TRANSPORT_LOGD("Adding route %s %s (%s %u)", r.name.c_str(),
+ r.prefix.c_str(), r.connector.c_str(), r.weight);
+ config_.addRoute(std::move(r));
+ }
+ }
+}
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h
new file mode 100644
index 000000000..5b564bb5e
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder.h
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/singleton.h>
+#include <hicn/transport/utils/spinlock.h>
+#include <io_modules/forwarder/configuration.h>
+#include <io_modules/forwarder/udp_tunnel_listener.h>
+
+#include <atomic>
+#include <libconfig.h++>
+#include <unordered_map>
+
+namespace transport {
+
+namespace core {
+
+class Forwarder : public utils::Singleton<Forwarder> {
+ static constexpr char forwarder_config_section[] = "forwarder";
+ friend class utils::Singleton<Forwarder>;
+
+ public:
+ Forwarder();
+
+ ~Forwarder();
+
+ void initThreads();
+ void initListeners();
+ void initConnectors();
+
+ Connector::Id registerLocalConnector(
+ asio::io_service &io_service,
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback);
+
+ Forwarder &deleteConnector(Connector::Id id);
+
+ Connector::Ptr getConnector(Connector::Id id);
+
+ void send(Packet &packet);
+
+ void stop();
+
+ private:
+ void onPacketFromListener(Connector *connector, utils::MemBuf &packet_buffer,
+ const std::error_code &ec);
+ void onPacketReceived(Connector *connector, utils::MemBuf &packet_buffer,
+ const std::error_code &ec);
+ void onPacketSent(Connector *connector, const std::error_code &ec);
+ void onConnectorClosed(Connector *connector);
+ void onConnectorReconnected(Connector *connector);
+
+ void parseForwarderConfiguration(const libconfig::Setting &io_config,
+ std::error_code &ec);
+
+ asio::io_service io_service_;
+ utils::SpinLock connector_lock_;
+
+ /**
+ * Connectors and listeners must be declares *before* thread_pool_, so that
+ * threads destructors will wait for them to gracefully close before being
+ * destroyed.
+ */
+ std::unordered_map<Connector::Id, Connector::Ptr> remote_connectors_;
+ std::unordered_map<Connector::Id, Connector::Ptr> local_connectors_;
+ std::vector<UdpTunnelListener::Ptr> listeners_;
+
+ std::vector<utils::EventThread> thread_pool_;
+
+ Configuration config_;
+};
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc
new file mode 100644
index 000000000..356b42d3b
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/forwarder/forwarder_module.h>
+
+namespace transport {
+
+namespace core {
+
+ForwarderModule::ForwarderModule()
+ : IoModule(),
+ name_(""),
+ connector_id_(Connector::invalid_connector),
+ forwarder_(Forwarder::getInstance()) {}
+
+ForwarderModule::~ForwarderModule() {
+ forwarder_.deleteConnector(connector_id_);
+}
+
+bool ForwarderModule::isConnected() { return true; }
+
+void ForwarderModule::send(Packet &packet) {
+ IoModule::send(packet);
+ forwarder_.send(packet);
+ // TRANSPORT_LOGD("ForwarderModule: sending from %u to %d", local_id_,
+ // 1 - local_id_);
+
+ // local_faces_.at(1 - local_id_).onPacket(packet);
+}
+
+void ForwarderModule::send(const uint8_t *packet, std::size_t len) {
+ // not supported
+ throw errors::NotImplementedException();
+}
+
+void ForwarderModule::registerRoute(const Prefix &prefix) {
+ // For the moment we route packets from one socket to the other.
+ // Next step will be to introduce a FIB
+ return;
+}
+
+void ForwarderModule::closeConnection() {
+ forwarder_.deleteConnector(connector_id_);
+}
+
+void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name) {
+ connector_id_ = forwarder_.registerLocalConnector(
+ io_service, std::move(receive_callback), std::move(reconnect_callback));
+ name_ = app_name;
+}
+
+void ForwarderModule::processControlMessageReply(utils::MemBuf &packet_buffer) {
+ return;
+}
+
+void ForwarderModule::connect(bool is_consumer) {
+ forwarder_.getConnector(connector_id_)
+ ->setRole(is_consumer ? Connector::Role::CONSUMER
+ : Connector::Role::PRODUCER);
+}
+
+std::uint32_t ForwarderModule::getMtu() { return interface_mtu; }
+
+bool ForwarderModule::isControlMessage(const uint8_t *message) { return false; }
+
+extern "C" IoModule *create_module(void) { return new ForwarderModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h
new file mode 100644
index 000000000..58bfb7996
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+#include <io_modules/forwarder/forwarder.h>
+
+#include <atomic>
+
+namespace transport {
+
+namespace core {
+
+class Forwarder;
+
+class ForwarderModule : public IoModule {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ ForwarderModule();
+
+ ~ForwarderModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ std::string name_;
+ Connector::Id connector_id_;
+ Forwarder &forwarder_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/global_id_counter.h b/libtransport/src/io_modules/forwarder/global_id_counter.h
new file mode 100644
index 000000000..fe8d76730
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/global_id_counter.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <mutex>
+
+namespace transport {
+
+namespace core {
+
+template <typename T = uint64_t>
+class GlobalCounter {
+ public:
+ static GlobalCounter& getInstance() {
+ std::lock_guard<std::mutex> lock(global_mutex_);
+
+ if (!instance_) {
+ instance_.reset(new GlobalCounter());
+ }
+
+ return *instance_;
+ }
+
+ T getNext() { return counter_++; }
+
+ private:
+ GlobalCounter() : counter_(0) {}
+ static std::unique_ptr<GlobalCounter<T>> instance_;
+ static std::mutex global_mutex_;
+ std::atomic<T> counter_;
+};
+
+template <typename T>
+std::unique_ptr<GlobalCounter<T>> GlobalCounter<T>::instance_ = nullptr;
+
+template <typename T>
+std::mutex GlobalCounter<T>::global_mutex_;
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.cc b/libtransport/src/io_modules/forwarder/udp_tunnel.cc
new file mode 100644
index 000000000..dc725fc4e
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel.cc
@@ -0,0 +1,288 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#include <hicn/transport/utils/branch_prediction.h>
+#include <io_modules/forwarder/errors.h>
+#include <io_modules/forwarder/udp_tunnel.h>
+
+#include <iostream>
+#include <thread>
+#include <vector>
+
+namespace transport {
+namespace core {
+
+UdpTunnelConnector::~UdpTunnelConnector() {}
+
+void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port,
+ const std::string &bind_address,
+ uint16_t bind_port) {
+ if (state_ == State::CLOSED) {
+ state_ = State::CONNECTING;
+ endpoint_iterator_ = resolver_.resolve({hostname, std::to_string(port)});
+ remote_endpoint_send_ = *endpoint_iterator_;
+ socket_->open(remote_endpoint_send_.protocol());
+
+ if (!bind_address.empty() && bind_port != 0) {
+ using namespace asio::ip;
+ socket_->bind(
+ udp::endpoint(address::from_string(bind_address), bind_port));
+ }
+
+ state_ = State::CONNECTED;
+
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = Endpoint(socket_->local_endpoint());
+
+ doRecvPacket();
+
+#ifdef LINUX
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
+ std::placeholders::_1));
+#endif
+ }
+}
+
+void UdpTunnelConnector::send(Packet &packet) {
+ strand_->post([this, pkt{packet.shared_from_this()}]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(pkt));
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ if (!write_in_progress) {
+ doSendPacket();
+ }
+ } else {
+ data_available_ = true;
+ }
+ });
+}
+
+void UdpTunnelConnector::send(const uint8_t *packet, std::size_t len) {}
+
+void UdpTunnelConnector::close() {
+ TRANSPORT_LOGD("UDPTunnelConnector::close");
+ state_ = State::CLOSED;
+ bool is_socket_owned = socket_.use_count() == 1;
+ if (is_socket_owned) {
+ io_service_.dispatch([this]() {
+ this->socket_->close();
+ // on_close_callback_(shared_from_this());
+ });
+ }
+}
+
+void UdpTunnelConnector::doSendPacket() {
+#ifdef LINUX
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
+ std::placeholders::_1));
+#else
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const ::utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_->async_send_to(
+ std::move(array), remote_endpoint_send_,
+ strand_->wrap([this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ sent_callback_(this, make_error_code(forwarder_error::success));
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ sendFailed();
+ sent_callback_(this, ec);
+ }
+
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doSendPacket();
+ }
+ }));
+#endif
+}
+
+#ifdef LINUX
+void UdpTunnelConnector::writeHandler(std::error_code ec) {
+ if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) {
+ return;
+ }
+
+ auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst));
+
+ if (len) {
+ int m = 0;
+ for (auto &p : output_buffer_) {
+ auto packet = p.get();
+ ::utils::MemBuf *current = packet;
+ int b = 0;
+ do {
+ // array.push_back(asio::const_buffer(current->data(),
+ // current->length()));
+ tx_iovecs_[m][b].iov_base = current->writableData();
+ tx_iovecs_[m][b].iov_len = current->length();
+ current = current->next();
+ b++;
+ } while (current != packet);
+
+ tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m];
+ tx_msgs_[m].msg_hdr.msg_iovlen = b;
+ tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data();
+ tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size();
+ m++;
+
+ if (--len == 0) {
+ break;
+ }
+ }
+
+ int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT);
+ if (retval > 0) {
+ while (retval--) {
+ output_buffer_.pop_front();
+ }
+ } else if (retval != EWOULDBLOCK && retval != EAGAIN) {
+ TRANSPORT_LOGE("Error sending messages! %s %d\n", strerror(errno),
+ retval);
+ return;
+ }
+ }
+
+ if (!output_buffer_.empty()) {
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
+ std::placeholders::_1));
+ }
+}
+
+void UdpTunnelConnector::readHandler(std::error_code ec) {
+ TRANSPORT_LOGD("UdpTunnelConnector receive packet");
+
+ // TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length);
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < max_burst; i++) {
+ auto read_buffer = getRawBuffer();
+ rx_iovecs_[i][0].iov_base = read_buffer.first;
+ rx_iovecs_[i][0].iov_len = read_buffer.second;
+ rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i];
+ rx_msgs_[i].msg_hdr.msg_iovlen = 1;
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_,
+ max_burst - current_position_, MSG_DONTWAIT, nullptr);
+ if (res < 0) {
+ TRANSPORT_LOGE("Error receiving messages! %s %d\n", strerror(errno),
+ res);
+ return;
+ }
+
+ for (int i = 0; i < res; i++) {
+ auto packet = getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ rx_msgs_[current_position_].msg_len);
+ receiveSuccess(*packet);
+ receive_callback_(this, *packet,
+ make_error_code(forwarder_error::success));
+ ++current_position_;
+ }
+
+ doRecvPacket();
+ } else {
+ TRANSPORT_LOGE(
+ "Error in UDP: Receiving packets from a not connected socket.");
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ // receive_callback_(this, *read_msg_, ec);
+ TRANSPORT_LOGE("Error in UDP connector: %d %s", ec.value(),
+ ec.message().c_str());
+ } else {
+ TRANSPORT_LOGE("Error while not connector");
+ }
+ }
+}
+#endif
+
+void UdpTunnelConnector::doRecvPacket() {
+#ifdef LINUX
+ if (state_ == State::CONNECTED) {
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(asio::null_buffers(),
+#else
+ socket_->async_wait(asio::ip::tcp::socket::wait_read,
+#endif
+ std::bind(&UdpTunnelConnector::readHandler, this,
+ std::placeholders::_1));
+ }
+#else
+ TRANSPORT_LOGD("UdpTunnelConnector receive packet");
+ read_msg_ = getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_,
+ [this](std::error_code ec, std::size_t length) {
+ TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length);
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ auto packet = getPacketFromBuffer(read_msg_.first, length);
+ receiveSuccess(*packet);
+ receive_callback_(this, *packet,
+ make_error_code(forwarder_error::success));
+ doRecvPacket();
+ } else {
+ TRANSPORT_LOGE(
+ "Error in UDP: Receiving packets from a not connected socket.");
+ }
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ TRANSPORT_LOGE("Error in UDP connector: %d %s", ec.value(),
+ ec.message().c_str());
+ } else {
+ TRANSPORT_LOGE("Error while not connector");
+ }
+ }
+ });
+#endif
+}
+
+void UdpTunnelConnector::doConnect() {
+ asio::async_connect(
+ *socket_, endpoint_iterator_,
+ [this](std::error_code ec, asio::ip::udp::resolver::iterator) {
+ if (!ec) {
+ state_ = State::CONNECTED;
+ doRecvPacket();
+
+ if (data_available_) {
+ data_available_ = false;
+ doSendPacket();
+ }
+ } else {
+ TRANSPORT_LOGE("[Hproxy] - UDP Connection failed!!!");
+ timer_.expires_from_now(std::chrono::milliseconds(500));
+ timer_.async_wait(std::bind(&UdpTunnelConnector::doConnect, this));
+ }
+ });
+}
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.h b/libtransport/src/io_modules/forwarder/udp_tunnel.h
new file mode 100644
index 000000000..df472af91
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel.h
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+#include <io_modules/forwarder/errors.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <iostream>
+#include <memory>
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener;
+
+class UdpTunnelConnector : public Connector {
+ friend class UdpTunnelListener;
+
+ public:
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect>
+ UdpTunnelConnector(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+ io_service_(io_service),
+ strand_(std::make_shared<asio::io_service::strand>(io_service_)),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_)),
+ resolver_(io_service_),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{0},
+ tx_msgs_{0},
+ rx_iovecs_{0},
+ rx_msgs_{0},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ }
+
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect, typename EndpointType>
+ UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket,
+ std::shared_ptr<asio::io_service::strand> &strand,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect, EndpointType &&remote_endpoint)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ io_service_(socket->get_io_service()),
+#else
+ io_service_((asio::io_context &)(socket->get_executor().context())),
+#endif
+ strand_(strand),
+ socket_(socket),
+ resolver_(io_service_),
+ remote_endpoint_send_(std::forward<EndpointType &&>(remote_endpoint)),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{0},
+ tx_msgs_{0},
+ rx_iovecs_{0},
+ rx_msgs_{0},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ if (socket_->is_open()) {
+ state_ = State::CONNECTED;
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = socket_->local_endpoint();
+ }
+ }
+
+ ~UdpTunnelConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ void close() override;
+
+ void connect(const std::string &hostname, std::uint16_t port,
+ const std::string &bind_address = "",
+ std::uint16_t bind_port = 0);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ void doConnect();
+ void doRecvPacket();
+
+ void doRecvPacket(utils::MemBuf &buffer) {
+ receive_callback_(this, buffer, make_error_code(forwarder_error::success));
+ }
+
+#ifdef LINUX
+ void readHandler(std::error_code ec);
+ void writeHandler(std::error_code ec);
+#endif
+
+ void setConnected() { state_ = State::CONNECTED; }
+
+ void doSendPacket();
+ void doClose();
+
+ private:
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::io_service::strand> strand_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::ip::udp::endpoint remote_endpoint_send_;
+ asio::ip::udp::endpoint remote_endpoint_recv_;
+
+ asio::steady_timer timer_;
+
+#ifdef LINUX
+ asio::steady_timer send_timer_;
+ struct iovec tx_iovecs_[max_burst][8];
+ struct mmsghdr tx_msgs_[max_burst];
+ struct iovec rx_iovecs_[max_burst][8];
+ struct mmsghdr rx_msgs_[max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+
+ bool data_available_;
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc
new file mode 100644
index 000000000..12246c3cf
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#include <hicn/transport/utils/hash.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/forwarder/udp_tunnel.h>
+#include <io_modules/forwarder/udp_tunnel_listener.h>
+
+#ifndef LINUX
+namespace std {
+size_t hash<asio::ip::udp::endpoint>::operator()(
+ const asio::ip::udp::endpoint &endpoint) const {
+ auto hash_ip = endpoint.address().is_v4()
+ ? endpoint.address().to_v4().to_ulong()
+ : utils::hash::fnv32_buf(
+ endpoint.address().to_v6().to_bytes().data(), 16);
+ uint16_t port = endpoint.port();
+ return utils::hash::fnv32_buf(&port, 2, hash_ip);
+}
+} // namespace std
+#endif
+
+namespace transport {
+namespace core {
+
+UdpTunnelListener::~UdpTunnelListener() {}
+
+void UdpTunnelListener::close() {
+ strand_->post([this]() {
+ if (socket_->is_open()) {
+ socket_->close();
+ }
+ });
+}
+
+#ifdef LINUX
+void UdpTunnelListener::readHandler(std::error_code ec) {
+ TRANSPORT_LOGD("UdpTunnelConnector receive packet");
+
+ // TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length);
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < Connector::max_burst; i++) {
+ auto read_buffer = Connector::getRawBuffer();
+ iovecs_[i][0].iov_base = read_buffer.first;
+ iovecs_[i][0].iov_len = read_buffer.second;
+ msgs_[i].msg_hdr.msg_iov = iovecs_[i];
+ msgs_[i].msg_hdr.msg_iovlen = 1;
+ msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i];
+ msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]);
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_,
+ Connector::max_burst - current_position_, MSG_DONTWAIT,
+ nullptr);
+ if (res < 0) {
+ TRANSPORT_LOGE("Error in recvmmsg.");
+ }
+
+ for (int i = 0; i < res; i++) {
+ auto packet = Connector::getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ msgs_[current_position_].msg_len);
+ auto connector_id =
+ utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name,
+ msgs_[current_position_].msg_hdr.msg_namelen);
+
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+
+ /*
+ * Get the remote endpoint for this particular message
+ */
+ using namespace asio::ip;
+ if (local_endpoint_.address().is_v4()) {
+ auto addr = reinterpret_cast<struct sockaddr_in *>(
+ &remote_endpoints_[current_position_]);
+ address_v4::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v4 address(address_bytes);
+ remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin_port));
+ } else {
+ auto addr = reinterpret_cast<struct sockaddr_in6 *>(
+ &remote_endpoints_[current_position_]);
+ address_v6::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v6 address(address_bytes);
+ remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin6_port));
+ }
+
+ /**
+ * Create new connector sharing the same socket of this listener.
+ */
+ auto ret = connectors_.emplace(
+ connector_id,
+ std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {}, [](Connector *) {},
+ [](Connector *) {}, std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ /**
+ * Use connector callback to process incoming message.
+ */
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(*packet);
+
+ ++current_position_;
+ }
+
+ doRecvPacket();
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+}
+#endif
+
+void UdpTunnelListener::doRecvPacket() {
+#ifdef LINUX
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(
+ asio::null_buffers(),
+#else
+ socket_->async_wait(
+ asio::ip::tcp::socket::wait_read,
+#endif
+ std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1));
+#else
+ read_msg_ = Connector::getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_,
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ auto packet = Connector::getPacketFromBuffer(read_msg_.first, length);
+ auto connector_id =
+ std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_);
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+ auto ret = connectors_.emplace(
+ connector_id, std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {},
+ [](Connector *) {}, [](Connector *) {},
+ std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(*packet);
+ doRecvPacket();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+ });
+#endif
+}
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h
new file mode 100644
index 000000000..0ee40a400
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <unordered_map>
+
+namespace std {
+template <>
+struct hash<asio::ip::udp::endpoint> {
+ size_t operator()(const asio::ip::udp::endpoint &endpoint) const;
+};
+} // namespace std
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener
+ : public std::enable_shared_from_this<UdpTunnelListener> {
+ using PacketReceivedCallback = Connector::PacketReceivedCallback;
+ using EndpointId = std::pair<uint32_t, uint16_t>;
+
+ static constexpr uint16_t default_port = 5004;
+
+ public:
+ using Ptr = std::shared_ptr<UdpTunnelListener>;
+
+ template <typename ReceiveCallback>
+ UdpTunnelListener(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint(
+ asio::ip::udp::v4(), default_port))
+ : io_service_(io_service),
+ strand_(std::make_shared<asio::io_service::strand>(io_service_)),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_,
+ endpoint.protocol())),
+ local_endpoint_(endpoint),
+ receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
+#ifndef LINUX
+ read_msg_(nullptr, 0)
+#else
+ iovecs_{0},
+ msgs_{0},
+ current_position_(0)
+#endif
+ {
+ if (endpoint.protocol() == asio::ip::udp::v6()) {
+ std::error_code ec;
+ socket_->set_option(asio::ip::v6_only(false), ec);
+ // Call succeeds only on dual stack systems.
+ }
+ socket_->bind(local_endpoint_);
+ io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this));
+ }
+
+ ~UdpTunnelListener();
+
+ void close();
+
+ int deleteConnector(Connector *connector) {
+ return connectors_.erase(connector->getConnectorId());
+ }
+
+ template <typename ReceiveCallback>
+ void setReceiveCallback(ReceiveCallback &&callback) {
+ receive_callback_ = std::forward<ReceiveCallback &&>(callback);
+ }
+
+ Connector *findConnector(Connector::Id connId) {
+ auto it = connectors_.find(connId);
+ if (it != connectors_.end()) {
+ return it->second.get();
+ }
+
+ return nullptr;
+ }
+
+ private:
+ void doRecvPacket();
+
+ void readHandler(std::error_code ec);
+
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::io_service::strand> strand_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::endpoint local_endpoint_;
+ asio::ip::udp::endpoint remote_endpoint_;
+ std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_;
+
+ PacketReceivedCallback receive_callback_;
+
+#ifdef LINUX
+ struct iovec iovecs_[Connector::max_burst][8];
+ struct mmsghdr msgs_[Connector::max_burst];
+ struct sockaddr_storage remote_endpoints_[Connector::max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/loopback/CMakeLists.txt b/libtransport/src/io_modules/loopback/CMakeLists.txt
new file mode 100644
index 000000000..ac6dc8068
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/CMakeLists.txt
@@ -0,0 +1,34 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/loopback_module.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/loopback_module.cc
+)
+
+build_module(loopback_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ # LIBRARY_ROOT_DIR "vpp_plugins"
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/loopback/local_face.cc b/libtransport/src/io_modules/loopback/local_face.cc
new file mode 100644
index 000000000..a59dab235
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/local_face.cc
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/loopback/local_face.h>
+
+#include <asio/io_service.hpp>
+
+namespace transport {
+namespace core {
+
+Face::Face(Connector::PacketReceivedCallback &&receive_callback,
+ asio::io_service &io_service, const std::string &app_name)
+ : receive_callback_(std::move(receive_callback)),
+ io_service_(io_service),
+ name_(app_name) {}
+
+Face::Face(const Face &other)
+ : receive_callback_(other.receive_callback_),
+ io_service_(other.io_service_),
+ name_(other.name_) {}
+
+Face::Face(Face &&other)
+ : receive_callback_(std::move(other.receive_callback_)),
+ io_service_(other.io_service_),
+ name_(std::move(other.name_)) {}
+
+Face &Face::operator=(const Face &other) {
+ receive_callback_ = other.receive_callback_;
+ io_service_ = other.io_service_;
+ name_ = other.name_;
+
+ return *this;
+}
+
+Face &Face::operator=(Face &&other) {
+ receive_callback_ = std::move(other.receive_callback_);
+ io_service_ = std::move(other.io_service_);
+ name_ = std::move(other.name_);
+
+ return *this;
+}
+
+void Face::onPacket(const Packet &packet) {
+ TRANSPORT_LOGD("Sending content to local socket.");
+
+ if (Packet::isInterest(packet.data())) {
+ rescheduleOnIoService<Interest>(packet);
+ } else {
+ rescheduleOnIoService<ContentObject>(packet);
+ }
+}
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/loopback/local_face.h b/libtransport/src/io_modules/loopback/local_face.h
new file mode 100644
index 000000000..1cbcc2c72
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/local_face.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/utils/move_wrapper.h>
+
+#include <asio/io_service.hpp>
+
+namespace transport {
+namespace core {
+
+class Face {
+ public:
+ Face(Connector::PacketReceivedCallback &&receive_callback,
+ asio::io_service &io_service, const std::string &app_name);
+
+ Face(const Face &other);
+ Face(Face &&other);
+ void onPacket(const Packet &packet);
+ Face &operator=(Face &&other);
+ Face &operator=(const Face &other);
+
+ private:
+ template <typename T>
+ void rescheduleOnIoService(const Packet &packet) {
+ auto p = core::PacketManager<T>::getInstance().getPacket();
+ p->replace(packet.data(), packet.length());
+ io_service_.get().post([this, p]() mutable {
+ receive_callback_(nullptr, *p, make_error_code(0));
+ });
+ }
+
+ Connector::PacketReceivedCallback receive_callback_;
+ std::reference_wrapper<asio::io_service> io_service_;
+ std::string name_;
+};
+
+} // namespace core
+} // namespace transport
diff --git a/libtransport/src/io_modules/loopback/loopback_module.cc b/libtransport/src/io_modules/loopback/loopback_module.cc
new file mode 100644
index 000000000..0bdbf8c8e
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/loopback_module.cc
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/loopback/loopback_module.h>
+
+namespace transport {
+
+namespace core {
+
+std::vector<std::unique_ptr<LocalConnector>> LoopbackModule::local_faces_;
+std::atomic<uint32_t> LoopbackModule::global_counter_(0);
+
+LoopbackModule::LoopbackModule() : IoModule(), local_id_(~0) {}
+
+LoopbackModule::~LoopbackModule() {}
+
+void LoopbackModule::connect(bool is_consumer) {}
+
+bool LoopbackModule::isConnected() { return true; }
+
+void LoopbackModule::send(Packet &packet) {
+ IoModule::send(packet);
+
+ TRANSPORT_LOGD("LoopbackModule: sending from %u to %d", local_id_,
+ 1 - local_id_);
+
+ local_faces_.at(1 - local_id_)->send(packet);
+}
+
+void LoopbackModule::send(const uint8_t *packet, std::size_t len) {
+ // not supported
+ throw errors::NotImplementedException();
+}
+
+void LoopbackModule::registerRoute(const Prefix &prefix) {
+ // For the moment we route packets from one socket to the other.
+ // Next step will be to introduce a FIB
+ return;
+}
+
+void LoopbackModule::closeConnection() {
+ local_faces_.erase(local_faces_.begin() + local_id_);
+}
+
+void LoopbackModule::init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name) {
+ if (local_id_ == uint32_t(~0) && global_counter_ < 2) {
+ local_id_ = global_counter_++;
+ local_faces_.emplace(
+ local_faces_.begin() + local_id_,
+ new LocalConnector(io_service, std::move(receive_callback), nullptr,
+ nullptr, std::move(reconnect_callback)));
+ }
+}
+
+void LoopbackModule::processControlMessageReply(utils::MemBuf &packet_buffer) {
+ return;
+}
+
+std::uint32_t LoopbackModule::getMtu() { return interface_mtu; }
+
+bool LoopbackModule::isControlMessage(const uint8_t *message) { return false; }
+
+extern "C" IoModule *create_module(void) { return new LoopbackModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/loopback/loopback_module.h b/libtransport/src/io_modules/loopback/loopback_module.h
new file mode 100644
index 000000000..219fa8841
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/loopback_module.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/local_connector.h>
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+
+#include <atomic>
+
+namespace transport {
+
+namespace core {
+
+class LoopbackModule : public IoModule {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ LoopbackModule();
+
+ ~LoopbackModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ static std::vector<std::unique_ptr<LocalConnector>> local_faces_;
+ static std::atomic<uint32_t> global_counter_;
+
+ private:
+ uint32_t local_id_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/memif/CMakeLists.txt b/libtransport/src/io_modules/memif/CMakeLists.txt
new file mode 100644
index 000000000..c8a930e7b
--- /dev/null
+++ b/libtransport/src/io_modules/memif/CMakeLists.txt
@@ -0,0 +1,56 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+find_package(Vpp REQUIRED)
+find_package(Libmemif REQUIRED)
+
+if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR)
+ find_package(HicnPlugin REQUIRED)
+ find_package(SafeVapi REQUIRED)
+else()
+ list(APPEND DEPENDENCIES
+ ${SAFE_VAPI_SHARED}
+ )
+endif()
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_module.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_module.cc
+)
+
+build_module(memif_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ LINK_LIBRARIES ${LIBMEMIF_LIBRARIES} ${SAFE_VAPI_LIBRARIES}
+ INCLUDE_DIRS
+ ${LIBTRANSPORT_INCLUDE_DIRS}
+ ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ ${VPP_INCLUDE_DIRS}
+ ${LIBMEMIF_INCLUDE_DIRS}
+ ${SAFE_VAPI_INCLUDE_DIRS}
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/memif/hicn_vapi.c b/libtransport/src/io_modules/memif/hicn_vapi.c
new file mode 100644
index 000000000..b83a36b47
--- /dev/null
+++ b/libtransport/src/io_modules/memif/hicn_vapi.c
@@ -0,0 +1,229 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/memif/hicn_vapi.h>
+
+#define HICN_VPP_PLUGIN
+#include <hicn/name.h>
+#undef HICN_VPP_PLUGIN
+
+#include <vapi/hicn.api.vapi.h>
+#include <vapi/ip.api.vapi.h>
+#include <vapi/vapi_safe.h>
+#include <vlib/vlib.h>
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vnet/ip/format.h>
+#include <vnet/ip/ip4_packet.h>
+#include <vnet/ip/ip6_packet.h>
+#include <vpp_plugins/hicn/error.h>
+#include <vppinfra/error.h>
+
+/////////////////////////////////////////////////////
+const char *HICN_ERROR_STRING[] = {
+#define _(a, b, c) c,
+ foreach_hicn_error
+#undef _
+};
+/////////////////////////////////////////////////////
+
+/*********************** Missing Symbol in vpp libraries
+ * *************************/
+u8 *format_vl_api_address_union(u8 *s, va_list *args) { return NULL; }
+
+/*********************************************************************************/
+
+DEFINE_VAPI_MSG_IDS_HICN_API_JSON
+DEFINE_VAPI_MSG_IDS_IP_API_JSON
+
+static vapi_error_e register_prod_app_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_register_prod_app_reply *reply) {
+ hicn_producer_output_params *output_params =
+ (hicn_producer_output_params *)callback_ctx;
+
+ if (reply == NULL) return rv;
+
+ output_params->cs_reserved = reply->cs_reserved;
+ output_params->prod_addr = (ip_address_t *)malloc(sizeof(ip_address_t));
+ memset(output_params->prod_addr, 0, sizeof(ip_address_t));
+ if (reply->prod_addr.af == ADDRESS_IP6)
+ memcpy(&output_params->prod_addr->v6, reply->prod_addr.un.ip6,
+ sizeof(ip6_address_t));
+ else
+ memcpy(&output_params->prod_addr->v4, reply->prod_addr.un.ip4,
+ sizeof(ip4_address_t));
+ output_params->face_id = reply->faceid;
+
+ return reply->retval;
+}
+
+int hicn_vapi_register_prod_app(vapi_ctx_t ctx,
+ hicn_producer_input_params *input_params,
+ hicn_producer_output_params *output_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_register_prod_app *msg =
+ vapi_alloc_hicn_api_register_prod_app(ctx);
+
+ if (ip46_address_is_ip4((ip46_address_t *)&input_params->prefix->address)) {
+ memcpy(&msg->payload.prefix.address.un.ip4, &input_params->prefix->address,
+ sizeof(ip4_address_t));
+ msg->payload.prefix.address.af = ADDRESS_IP4;
+ } else {
+ memcpy(&msg->payload.prefix.address.un.ip6, &input_params->prefix->address,
+ sizeof(ip6_address_t));
+ msg->payload.prefix.address.af = ADDRESS_IP6;
+ }
+ msg->payload.prefix.len = input_params->prefix->len;
+
+ msg->payload.swif = input_params->swif;
+ msg->payload.cs_reserved = input_params->cs_reserved;
+
+ int ret = vapi_hicn_api_register_prod_app(ctx, msg, register_prod_app_cb,
+ output_params);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e face_prod_del_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_face_prod_del_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int hicn_vapi_face_prod_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params *input_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_face_prod_del *msg = vapi_alloc_hicn_api_face_prod_del(ctx);
+
+ msg->payload.faceid = input_params->face_id;
+
+ int ret = vapi_hicn_api_face_prod_del(ctx, msg, face_prod_del_cb, NULL);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e register_cons_app_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_register_cons_app_reply *reply) {
+ hicn_consumer_output_params *output_params =
+ (hicn_consumer_output_params *)callback_ctx;
+
+ if (reply == NULL) return rv;
+
+ output_params->src6 = (ip_address_t *)malloc(sizeof(ip_address_t));
+ output_params->src4 = (ip_address_t *)malloc(sizeof(ip_address_t));
+ memset(output_params->src6, 0, sizeof(ip_address_t));
+ memset(output_params->src4, 0, sizeof(ip_address_t));
+ memcpy(&output_params->src6->v6, &reply->src_addr6.un.ip6,
+ sizeof(ip6_address_t));
+ memcpy(&output_params->src4->v4, &reply->src_addr4.un.ip4,
+ sizeof(ip4_address_t));
+
+ output_params->face_id1 = reply->faceid1;
+ output_params->face_id2 = reply->faceid2;
+
+ return reply->retval;
+}
+
+int hicn_vapi_register_cons_app(vapi_ctx_t ctx,
+ hicn_consumer_input_params *input_params,
+ hicn_consumer_output_params *output_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_register_cons_app *msg =
+ vapi_alloc_hicn_api_register_cons_app(ctx);
+
+ msg->payload.swif = input_params->swif;
+
+ int ret = vapi_hicn_api_register_cons_app(ctx, msg, register_cons_app_cb,
+ output_params);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e face_cons_del_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_face_cons_del_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int hicn_vapi_face_cons_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params *input_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_face_cons_del *msg = vapi_alloc_hicn_api_face_cons_del(ctx);
+
+ msg->payload.faceid = input_params->face_id;
+
+ int ret = vapi_hicn_api_face_cons_del(ctx, msg, face_cons_del_cb, NULL);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e reigster_route_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_ip_route_add_del_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int hicn_vapi_register_route(vapi_ctx_t ctx,
+ hicn_producer_set_route_params *input_params) {
+ vapi_lock();
+ vapi_msg_ip_route_add_del *msg = vapi_alloc_ip_route_add_del(ctx, 1);
+
+ msg->payload.is_add = 1;
+ if (ip46_address_is_ip4((ip46_address_t *)(input_params->prod_addr))) {
+ memcpy(&msg->payload.route.prefix.address.un.ip4,
+ &input_params->prefix->address.v4, sizeof(ip4_address_t));
+ msg->payload.route.prefix.address.af = ADDRESS_IP4;
+ msg->payload.route.prefix.len = input_params->prefix->len;
+ } else {
+ memcpy(&msg->payload.route.prefix.address.un.ip6,
+ &input_params->prefix->address.v6, sizeof(ip6_address_t));
+ msg->payload.route.prefix.address.af = ADDRESS_IP6;
+ msg->payload.route.prefix.len = input_params->prefix->len;
+ }
+
+ msg->payload.route.paths[0].sw_if_index = ~0;
+ msg->payload.route.paths[0].table_id = 0;
+ if (ip46_address_is_ip4((ip46_address_t *)(input_params->prod_addr))) {
+ memcpy(&(msg->payload.route.paths[0].nh.address.ip4),
+ input_params->prod_addr->v4.as_u8, sizeof(ip4_address_t));
+ msg->payload.route.paths[0].proto = FIB_API_PATH_NH_PROTO_IP4;
+ } else {
+ memcpy(&(msg->payload.route.paths[0].nh.address.ip6),
+ input_params->prod_addr->v6.as_u8, sizeof(ip6_address_t));
+ msg->payload.route.paths[0].proto = FIB_API_PATH_NH_PROTO_IP6;
+ }
+
+ msg->payload.route.paths[0].type = FIB_API_PATH_FLAG_NONE;
+ msg->payload.route.paths[0].flags = FIB_API_PATH_FLAG_NONE;
+
+ int ret = vapi_ip_route_add_del(ctx, msg, reigster_route_cb, NULL);
+
+ vapi_unlock();
+ return ret;
+}
+
+char *hicn_vapi_get_error_string(int ret_val) {
+ return get_error_string(ret_val);
+}
diff --git a/libtransport/src/io_modules/memif/hicn_vapi.h b/libtransport/src/io_modules/memif/hicn_vapi.h
new file mode 100644
index 000000000..e94c97749
--- /dev/null
+++ b/libtransport/src/io_modules/memif/hicn_vapi.h
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/util/ip_address.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <vapi/vapi.h>
+
+#include "stdint.h"
+
+typedef struct {
+ ip_prefix_t* prefix;
+ uint32_t swif;
+ uint32_t cs_reserved;
+} hicn_producer_input_params;
+
+typedef struct {
+ uint32_t swif;
+} hicn_consumer_input_params;
+
+typedef struct {
+ uint32_t face_id;
+} hicn_del_face_app_input_params;
+
+typedef struct {
+ uint32_t cs_reserved;
+ ip_address_t* prod_addr;
+ uint32_t face_id;
+} hicn_producer_output_params;
+
+typedef struct {
+ ip_address_t* src4;
+ ip_address_t* src6;
+ uint32_t face_id1;
+ uint32_t face_id2;
+} hicn_consumer_output_params;
+
+typedef struct {
+ ip_prefix_t* prefix;
+ ip_address_t* prod_addr;
+} hicn_producer_set_route_params;
+
+int hicn_vapi_register_prod_app(vapi_ctx_t ctx,
+ hicn_producer_input_params* input_params,
+ hicn_producer_output_params* output_params);
+
+int hicn_vapi_register_cons_app(vapi_ctx_t ctx,
+ hicn_consumer_input_params* input_params,
+ hicn_consumer_output_params* output_params);
+
+int hicn_vapi_register_route(vapi_ctx_t ctx,
+ hicn_producer_set_route_params* input_params);
+
+int hicn_vapi_face_cons_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params* input_params);
+
+int hicn_vapi_face_prod_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params* input_params);
+
+char* hicn_vapi_get_error_string(int ret_val);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/libtransport/src/io_modules/memif/memif_connector.cc b/libtransport/src/io_modules/memif/memif_connector.cc
new file mode 100644
index 000000000..4a688d68f
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_connector.cc
@@ -0,0 +1,493 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <io_modules/memif/memif_connector.h>
+#include <sys/epoll.h>
+
+#include <cstdlib>
+
+extern "C" {
+#include <memif/libmemif.h>
+};
+
+#define CANCEL_TIMER 1
+
+namespace transport {
+
+namespace core {
+
+struct memif_connection {
+ uint16_t index;
+ /* memif conenction handle */
+ memif_conn_handle_t conn;
+ /* transmit queue id */
+ uint16_t tx_qid;
+ /* tx buffers */
+ memif_buffer_t *tx_bufs;
+ /* allocated tx buffers counter */
+ /* number of tx buffers pointing to shared memory */
+ uint16_t tx_buf_num;
+ /* rx buffers */
+ memif_buffer_t *rx_bufs;
+ /* allcoated rx buffers counter */
+ /* number of rx buffers pointing to shared memory */
+ uint16_t rx_buf_num;
+ /* interface ip address */
+ uint8_t ip_addr[4];
+};
+
+std::once_flag MemifConnector::flag_;
+utils::EpollEventReactor MemifConnector::main_event_reactor_;
+
+MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(packet_sent),
+ std::move(close_callback), std::move(on_reconnect)),
+ memif_worker_(nullptr),
+ timer_set_(false),
+ send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+ disconnect_timer_(
+ std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+ io_service_(io_service),
+ memif_connection_(std::make_unique<memif_connection_t>()),
+ tx_buf_counter_(0),
+ is_reconnection_(false),
+ data_available_(false),
+ app_name_(app_name),
+ socket_filename_("") {
+ std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
+}
+
+MemifConnector::~MemifConnector() { close(); }
+
+void MemifConnector::init() {
+ /* initialize memory interface */
+ int err = memif_init(controlFdUpdate, const_cast<char *>(app_name_.c_str()),
+ nullptr, nullptr, nullptr);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_init: %s", memif_strerror(err));
+ }
+}
+
+void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
+ state_ = State::CONNECTING;
+
+ memif_id_ = memif_id;
+ socket_filename_ = "/run/vpp/memif.sock";
+
+ createMemif(memif_id, memif_mode, nullptr);
+
+ work_ = std::make_unique<asio::io_service::work>(io_service_);
+
+ while (state_ != State::CONNECTED) {
+ MemifConnector::main_event_reactor_.runOneEvent();
+ }
+
+ int err;
+
+ /* get interrupt queue id */
+ int fd = -1;
+ err = memif_get_queue_efd(memif_connection_->conn, 0, &fd);
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_get_queue_efd: %s", memif_strerror(err));
+ return;
+ }
+
+ // Remove fd from main epoll
+ main_event_reactor_.delFileDescriptor(fd);
+
+ // Add fd to epoll of instance
+ event_reactor_.addFileDescriptor(
+ fd, EPOLLIN, [this](const utils::Event &evt) -> int {
+ return onInterrupt(memif_connection_->conn, this, 0);
+ });
+
+ memif_worker_ = std::make_unique<std::thread>(
+ std::bind(&MemifConnector::threadMain, this));
+}
+
+int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) {
+ memif_connection_t *c = memif_connection_.get();
+
+ /* setting memif connection arguments */
+ memif_conn_args_t args;
+ memset(&args, 0, sizeof(args));
+
+ args.is_master = mode;
+ args.log2_ring_size = MEMIF_LOG2_RING_SIZE;
+ args.buffer_size = MEMIF_BUF_SIZE;
+ args.num_s2m_rings = 1;
+ args.num_m2s_rings = 1;
+ strncpy((char *)args.interface_name, IF_NAME, strlen(IF_NAME) + 1);
+ args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
+
+ int err;
+
+ err = memif_create_socket(&args.socket, socket_filename_.c_str(), nullptr);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+
+ args.interface_id = index;
+ /* last argument for memif_create (void * private_ctx) is used by user
+ to identify connection. this context is returned with callbacks */
+
+ /* default interrupt */
+ if (s == nullptr) {
+ err = memif_create(&c->conn, &args, onConnect, onDisconnect, onInterrupt,
+ this);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+ }
+
+ c->index = (uint16_t)index;
+ c->tx_qid = 0;
+ /* alloc memif buffers */
+ c->rx_buf_num = 0;
+ c->rx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS));
+ c->tx_buf_num = 0;
+ c->tx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS));
+
+ // memif_set_rx_mode (c->conn, MEMIF_RX_MODE_POLLING, 0);
+
+ return 0;
+}
+
+int MemifConnector::deleteMemif() {
+ memif_connection_t *c = memif_connection_.get();
+
+ if (c->rx_bufs) {
+ free(c->rx_bufs);
+ }
+
+ c->rx_bufs = nullptr;
+ c->rx_buf_num = 0;
+
+ if (c->tx_bufs) {
+ free(c->tx_bufs);
+ }
+
+ c->tx_bufs = nullptr;
+ c->tx_buf_num = 0;
+
+ int err;
+ /* disconenct then delete memif connection */
+ err = memif_delete(&c->conn);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_delete: %s", memif_strerror(err));
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(c->conn != nullptr)) {
+ TRANSPORT_LOGE("memif delete fail");
+ }
+
+ return 0;
+}
+
+int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) {
+ /* convert memif event definitions to epoll events */
+ if (events & MEMIF_FD_EVENT_DEL) {
+ return MemifConnector::main_event_reactor_.delFileDescriptor(fd);
+ }
+
+ uint32_t evt = 0;
+
+ if (events & MEMIF_FD_EVENT_READ) {
+ evt |= EPOLLIN;
+ }
+
+ if (events & MEMIF_FD_EVENT_WRITE) {
+ evt |= EPOLLOUT;
+ }
+
+ if (events & MEMIF_FD_EVENT_MOD) {
+ return MemifConnector::main_event_reactor_.modFileDescriptor(fd, evt);
+ }
+
+ return MemifConnector::main_event_reactor_.addFileDescriptor(
+ fd, evt, [](const utils::Event &evt) -> int {
+ uint32_t event = 0;
+ int memif_err = 0;
+
+ if (evt.events & EPOLLIN) {
+ event |= MEMIF_FD_EVENT_READ;
+ }
+
+ if (evt.events & EPOLLOUT) {
+ event |= MEMIF_FD_EVENT_WRITE;
+ }
+
+ if (evt.events & EPOLLERR) {
+ event |= MEMIF_FD_EVENT_ERROR;
+ }
+
+ memif_err = memif_control_fd_handler(evt.data.fd, event);
+
+ if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_control_fd_handler: %s",
+ memif_strerror(memif_err));
+ }
+
+ return 0;
+ });
+}
+
+int MemifConnector::bufferAlloc(long n, uint16_t qid) {
+ memif_connection_t *c = memif_connection_.get();
+ int err;
+ uint16_t r;
+ /* set data pointer to shared memory and set buffer_len to shared mmeory
+ * buffer len */
+ err = memif_buffer_alloc(c->conn, qid, c->tx_bufs, n, &r, 2000);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_buffer_alloc: %s", memif_strerror(err));
+ return -1;
+ }
+
+ c->tx_buf_num += r;
+ return r;
+}
+
+int MemifConnector::txBurst(uint16_t qid) {
+ memif_connection_t *c = memif_connection_.get();
+ int err;
+ uint16_t r;
+ /* inform peer memif interface about data in shared memory buffers */
+ /* mark memif buffers as free */
+ err = memif_tx_burst(c->conn, qid, c->tx_bufs, c->tx_buf_num, &r);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err));
+ }
+
+ // err = memif_refill_queue(c->conn, qid, r, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err));
+ c->tx_buf_num -= r;
+ return -1;
+ }
+
+ c->tx_buf_num -= r;
+ return 0;
+}
+
+void MemifConnector::sendCallback(const std::error_code &ec) {
+ timer_set_ = false;
+
+ if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) {
+ doSend();
+ }
+}
+
+void MemifConnector::processInputBuffer(std::uint16_t total_packets) {
+ utils::MemBuf::Ptr ptr;
+
+ for (; total_packets > 0; total_packets--) {
+ if (input_buffer_.pop(ptr)) {
+ receive_callback_(this, *ptr, std::make_error_code(std::errc(0)));
+ }
+ }
+}
+
+/* informs user about connected status. private_ctx is used by user to identify
+ connection (multiple connections WIP) */
+int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+ connector->state_ = State::CONNECTED;
+ memif_refill_queue(conn, 0, -1, 0);
+
+ return 0;
+}
+
+/* informs user about disconnected status. private_ctx is used by user to
+ identify connection (multiple connections WIP) */
+int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+ connector->state_ = State::CLOSED;
+ return 0;
+}
+
+void MemifConnector::threadMain() { event_reactor_.runEventLoop(1000); }
+
+int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+
+ memif_connection_t *c = connector->memif_connection_.get();
+ int err = MEMIF_ERR_SUCCESS, ret_val;
+ uint16_t total_packets = 0;
+ uint16_t rx;
+
+ do {
+ err = memif_rx_burst(conn, qid, c->rx_bufs, MAX_MEMIF_BUFS, &rx);
+ ret_val = err;
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS &&
+ err != MEMIF_ERR_NOBUF)) {
+ TRANSPORT_LOGE("memif_rx_burst: %s", memif_strerror(err));
+ goto error;
+ }
+
+ c->rx_buf_num += rx;
+
+ if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
+ TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx);
+ goto error;
+ }
+
+ std::size_t packet_length;
+ for (int i = 0; i < rx; i++) {
+ auto buffer = connector->getRawBuffer();
+ packet_length = (c->rx_bufs + i)->len;
+ std::memcpy(buffer.first, (c->rx_bufs + i)->data, packet_length);
+ auto packet = connector->getPacketFromBuffer(buffer.first, packet_length);
+
+ if (!connector->input_buffer_.push(std::move(packet))) {
+ TRANSPORT_LOGE("Error pushing packet. Ring buffer full.");
+
+ // TODO Here we should consider the possibility to signal the congestion
+ // to the application, that would react properly (e.g. slow down
+ // message)
+ }
+ }
+
+ /* mark memif buffers and shared memory buffers as free */
+ /* free processed buffers */
+
+ err = memif_refill_queue(conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err));
+ }
+
+ c->rx_buf_num -= rx;
+ total_packets += rx;
+
+ } while (ret_val == MEMIF_ERR_NOBUF);
+
+ connector->io_service_.post(
+ std::bind(&MemifConnector::processInputBuffer, connector, total_packets));
+
+ return 0;
+
+error:
+ err = memif_refill_queue(c->conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err));
+ }
+ c->rx_buf_num -= rx;
+
+ return 0;
+}
+
+void MemifConnector::close() {
+ if (state_ != State::CLOSED) {
+ disconnect_timer_->expiresFromNow(std::chrono::microseconds(50));
+ disconnect_timer_->asyncWait([this](const std::error_code &ec) {
+ deleteMemif();
+ event_reactor_.stop();
+ work_.reset();
+ });
+
+ if (memif_worker_ && memif_worker_->joinable()) {
+ memif_worker_->join();
+ }
+ }
+}
+
+void MemifConnector::send(Packet &packet) {
+ {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ output_buffer_.push_back(packet.shared_from_this());
+ }
+#if CANCEL_TIMER
+ if (!timer_set_) {
+ timer_set_ = true;
+ send_timer_->expiresFromNow(std::chrono::microseconds(50));
+ send_timer_->asyncWait(
+ std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
+ }
+#endif
+}
+
+int MemifConnector::doSend() {
+ std::size_t max = 0;
+ int32_t n = 0;
+ std::size_t size = 0;
+
+ {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ size = output_buffer_.size();
+ }
+
+ do {
+ max = size < MAX_MEMIF_BUFS ? size : MAX_MEMIF_BUFS;
+ n = bufferAlloc(max, memif_connection_->tx_qid);
+
+ if (TRANSPORT_EXPECT_FALSE(n < 0)) {
+ TRANSPORT_LOGE("Error allocating buffers.");
+ return -1;
+ }
+
+ for (uint16_t i = 0; i < n; i++) {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+
+ auto packet = output_buffer_.front().get();
+ const utils::MemBuf *current = packet;
+ std::size_t offset = 0;
+ uint8_t *shared_buffer =
+ reinterpret_cast<uint8_t *>(memif_connection_->tx_bufs[i].data);
+ do {
+ std::memcpy(shared_buffer + offset, current->data(), current->length());
+ offset += current->length();
+ current = current->next();
+ } while (current != packet);
+
+ memif_connection_->tx_bufs[i].len = uint32_t(offset);
+
+ output_buffer_.pop_front();
+ }
+
+ txBurst(memif_connection_->tx_qid);
+
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ size = output_buffer_.size();
+ } while (size > 0);
+
+ return 0;
+}
+
+void MemifConnector::send(const uint8_t *packet, std::size_t len) {
+ throw errors::NotImplementedException();
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/memif/memif_connector.h b/libtransport/src/io_modules/memif/memif_connector.h
new file mode 100644
index 000000000..bed3516dc
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_connector.h
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/ring_buffer.h>
+//#include <hicn/transport/core/hicn_vapi.h>
+#include <utils/epoll_event_reactor.h>
+#include <utils/fd_deadline_timer.h>
+
+#include <asio.hpp>
+#include <deque>
+#include <mutex>
+#include <thread>
+
+#define _Static_assert static_assert
+
+namespace transport {
+
+namespace core {
+
+typedef struct memif_connection memif_connection_t;
+
+#define APP_NAME "libtransport"
+#define IF_NAME "vpp_connection"
+
+#define MEMIF_BUF_SIZE 2048
+#define MEMIF_LOG2_RING_SIZE 13
+#define MAX_MEMIF_BUFS (1 << MEMIF_LOG2_RING_SIZE)
+
+class MemifConnector : public Connector {
+ using memif_conn_handle_t = void *;
+ using PacketRing = utils::CircularFifo<utils::MemBuf::Ptr, queue_size>;
+
+ public:
+ MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~MemifConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ void close() override;
+
+ void connect(uint32_t memif_id, long memif_mode);
+
+ TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
+
+ private:
+ void init();
+
+ int doSend();
+
+ int createMemif(uint32_t index, uint8_t mode, char *s);
+
+ uint32_t getMemifConfiguration();
+
+ int deleteMemif();
+
+ static int controlFdUpdate(int fd, uint8_t events, void *private_ctx);
+
+ static int onConnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onDisconnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid);
+
+ void threadMain();
+
+ int txBurst(uint16_t qid);
+
+ int bufferAlloc(long n, uint16_t qid);
+
+ void sendCallback(const std::error_code &ec);
+
+ void processInputBuffer(std::uint16_t total_packets);
+
+ private:
+ static utils::EpollEventReactor main_event_reactor_;
+ static std::unique_ptr<std::thread> main_worker_;
+
+ int epfd;
+ std::unique_ptr<std::thread> memif_worker_;
+ utils::EpollEventReactor event_reactor_;
+ std::atomic_bool timer_set_;
+ std::unique_ptr<utils::FdDeadlineTimer> send_timer_;
+ std::unique_ptr<utils::FdDeadlineTimer> disconnect_timer_;
+ asio::io_service &io_service_;
+ std::unique_ptr<asio::io_service::work> work_;
+ std::unique_ptr<memif_connection_t> memif_connection_;
+ uint16_t tx_buf_counter_;
+
+ PacketRing input_buffer_;
+ bool is_reconnection_;
+ bool data_available_;
+ uint32_t memif_id_;
+ uint8_t memif_mode_;
+ std::string app_name_;
+ uint16_t transmission_index_;
+ utils::SpinLock write_msgs_lock_;
+ std::string socket_filename_;
+
+ static std::once_flag flag_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/memif/memif_vapi.c b/libtransport/src/io_modules/memif/memif_vapi.c
new file mode 100644
index 000000000..b3da2b012
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_vapi.c
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <fcntl.h>
+#include <hicn/transport/config.h>
+#include <inttypes.h>
+#include <io_modules/memif/memif_vapi.h>
+#include <semaphore.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <vapi/vapi_safe.h>
+#include <vppinfra/clib.h>
+
+DEFINE_VAPI_MSG_IDS_MEMIF_API_JSON
+
+static vapi_error_e memif_details_cb(vapi_ctx_t ctx, void *callback_ctx,
+ vapi_error_e rv, bool is_last,
+ vapi_payload_memif_details *reply) {
+ uint32_t *last_memif_id = (uint32_t *)callback_ctx;
+ uint32_t current_memif_id = 0;
+ if (reply != NULL) {
+ current_memif_id = reply->id;
+ } else {
+ return rv;
+ }
+
+ if (current_memif_id >= *last_memif_id) {
+ *last_memif_id = current_memif_id + 1;
+ }
+
+ return rv;
+}
+
+int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, uint32_t *memif_id) {
+ vapi_lock();
+ vapi_msg_memif_dump *msg = vapi_alloc_memif_dump(ctx);
+ int ret = vapi_memif_dump(ctx, msg, memif_details_cb, memif_id);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e memif_create_cb(vapi_ctx_t ctx, void *callback_ctx,
+ vapi_error_e rv, bool is_last,
+ vapi_payload_memif_create_reply *reply) {
+ memif_output_params_t *output_params = (memif_output_params_t *)callback_ctx;
+
+ if (reply == NULL) return rv;
+
+ output_params->sw_if_index = reply->sw_if_index;
+
+ return rv;
+}
+
+int memif_vapi_create_memif(vapi_ctx_t ctx, memif_create_params_t *input_params,
+ memif_output_params_t *output_params) {
+ vapi_lock();
+ vapi_msg_memif_create *msg = vapi_alloc_memif_create(ctx);
+
+ int ret = 0;
+ if (input_params->socket_id == ~0) {
+ // invalid socket-id
+ ret = -1;
+ goto END;
+ }
+
+ if (!is_pow2(input_params->ring_size)) {
+ // ring size must be power of 2
+ ret = -1;
+ goto END;
+ }
+
+ if (input_params->rx_queues > 255 || input_params->rx_queues < 1) {
+ // rx queue must be between 1 - 255
+ ret = -1;
+ goto END;
+ }
+
+ if (input_params->tx_queues > 255 || input_params->tx_queues < 1) {
+ // tx queue must be between 1 - 255
+ ret = -1;
+ goto END;
+ }
+
+ msg->payload.role = input_params->role;
+ msg->payload.mode = input_params->mode;
+ msg->payload.rx_queues = input_params->rx_queues;
+ msg->payload.tx_queues = input_params->tx_queues;
+ msg->payload.id = input_params->id;
+ msg->payload.socket_id = input_params->socket_id;
+ msg->payload.ring_size = input_params->ring_size;
+ msg->payload.buffer_size = input_params->buffer_size;
+
+ ret = vapi_memif_create(ctx, msg, memif_create_cb, output_params);
+END:
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e memif_delete_cb(vapi_ctx_t ctx, void *callback_ctx,
+ vapi_error_e rv, bool is_last,
+ vapi_payload_memif_delete_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int memif_vapi_delete_memif(vapi_ctx_t ctx, uint32_t sw_if_index) {
+ vapi_lock();
+ vapi_msg_memif_delete *msg = vapi_alloc_memif_delete(ctx);
+
+ msg->payload.sw_if_index = sw_if_index;
+
+ int ret = vapi_memif_delete(ctx, msg, memif_delete_cb, NULL);
+ vapi_unlock();
+ return ret;
+}
diff --git a/libtransport/src/io_modules/memif/memif_vapi.h b/libtransport/src/io_modules/memif/memif_vapi.h
new file mode 100644
index 000000000..bcf06ed43
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_vapi.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <vapi/memif.api.vapi.h>
+
+#include "stdint.h"
+
+typedef struct memif_create_params_s {
+ uint8_t role;
+ uint8_t mode;
+ uint8_t rx_queues;
+ uint8_t tx_queues;
+ uint32_t id;
+ uint32_t socket_id;
+ uint8_t secret[24];
+ uint32_t ring_size;
+ uint16_t buffer_size;
+ uint8_t hw_addr[6];
+} memif_create_params_t;
+
+typedef struct memif_output_params_s {
+ uint32_t sw_if_index;
+} memif_output_params_t;
+
+int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, uint32_t *memif_id);
+
+int memif_vapi_create_memif(vapi_ctx_t ctx, memif_create_params_t *input_params,
+ memif_output_params_t *output_params);
+
+int memif_vapi_delete_memif(vapi_ctx_t ctx, uint32_t sw_if_index);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/libtransport/src/io_modules/memif/vpp_forwarder_module.cc b/libtransport/src/io_modules/memif/vpp_forwarder_module.cc
new file mode 100644
index 000000000..dcbcd7ed0
--- /dev/null
+++ b/libtransport/src/io_modules/memif/vpp_forwarder_module.cc
@@ -0,0 +1,263 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <io_modules/memif/hicn_vapi.h>
+#include <io_modules/memif/memif_connector.h>
+#include <io_modules/memif/memif_vapi.h>
+#include <io_modules/memif/vpp_forwarder_module.h>
+
+extern "C" {
+#include <memif/libmemif.h>
+};
+
+typedef enum { MASTER = 0, SLAVE = 1 } memif_role_t;
+
+#define MEMIF_DEFAULT_RING_SIZE 2048
+#define MEMIF_DEFAULT_RX_QUEUES 1
+#define MEMIF_DEFAULT_TX_QUEUES 1
+#define MEMIF_DEFAULT_BUFFER_SIZE 2048
+
+namespace transport {
+
+namespace core {
+
+VPPForwarderModule::VPPForwarderModule()
+ : IoModule(),
+ connector_(nullptr),
+ sw_if_index_(~0),
+ face_id1_(~0),
+ face_id2_(~0),
+ is_consumer_(false) {}
+
+VPPForwarderModule::~VPPForwarderModule() { delete connector_; }
+
+void VPPForwarderModule::init(
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service, const std::string &app_name) {
+ if (!connector_) {
+ connector_ =
+ new MemifConnector(std::move(receive_callback), 0, 0,
+ std::move(reconnect_callback), io_service, app_name);
+ }
+}
+
+void VPPForwarderModule::processControlMessageReply(
+ utils::MemBuf &packet_buffer) {
+ throw errors::NotImplementedException();
+}
+
+bool VPPForwarderModule::isControlMessage(const uint8_t *message) {
+ return false;
+}
+
+bool VPPForwarderModule::isConnected() { return connector_->isConnected(); };
+
+void VPPForwarderModule::send(Packet &packet) {
+ IoModule::send(packet);
+ connector_->send(packet);
+}
+
+void VPPForwarderModule::send(const uint8_t *packet, std::size_t len) {
+ counters_.tx_packets++;
+ counters_.tx_bytes += len;
+
+ // Perfect forwarding
+ connector_->send(packet, len);
+}
+
+std::uint32_t VPPForwarderModule::getMtu() { return interface_mtu; }
+
+/**
+ * @brief Create a memif interface in the local VPP forwarder.
+ */
+uint32_t VPPForwarderModule::getMemifConfiguration() {
+ memif_create_params_t input_params = {0};
+
+ int ret = memif_vapi_get_next_memif_id(VPPForwarderModule::sock_, &memif_id_);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(
+ "Error getting next memif id. Could not create memif interface.");
+ }
+
+ input_params.id = memif_id_;
+ input_params.role = memif_role_t::MASTER;
+ input_params.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
+ input_params.rx_queues = MEMIF_DEFAULT_RX_QUEUES;
+ input_params.tx_queues = MEMIF_DEFAULT_TX_QUEUES;
+ input_params.ring_size = MEMIF_DEFAULT_RING_SIZE;
+ input_params.buffer_size = MEMIF_DEFAULT_BUFFER_SIZE;
+
+ memif_output_params_t output_params = {0};
+
+ ret = memif_vapi_create_memif(VPPForwarderModule::sock_, &input_params,
+ &output_params);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(
+ "Error creating memif interface in the local VPP forwarder.");
+ }
+
+ return output_params.sw_if_index;
+}
+
+void VPPForwarderModule::consumerConnection() {
+ hicn_consumer_input_params input = {0};
+ hicn_consumer_output_params output = {0};
+ ip_address_t ip4_address;
+ ip_address_t ip6_address;
+
+ output.src4 = &ip4_address;
+ output.src6 = &ip6_address;
+ input.swif = sw_if_index_;
+
+ int ret =
+ hicn_vapi_register_cons_app(VPPForwarderModule::sock_, &input, &output);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(hicn_vapi_get_error_string(ret));
+ }
+
+ face_id1_ = output.face_id1;
+ face_id2_ = output.face_id2;
+
+ std::memcpy(inet_address_.v4.as_u8, output.src4->v4.as_u8, IPV4_ADDR_LEN);
+
+ std::memcpy(inet6_address_.v6.as_u8, output.src6->v6.as_u8, IPV6_ADDR_LEN);
+}
+
+void VPPForwarderModule::producerConnection() {
+ // Producer connection will be set when we set the first route.
+}
+
+void VPPForwarderModule::connect(bool is_consumer) {
+ int retry = 20;
+
+ TRANSPORT_LOGI("Connecting to VPP through vapi.");
+ vapi_error_e ret = vapi_connect_safe(&sock_, 0);
+
+ while (ret != VAPI_OK && retry > 0) {
+ TRANSPORT_LOGE("Error connecting to VPP through vapi. Retrying..");
+ --retry;
+ ret = vapi_connect_safe(&sock_, 0);
+ }
+
+ if (ret != VAPI_OK) {
+ throw std::runtime_error(
+ "Impossible to connect to forwarder. Is VPP running?");
+ }
+
+ TRANSPORT_LOGI("Connected to VPP through vapi.");
+
+ sw_if_index_ = getMemifConfiguration();
+
+ is_consumer_ = is_consumer;
+ if (is_consumer_) {
+ consumerConnection();
+ }
+
+ connector_->connect(memif_id_, 0);
+ connector_->setRole(is_consumer_ ? Connector::Role::CONSUMER
+ : Connector::Role::PRODUCER);
+}
+
+void VPPForwarderModule::registerRoute(const Prefix &prefix) {
+ const ip_prefix_t &addr = prefix.toIpPrefixStruct();
+
+ ip_prefix_t producer_prefix;
+ ip_address_t producer_locator;
+
+ if (face_id1_ == uint32_t(~0)) {
+ hicn_producer_input_params input;
+ std::memset(&input, 0, sizeof(input));
+
+ hicn_producer_output_params output;
+ std::memset(&output, 0, sizeof(output));
+
+ input.prefix = &producer_prefix;
+ output.prod_addr = &producer_locator;
+
+ // Here we have to ask to the actual connector what is the
+ // memif_id, since this function should be called after the
+ // memif creation.n
+ input.swif = sw_if_index_;
+ input.prefix->address = addr.address;
+ input.prefix->family = addr.family;
+ input.prefix->len = addr.len;
+ input.cs_reserved = content_store_reserved_;
+
+ int ret =
+ hicn_vapi_register_prod_app(VPPForwarderModule::sock_, &input, &output);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(hicn_vapi_get_error_string(ret));
+ }
+
+ inet6_address_ = *output.prod_addr;
+
+ face_id1_ = output.face_id;
+ } else {
+ hicn_producer_set_route_params params;
+ params.prefix = &producer_prefix;
+ params.prefix->address = addr.address;
+ params.prefix->family = addr.family;
+ params.prefix->len = addr.len;
+ params.prod_addr = &producer_locator;
+
+ int ret = hicn_vapi_register_route(VPPForwarderModule::sock_, &params);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(hicn_vapi_get_error_string(ret));
+ }
+ }
+}
+
+void VPPForwarderModule::closeConnection() {
+ if (VPPForwarderModule::sock_) {
+ connector_->close();
+
+ if (is_consumer_) {
+ hicn_del_face_app_input_params params;
+ params.face_id = face_id1_;
+ hicn_vapi_face_cons_del(VPPForwarderModule::sock_, &params);
+ params.face_id = face_id2_;
+ hicn_vapi_face_cons_del(VPPForwarderModule::sock_, &params);
+ } else {
+ hicn_del_face_app_input_params params;
+ params.face_id = face_id1_;
+ hicn_vapi_face_prod_del(VPPForwarderModule::sock_, &params);
+ }
+
+ if (sw_if_index_ != uint32_t(~0)) {
+ int ret =
+ memif_vapi_delete_memif(VPPForwarderModule::sock_, sw_if_index_);
+ if (ret < 0) {
+ TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_);
+ }
+ }
+
+ vapi_disconnect_safe();
+ VPPForwarderModule::sock_ = nullptr;
+ }
+}
+
+extern "C" IoModule *create_module(void) { return new VPPForwarderModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/memif/vpp_forwarder_module.h b/libtransport/src/io_modules/memif/vpp_forwarder_module.h
new file mode 100644
index 000000000..8c4114fed
--- /dev/null
+++ b/libtransport/src/io_modules/memif/vpp_forwarder_module.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+
+#ifdef always_inline
+#undef always_inline
+#endif
+extern "C" {
+#include <vapi/vapi_safe.h>
+};
+
+namespace transport {
+
+namespace core {
+
+class MemifConnector;
+
+class VPPForwarderModule : public IoModule {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ VPPForwarderModule();
+ ~VPPForwarderModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ uint32_t getMemifConfiguration();
+ void consumerConnection();
+ void producerConnection();
+
+ private:
+ MemifConnector *connector_;
+ uint32_t memif_id_;
+ uint32_t sw_if_index_;
+ // A consumer socket in vpp has two faces (ipv4 and ipv6)
+ uint32_t face_id1_;
+ uint32_t face_id2_;
+ bool is_consumer_;
+ vapi_ctx_t sock_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc b/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc
new file mode 100644
index 000000000..0bfcc2a58
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc
@@ -0,0 +1,201 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/raw_socket_connector.h>
+#include <hicn/transport/utils/conversions.h>
+#include <hicn/transport/utils/log.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+
+#define MY_DEST_MAC0 0x0a
+#define MY_DEST_MAC1 0x7b
+#define MY_DEST_MAC2 0x7c
+#define MY_DEST_MAC3 0x1c
+#define MY_DEST_MAC4 0x4a
+#define MY_DEST_MAC5 0x14
+
+namespace transport {
+
+namespace core {
+
+RawSocketConnector::RawSocketConnector(
+ PacketReceivedCallback &&receive_callback,
+ OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
+ io_service_(io_service),
+ socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)),
+ // resolver_(io_service_),
+ timer_(io_service_),
+ read_msg_(packet_pool_.makePtr(nullptr)),
+ data_available_(false),
+ app_name_(app_name) {
+ memset(&link_layer_address_, 0, sizeof(link_layer_address_));
+}
+
+RawSocketConnector::~RawSocketConnector() {}
+
+void RawSocketConnector::connect(const std::string &interface_name,
+ const std::string &mac_address_str) {
+ state_ = ConnectorState::CONNECTING;
+ memset(&ethernet_header_, 0, sizeof(ethernet_header_));
+ struct ifreq ifr;
+ struct ifreq if_mac;
+ uint8_t mac_address[6];
+
+ utils::convertStringToMacAddress(mac_address_str, mac_address);
+
+ // Get interface mac address
+ int fd = static_cast<int>(socket_.native_handle());
+
+ /* Get the index of the interface to send on */
+ memset(&ifr, 0, sizeof(struct ifreq));
+ strncpy(ifr.ifr_name, interface_name.c_str(), interface_name.size());
+
+ // if (ioctl(fd, SIOCGIFINDEX, &if_idx) < 0) {
+ // perror("SIOCGIFINDEX");
+ // }
+
+ /* Get the MAC address of the interface to send on */
+ memset(&if_mac, 0, sizeof(struct ifreq));
+ strncpy(if_mac.ifr_name, interface_name.c_str(), interface_name.size());
+ if (ioctl(fd, SIOCGIFHWADDR, &if_mac) < 0) {
+ perror("SIOCGIFHWADDR");
+ throw errors::RuntimeException("Interface does not exist");
+ }
+
+ /* Ethernet header */
+ for (int i = 0; i < 6; i++) {
+ ethernet_header_.ether_shost[i] =
+ ((uint8_t *)&if_mac.ifr_hwaddr.sa_data)[i];
+ ethernet_header_.ether_dhost[i] = mac_address[i];
+ }
+
+ /* Ethertype field */
+ ethernet_header_.ether_type = htons(ETH_P_IPV6);
+
+ strcpy(ifr.ifr_name, interface_name.c_str());
+
+ if (0 == ioctl(fd, SIOCGIFHWADDR, &ifr)) {
+ memcpy(link_layer_address_.sll_addr, ifr.ifr_hwaddr.sa_data, 6);
+ }
+
+ // memset(&ifr, 0, sizeof(ifr));
+ // ioctl(fd, SIOCGIFFLAGS, &ifr);
+ // ifr.ifr_flags |= IFF_PROMISC;
+ // ioctl(fd, SIOCSIFFLAGS, &ifr);
+
+ link_layer_address_.sll_family = AF_PACKET;
+ link_layer_address_.sll_protocol = htons(ETH_P_ALL);
+ link_layer_address_.sll_ifindex = if_nametoindex(interface_name.c_str());
+ link_layer_address_.sll_hatype = 1;
+ link_layer_address_.sll_halen = 6;
+
+ // startConnectionTimer();
+ doConnect();
+ doRecvPacket();
+}
+
+void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent) {
+ if (packet_sent != 0) {
+ socket_.async_send(
+ asio::buffer(packet, len),
+ [packet_sent](std::error_code ec, std::size_t /*length*/) {
+ packet_sent();
+ });
+ } else {
+ if (state_ == ConnectorState::CONNECTED) {
+ socket_.send(asio::buffer(packet, len));
+ }
+ }
+}
+
+void RawSocketConnector::send(const Packet::MemBufPtr &packet) {
+ io_service_.post([this, packet]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(packet));
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
+ if (!write_in_progress) {
+ doSendPacket();
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
+ }
+ });
+}
+
+void RawSocketConnector::close() {
+ io_service_.post([this]() { socket_.close(); });
+}
+
+void RawSocketConnector::doSendPacket() {
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_.async_send(
+ std::move(array),
+ [this /*, packet*/](std::error_code ec, std::size_t bytes_transferred) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doSendPacket();
+ }
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+ });
+}
+
+void RawSocketConnector::doRecvPacket() {
+ read_msg_ = getPacket();
+ socket_.async_receive(
+ asio::buffer(read_msg_->writableData(), packet_size),
+ [this](std::error_code ec, std::size_t bytes_transferred) mutable {
+ if (!ec) {
+ // Ignore packets that are not for us
+ uint8_t *dst_mac_address = const_cast<uint8_t *>(read_msg_->data());
+ if (!std::memcmp(dst_mac_address, ethernet_header_.ether_shost,
+ ETHER_ADDR_LEN)) {
+ read_msg_->append(bytes_transferred);
+ read_msg_->trimStart(sizeof(struct ether_header));
+ receive_callback_(std::move(read_msg_));
+ }
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+ doRecvPacket();
+ });
+}
+
+void RawSocketConnector::doConnect() {
+ state_ = ConnectorState::CONNECTED;
+ socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_)));
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_connector.h b/libtransport/src/io_modules/raw_socket/raw_socket_connector.h
new file mode 100644
index 000000000..aba4b1105
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_connector.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/connector.h>
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/name.h>
+#include <linux/if_packet.h>
+#include <net/ethernet.h>
+#include <sys/socket.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <deque>
+
+namespace transport {
+
+namespace core {
+
+using asio::generic::raw_protocol;
+using raw_endpoint = asio::generic::basic_endpoint<raw_protocol>;
+
+class RawSocketConnector : public Connector {
+ public:
+ RawSocketConnector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~RawSocketConnector() override;
+
+ void send(const Packet::MemBufPtr &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent = 0) override;
+
+ void close() override;
+
+ void connect(const std::string &interface_name,
+ const std::string &mac_address_str);
+
+ private:
+ void doConnect();
+
+ void doRecvPacket();
+
+ void doSendPacket();
+
+ private:
+ asio::io_service &io_service_;
+ raw_protocol::socket socket_;
+
+ struct ether_header ethernet_header_;
+
+ struct sockaddr_ll link_layer_address_;
+
+ asio::steady_timer timer_;
+
+ utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
+
+ bool data_available_;
+ std::string app_name_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc b/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc
new file mode 100644
index 000000000..dcf489f59
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/raw_socket_interface.h>
+#include <hicn/transport/utils/linux.h>
+
+#include <fstream>
+
+namespace transport {
+
+namespace core {
+
+static std::string config_folder_path = "/etc/transport/interface.conf.d";
+
+RawSocketInterface::RawSocketInterface(RawSocketConnector &connector)
+ : ForwarderInterface<RawSocketInterface, RawSocketConnector>(connector) {}
+
+RawSocketInterface::~RawSocketInterface() {}
+
+void RawSocketInterface::connect(bool is_consumer) {
+ std::string complete_filename =
+ config_folder_path + std::string("/") + output_interface_;
+
+ std::ifstream is(complete_filename);
+ std::string interface;
+
+ if (is) {
+ is >> remote_mac_address_;
+ }
+
+ // Get interface ip address
+ struct sockaddr_in6 address = {0};
+ utils::retrieveInterfaceAddress(output_interface_, &address);
+
+ std::memcpy(&inet6_address_.v6.as_u8, &address.sin6_addr,
+ sizeof(address.sin6_addr));
+ connector_.connect(output_interface_, remote_mac_address_);
+}
+
+void RawSocketInterface::registerRoute(Prefix &prefix) { return; }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_interface.h b/libtransport/src/io_modules/raw_socket/raw_socket_interface.h
new file mode 100644
index 000000000..7036cac7e
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_interface.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/forwarder_interface.h>
+#include <core/raw_socket_connector.h>
+#include <hicn/transport/core/prefix.h>
+
+#include <atomic>
+#include <deque>
+
+namespace transport {
+
+namespace core {
+
+class RawSocketInterface
+ : public ForwarderInterface<RawSocketInterface, RawSocketConnector> {
+ public:
+ typedef RawSocketConnector ConnectorType;
+
+ RawSocketInterface(RawSocketConnector &connector);
+
+ ~RawSocketInterface();
+
+ void connect(bool is_consumer);
+
+ void registerRoute(Prefix &prefix);
+
+ std::uint16_t getMtu() { return interface_mtu; }
+
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return false;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {}
+
+ TRANSPORT_ALWAYS_INLINE void closeConnection(){};
+
+ private:
+ static constexpr std::uint16_t interface_mtu = 1500;
+ std::string remote_mac_address_;
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/udp/CMakeLists.txt b/libtransport/src/io_modules/udp/CMakeLists.txt
new file mode 100644
index 000000000..1a43492dc
--- /dev/null
+++ b/libtransport/src/io_modules/udp/CMakeLists.txt
@@ -0,0 +1,47 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc
+)
+
+# add_executable(hicnlight_module MACOSX_BUNDLE ${MODULE_SOURCE_FILES})
+# target_include_directories(hicnlight_module PRIVATE ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS})
+# set_target_properties(hicnlight_module PROPERTIES
+# BUNDLE True
+# MACOSX_BUNDLE_GUI_IDENTIFIER my.domain.style.identifier.hicnlight_module
+# MACOSX_BUNDLE_BUNDLE_NAME hicnlight_module
+# MACOSX_BUNDLE_BUNDLE_VERSION "0.1"
+# MACOSX_BUNDLE_SHORT_VERSION_STRING "0.1"
+# # MACOSX_BUNDLE_INFO_PLIST ${CMAKE_SOURCE_DIR}/cmake/customtemplate.plist.in
+# )
+
+build_module(hicnlight_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ # LIBRARY_ROOT_DIR "vpp_plugins"
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.cc b/libtransport/src/io_modules/udp/hicn_forwarder_module.cc
new file mode 100644
index 000000000..ba08dd8c0
--- /dev/null
+++ b/libtransport/src/io_modules/udp/hicn_forwarder_module.cc
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <io_modules/udp/hicn_forwarder_module.h>
+#include <io_modules/udp/udp_socket_connector.h>
+
+union AddressLight {
+ uint32_t ipv4;
+ struct in6_addr ipv6;
+};
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+} CommandHeader;
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+ char symbolic_or_connid[16];
+ union AddressLight address;
+ uint16_t cost;
+ uint8_t address_type;
+ uint8_t len;
+} RouteToSelfCommand;
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+ char symbolic_or_connid[16];
+} DeleteSelfConnectionCommand;
+
+namespace {
+static constexpr uint8_t addr_inet = 1;
+static constexpr uint8_t addr_inet6 = 2;
+static constexpr uint8_t add_route_command = 3;
+static constexpr uint8_t delete_connection_command = 5;
+static constexpr uint8_t request_light = 0xc0;
+static constexpr char identifier[] = "SELF";
+
+void fillCommandHeader(CommandHeader *header) {
+ // Allocate and fill the header
+ header->message_type = request_light;
+ header->length = 1;
+}
+
+RouteToSelfCommand createCommandRoute(std::unique_ptr<sockaddr> &&addr,
+ uint8_t prefix_length) {
+ RouteToSelfCommand command = {0};
+
+ // check and set IP address
+ if (addr->sa_family == AF_INET) {
+ command.address_type = addr_inet;
+ command.address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr;
+ } else if (addr->sa_family == AF_INET6) {
+ command.address_type = addr_inet6;
+ command.address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr;
+ }
+
+ // Fill remaining payload fields
+#ifndef _WIN32
+ strcpy(command.symbolic_or_connid, identifier);
+#else
+ strcpy_s(command.symbolic_or_connid, 16, identifier);
+#endif
+ command.cost = 1;
+ command.len = (uint8_t)prefix_length;
+
+ // Allocate and fill the header
+ command.command_id = add_route_command;
+ fillCommandHeader((CommandHeader *)&command);
+
+ return command;
+}
+
+DeleteSelfConnectionCommand createCommandDeleteConnection() {
+ DeleteSelfConnectionCommand command = {0};
+ fillCommandHeader((CommandHeader *)&command);
+ command.command_id = delete_connection_command;
+
+#ifndef _WIN32
+ strcpy(command.symbolic_or_connid, identifier);
+#else
+ strcpy_s(command.symbolic_or_connid, 16, identifier);
+#endif
+
+ return command;
+}
+
+} // namespace
+
+namespace transport {
+
+namespace core {
+
+HicnForwarderModule::HicnForwarderModule() : IoModule(), connector_(nullptr) {}
+
+HicnForwarderModule::~HicnForwarderModule() {}
+
+void HicnForwarderModule::connect(bool is_consumer) {
+ connector_->connect();
+ connector_->setRole(is_consumer ? Connector::Role::CONSUMER
+ : Connector::Role::PRODUCER);
+}
+
+bool HicnForwarderModule::isConnected() { return connector_->isConnected(); }
+
+void HicnForwarderModule::send(Packet &packet) {
+ IoModule::send(packet);
+ packet.setChecksum();
+ connector_->send(packet);
+}
+
+void HicnForwarderModule::send(const uint8_t *packet, std::size_t len) {
+ counters_.tx_packets++;
+ counters_.tx_bytes += len;
+
+ // Perfect forwarding
+ connector_->send(packet, len);
+}
+
+void HicnForwarderModule::registerRoute(const Prefix &prefix) {
+ auto command = createCommandRoute(prefix.toSockaddr(),
+ (uint8_t)prefix.getPrefixLength());
+ send((uint8_t *)&command, sizeof(RouteToSelfCommand));
+}
+
+void HicnForwarderModule::closeConnection() {
+ auto command = createCommandDeleteConnection();
+ send((uint8_t *)&command, sizeof(DeleteSelfConnectionCommand));
+ connector_->close();
+}
+
+void HicnForwarderModule::init(
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service, const std::string &app_name) {
+ if (!connector_) {
+ connector_ = new UdpSocketConnector(std::move(receive_callback), nullptr,
+ nullptr, std::move(reconnect_callback),
+ io_service, app_name);
+ }
+}
+
+void HicnForwarderModule::processControlMessageReply(
+ utils::MemBuf &packet_buffer) {
+ if (packet_buffer.data()[0] == nack_code) {
+ throw errors::RuntimeException(
+ "Received Nack message from hicn light forwarder.");
+ }
+}
+
+std::uint32_t HicnForwarderModule::getMtu() { return interface_mtu; }
+
+bool HicnForwarderModule::isControlMessage(const uint8_t *message) {
+ return message[0] == ack_code || message[0] == nack_code;
+}
+
+extern "C" IoModule *create_module(void) { return new HicnForwarderModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.h b/libtransport/src/io_modules/udp/hicn_forwarder_module.h
new file mode 100644
index 000000000..845db73bf
--- /dev/null
+++ b/libtransport/src/io_modules/udp/hicn_forwarder_module.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+
+namespace transport {
+
+namespace core {
+
+class UdpSocketConnector;
+
+class HicnForwarderModule : public IoModule {
+ static constexpr uint8_t ack_code = 0xc2;
+ static constexpr uint8_t nack_code = 0xc3;
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ union addressLight {
+ uint32_t ipv4;
+ struct in6_addr ipv6;
+ };
+
+ struct route_to_self_command {
+ uint8_t messageType;
+ uint8_t commandID;
+ uint16_t length;
+ uint32_t seqNum;
+ char symbolicOrConnid[16];
+ union addressLight address;
+ uint16_t cost;
+ uint8_t addressType;
+ uint8_t len;
+ };
+
+ using route_to_self_command = struct route_to_self_command;
+
+ HicnForwarderModule();
+
+ ~HicnForwarderModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ UdpSocketConnector *connector_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.cc b/libtransport/src/io_modules/udp/udp_socket_connector.cc
new file mode 100644
index 000000000..456886a54
--- /dev/null
+++ b/libtransport/src/io_modules/udp/udp_socket_connector.cc
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <hicn/transport/portability/win_portability.h>
+#endif
+
+#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/object_pool.h>
+#include <io_modules/udp/udp_socket_connector.h>
+
+#include <thread>
+#include <vector>
+
+namespace transport {
+
+namespace core {
+
+UdpSocketConnector::UdpSocketConnector(
+ PacketReceivedCallback &&receive_callback, PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback, OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service, std::string app_name)
+ : Connector(std::move(receive_callback), std::move(packet_sent),
+ std::move(close_callback), std::move(on_reconnect)),
+ io_service_(io_service),
+ socket_(io_service_),
+ resolver_(io_service_),
+ connection_timer_(io_service_),
+ read_msg_(std::make_pair(nullptr, 0)),
+ is_reconnection_(false),
+ data_available_(false),
+ app_name_(app_name) {}
+
+UdpSocketConnector::~UdpSocketConnector() {}
+
+void UdpSocketConnector::connect(std::string ip_address, std::string port) {
+ endpoint_iterator_ = resolver_.resolve(
+ {ip_address, port, asio::ip::resolver_query_base::numeric_service});
+
+ state_ = Connector::State::CONNECTING;
+ doConnect();
+}
+
+void UdpSocketConnector::send(const uint8_t *packet, std::size_t len) {
+ socket_.async_send(asio::buffer(packet, len),
+ [this](std::error_code ec, std::size_t /*length*/) {
+ if (sent_callback_) {
+ sent_callback_(this, ec);
+ }
+ });
+}
+
+void UdpSocketConnector::send(Packet &packet) {
+ io_service_.post([this, _packet{packet.shared_from_this()}]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(_packet));
+ if (TRANSPORT_EXPECT_TRUE(state_ == Connector::State::CONNECTED)) {
+ if (!write_in_progress) {
+ doWrite();
+ }
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
+ });
+}
+
+void UdpSocketConnector::close() {
+ if (io_service_.stopped()) {
+ doClose();
+ } else {
+ io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this));
+ }
+}
+
+void UdpSocketConnector::doClose() {
+ if (state_ != Connector::State::CLOSED) {
+ state_ = Connector::State::CLOSED;
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ }
+}
+
+void UdpSocketConnector::doWrite() {
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_.async_send(std::move(array), [this](std::error_code ec,
+ std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doWrite();
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::doRead() {
+ read_msg_ = getRawBuffer();
+ socket_.async_receive(
+ asio::buffer(read_msg_.first, read_msg_.second),
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ auto packet = getPacketFromBuffer(read_msg_.first, length);
+ receive_callback_(this, *packet, std::make_error_code(std::errc(0)));
+ doRead();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::tryReconnect() {
+ if (state_ == Connector::State::CONNECTED) {
+ TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
+ state_ = Connector::State::CONNECTING;
+ is_reconnection_ = true;
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+
+ doConnect();
+ startConnectionTimer();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ });
+ }
+}
+
+void UdpSocketConnector::doConnect() {
+ asio::async_connect(
+ socket_, endpoint_iterator_,
+ [this](std::error_code ec, udp::resolver::iterator) {
+ if (!ec) {
+ connection_timer_.cancel();
+ state_ = Connector::State::CONNECTED;
+ doRead();
+
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ }
+
+ on_reconnect_callback_(this);
+ } else {
+ doConnect();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ });
+}
+
+bool UdpSocketConnector::checkConnected() {
+ return state_ == Connector::State::CONNECTED;
+}
+
+void UdpSocketConnector::startConnectionTimer() {
+ connection_timer_.expires_from_now(std::chrono::seconds(60));
+ connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
+ this, std::placeholders::_1));
+}
+
+void UdpSocketConnector::handleDeadline(const std::error_code &ec) {
+ if (!ec) {
+ io_service_.post([this]() {
+ socket_.close();
+ TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n");
+ });
+ }
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.h b/libtransport/src/io_modules/udp/udp_socket_connector.h
new file mode 100644
index 000000000..8ab08e17a
--- /dev/null
+++ b/libtransport/src/io_modules/udp/udp_socket_connector.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/core/name.h>
+#include <hicn/transport/core/packet.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <deque>
+
+namespace transport {
+namespace core {
+
+using asio::ip::udp;
+
+class UdpSocketConnector : public Connector {
+ public:
+ UdpSocketConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~UdpSocketConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ void close() override;
+
+ void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
+
+ private:
+ void doConnect();
+
+ void doRead();
+
+ void doWrite();
+
+ void doClose();
+
+ bool checkConnected();
+
+ private:
+ void handleDeadline(const std::error_code &ec);
+
+ void startConnectionTimer();
+
+ void tryReconnect();
+
+ asio::io_service &io_service_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::steady_timer connection_timer_;
+
+ std::pair<uint8_t *, std::size_t> read_msg_;
+
+ bool is_reconnection_;
+ bool data_available_;
+
+ std::string app_name_;
+};
+
+} // end namespace core
+
+} // end namespace transport