aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/io_modules
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/io_modules')
-rw-r--r--libtransport/src/io_modules/CMakeLists.txt37
-rw-r--r--libtransport/src/io_modules/forwarder/CMakeLists.txt44
-rw-r--r--libtransport/src/io_modules/forwarder/configuration.h89
-rw-r--r--libtransport/src/io_modules/forwarder/errors.cc52
-rw-r--r--libtransport/src/io_modules/forwarder/errors.h91
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.cc296
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.h90
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.cc87
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.h70
-rw-r--r--libtransport/src/io_modules/forwarder/global_id_counter.h54
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel.cc288
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel.h147
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc177
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel_listener.h110
-rw-r--r--libtransport/src/io_modules/loopback/CMakeLists.txt34
-rw-r--r--libtransport/src/io_modules/loopback/local_face.cc69
-rw-r--r--libtransport/src/io_modules/loopback/local_face.h54
-rw-r--r--libtransport/src/io_modules/loopback/loopback_module.cc84
-rw-r--r--libtransport/src/io_modules/loopback/loopback_module.h70
-rw-r--r--libtransport/src/io_modules/memif/CMakeLists.txt56
-rw-r--r--libtransport/src/io_modules/memif/hicn_vapi.c229
-rw-r--r--libtransport/src/io_modules/memif/hicn_vapi.h82
-rw-r--r--libtransport/src/io_modules/memif/memif_connector.cc493
-rw-r--r--libtransport/src/io_modules/memif/memif_connector.h130
-rw-r--r--libtransport/src/io_modules/memif/memif_vapi.c127
-rw-r--r--libtransport/src/io_modules/memif/memif_vapi.h54
-rw-r--r--libtransport/src/io_modules/memif/vpp_forwarder_module.cc263
-rw-r--r--libtransport/src/io_modules/memif/vpp_forwarder_module.h83
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_connector.cc201
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_connector.h80
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_interface.cc56
-rw-r--r--libtransport/src/io_modules/raw_socket/raw_socket_interface.h61
-rw-r--r--libtransport/src/io_modules/udp/CMakeLists.txt47
-rw-r--r--libtransport/src/io_modules/udp/hicn_forwarder_module.cc181
-rw-r--r--libtransport/src/io_modules/udp/hicn_forwarder_module.h86
-rw-r--r--libtransport/src/io_modules/udp/udp_socket_connector.cc211
-rw-r--r--libtransport/src/io_modules/udp/udp_socket_connector.h89
37 files changed, 4472 insertions, 0 deletions
diff --git a/libtransport/src/io_modules/CMakeLists.txt b/libtransport/src/io_modules/CMakeLists.txt
new file mode 100644
index 000000000..6553b9a2b
--- /dev/null
+++ b/libtransport/src/io_modules/CMakeLists.txt
@@ -0,0 +1,37 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+if (${CMAKE_SYSTEM_NAME} MATCHES Android)
+ list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/hicn_forwarder_module.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/udp_socket_connector.cc
+ )
+
+ list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/hicn_forwarder_module.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp/udp_socket_connector.h
+ )
+
+ set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+ set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
+else()
+ add_subdirectory(udp)
+ add_subdirectory(loopback)
+ add_subdirectory(forwarder)
+
+ if (__vpp__)
+ add_subdirectory(memif)
+ endif()
+endif() \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt
new file mode 100644
index 000000000..92662bc4c
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt
@@ -0,0 +1,44 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/endpoint.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/global_counter.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.cc
+)
+
+build_module(forwarder_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/forwarder/configuration.h b/libtransport/src/io_modules/forwarder/configuration.h
new file mode 100644
index 000000000..fcaa5530d
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/configuration.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace transport {
+namespace core {
+
+struct ListenerConfig {
+ std::string address;
+ std::uint16_t port;
+ std::string name;
+};
+
+struct ConnectorConfig {
+ std::string local_address;
+ std::uint16_t local_port;
+ std::string remote_address;
+ std::uint16_t remote_port;
+ std::string name;
+};
+
+struct RouteConfig {
+ std::string prefix;
+ uint16_t weight;
+ std::string connector;
+ std::string name;
+};
+
+class Configuration {
+ public:
+ Configuration() : n_threads_(1) {}
+
+ bool empty() {
+ return listeners_.empty() && connectors_.empty() && routes_.empty();
+ }
+
+ Configuration& setThreadNumber(std::size_t threads) {
+ n_threads_ = threads;
+ return *this;
+ }
+
+ std::size_t getThreadNumber() { return n_threads_; }
+
+ template <typename... Args>
+ Configuration& addListener(Args&&... args) {
+ listeners_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ Configuration& addConnector(Args&&... args) {
+ connectors_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ Configuration& addRoute(Args&&... args) {
+ routes_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ std::vector<ListenerConfig>& getListeners() { return listeners_; }
+
+ std::vector<ConnectorConfig>& getConnectors() { return connectors_; }
+
+ std::vector<RouteConfig>& getRoutes() { return routes_; }
+
+ private:
+ std::vector<ListenerConfig> listeners_;
+ std::vector<ConnectorConfig> connectors_;
+ std::vector<RouteConfig> routes_;
+ std::size_t n_threads_;
+};
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/errors.cc b/libtransport/src/io_modules/forwarder/errors.cc
new file mode 100644
index 000000000..b5f131499
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/errors.cc
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ */
+
+#include <io_modules/forwarder/errors.h>
+
+namespace transport {
+namespace core {
+
+const std::error_category& forwarder_category() {
+ static forwarder_category_impl instance;
+
+ return instance;
+}
+
+const char* forwarder_category_impl::name() const throw() {
+ return "proxy::connector::error";
+}
+
+std::string forwarder_category_impl::message(int ev) const {
+ switch (static_cast<forwarder_error>(ev)) {
+ case forwarder_error::success: {
+ return "Success";
+ }
+ case forwarder_error::disconnected: {
+ return "Connector is disconnected";
+ }
+ case forwarder_error::receive_failed: {
+ return "Packet reception failed";
+ }
+ case forwarder_error::send_failed: {
+ return "Packet send failed";
+ }
+ case forwarder_error::memory_allocation_error: {
+ return "Impossible to allocate memory for packet pool";
+ }
+ case forwarder_error::invalid_connector_type: {
+ return "Invalid type specified for connector.";
+ }
+ case forwarder_error::invalid_connector: {
+ return "Created connector was invalid.";
+ }
+ case forwarder_error::interest_cache_miss: {
+ return "interest cache miss.";
+ }
+ default: {
+ return "Unknown connector error";
+ }
+ }
+}
+} // namespace core
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/errors.h b/libtransport/src/io_modules/forwarder/errors.h
new file mode 100644
index 000000000..dd5cc8fe7
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/errors.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <system_error>
+
+namespace transport {
+namespace core {
+/**
+ * @brief Get the default server error category.
+ * @return The default server error category instance.
+ *
+ * @warning The first call to this function is thread-safe only starting with
+ * C++11.
+ */
+const std::error_category& forwarder_category();
+
+/**
+ * The list of errors.
+ */
+enum class forwarder_error {
+ success = 0,
+ send_failed,
+ receive_failed,
+ disconnected,
+ memory_allocation_error,
+ invalid_connector_type,
+ invalid_connector,
+ interest_cache_miss
+};
+
+/**
+ * @brief Create an error_code instance for the given error.
+ * @param error The error.
+ * @return The error_code instance.
+ */
+inline std::error_code make_error_code(forwarder_error error) {
+ return std::error_code(static_cast<int>(error), forwarder_category());
+}
+
+/**
+ * @brief Create an error_condition instance for the given error.
+ * @param error The error.
+ * @return The error_condition instance.
+ */
+inline std::error_condition make_error_condition(forwarder_error error) {
+ return std::error_condition(static_cast<int>(error), forwarder_category());
+}
+
+/**
+ * @brief A server error category.
+ */
+class forwarder_category_impl : public std::error_category {
+ public:
+ /**
+ * @brief Get the name of the category.
+ * @return The name of the category.
+ */
+ virtual const char* name() const throw();
+
+ /**
+ * @brief Get the error message for a given error.
+ * @param ev The error numeric value.
+ * @return The message associated to the error.
+ */
+ virtual std::string message(int ev) const;
+};
+} // namespace core
+} // namespace transport
+
+namespace std {
+// namespace system {
+template <>
+struct is_error_code_enum<::transport::core::forwarder_error>
+ : public std::true_type {};
+// } // namespace system
+} // namespace std
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc
new file mode 100644
index 000000000..7e89e2f9f
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder.cc
@@ -0,0 +1,296 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/global_configuration.h>
+#include <core/local_connector.h>
+#include <io_modules/forwarder/forwarder.h>
+#include <io_modules/forwarder/global_id_counter.h>
+#include <io_modules/forwarder/udp_tunnel.h>
+#include <io_modules/forwarder/udp_tunnel_listener.h>
+
+namespace transport {
+
+namespace core {
+
+constexpr char Forwarder::forwarder_config_section[];
+
+Forwarder::Forwarder() : config_() {
+ using namespace std::placeholders;
+ GlobalConfiguration::getInstance().registerConfigurationParser(
+ forwarder_config_section,
+ std::bind(&Forwarder::parseForwarderConfiguration, this, _1, _2));
+
+ if (!config_.empty()) {
+ initThreads();
+ initListeners();
+ initConnectors();
+ }
+}
+
+Forwarder::~Forwarder() {
+ for (auto &l : listeners_) {
+ l->close();
+ }
+
+ for (auto &c : remote_connectors_) {
+ c.second->close();
+ }
+
+ GlobalConfiguration::getInstance().unregisterConfigurationParser(
+ forwarder_config_section);
+}
+
+void Forwarder::initThreads() {
+ for (unsigned i = 0; i < config_.getThreadNumber(); i++) {
+ thread_pool_.emplace_back(io_service_, /* detached */ false);
+ }
+}
+
+void Forwarder::initListeners() {
+ using namespace std::placeholders;
+ for (auto &l : config_.getListeners()) {
+ listeners_.emplace_back(std::make_shared<UdpTunnelListener>(
+ io_service_,
+ std::bind(&Forwarder::onPacketFromListener, this, _1, _2, _3),
+ asio::ip::udp::endpoint(asio::ip::address::from_string(l.address),
+ l.port)));
+ }
+}
+
+void Forwarder::initConnectors() {
+ using namespace std::placeholders;
+ for (auto &c : config_.getConnectors()) {
+ auto id = GlobalCounter<Connector::Id>::getInstance().getNext();
+ auto conn = new UdpTunnelConnector(
+ io_service_, std::bind(&Forwarder::onPacketReceived, this, _1, _2, _3),
+ std::bind(&Forwarder::onPacketSent, this, _1, _2),
+ std::bind(&Forwarder::onConnectorClosed, this, _1),
+ std::bind(&Forwarder::onConnectorReconnected, this, _1));
+ conn->setConnectorId(id);
+ remote_connectors_.emplace(id, conn);
+ conn->connect(c.remote_address, c.remote_port, c.local_address,
+ c.local_port);
+ }
+}
+
+Connector::Id Forwarder::registerLocalConnector(
+ asio::io_service &io_service,
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback) {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ auto id = GlobalCounter<Connector::Id>::getInstance().getNext();
+ auto connector = std::make_shared<LocalConnector>(
+ io_service, receive_callback, nullptr, nullptr, reconnect_callback);
+ connector->setConnectorId(id);
+ local_connectors_.emplace(id, std::move(connector));
+ return id;
+}
+
+Forwarder &Forwarder::deleteConnector(Connector::Id id) {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ auto it = local_connectors_.find(id);
+ if (it != local_connectors_.end()) {
+ it->second->close();
+ local_connectors_.erase(it);
+ }
+
+ return *this;
+}
+
+Connector::Ptr Forwarder::getConnector(Connector::Id id) {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ auto it = local_connectors_.find(id);
+ if (it != local_connectors_.end()) {
+ return it->second;
+ }
+
+ return nullptr;
+}
+
+void Forwarder::onPacketFromListener(Connector *connector,
+ utils::MemBuf &packet_buffer,
+ const std::error_code &ec) {
+ // Create connector
+ connector->setReceiveCallback(
+ std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+
+ TRANSPORT_LOGD("Packet received from listener.");
+
+ {
+ utils::SpinLock::Acquire locked(connector_lock_);
+ remote_connectors_.emplace(connector->getConnectorId(),
+ connector->shared_from_this());
+ }
+ // TODO Check if control packet or not. For the moment it is not.
+ onPacketReceived(connector, packet_buffer, ec);
+}
+
+void Forwarder::onPacketReceived(Connector *connector,
+ utils::MemBuf &packet_buffer,
+ const std::error_code &ec) {
+ // Figure out the type of packet we received
+ bool is_interest = Packet::isInterest(packet_buffer.data());
+
+ Packet *packet = nullptr;
+ if (is_interest) {
+ packet = static_cast<Interest *>(&packet_buffer);
+ } else {
+ packet = static_cast<ContentObject *>(&packet_buffer);
+ }
+
+ for (auto &c : local_connectors_) {
+ auto role = c.second->getRole();
+ auto is_producer = role == Connector::Role::PRODUCER;
+ if ((is_producer && is_interest) || (!is_producer && !is_interest)) {
+ c.second->send(*packet);
+ } else {
+ TRANSPORT_LOGD(
+ "Error sending packet to local connector. is_interest = %d - "
+ "is_producer = %d",
+ (int)is_interest, (int)is_producer);
+ }
+ }
+
+ // PCS Lookup + FIB lookup. Skip for now
+
+ // Forward packet to local connectors
+}
+
+void Forwarder::send(Packet &packet) {
+ // TODo Here a nice PIT/CS / FIB would be required:)
+ // For now let's just forward the packet on the remote connector we get
+ if (remote_connectors_.begin() == remote_connectors_.end()) {
+ return;
+ }
+
+ auto remote_endpoint =
+ remote_connectors_.begin()->second->getRemoteEndpoint();
+ TRANSPORT_LOGD("Sending packet to: %s:%u",
+ remote_endpoint.getAddress().to_string().c_str(),
+ remote_endpoint.getPort());
+ remote_connectors_.begin()->second->send(packet);
+}
+
+void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {}
+
+void Forwarder::onConnectorClosed(Connector *connector) {}
+
+void Forwarder::onConnectorReconnected(Connector *connector) {}
+
+void Forwarder::parseForwarderConfiguration(
+ const libconfig::Setting &forwarder_config, std::error_code &ec) {
+ using namespace libconfig;
+
+ // n_thread
+ if (forwarder_config.exists("n_threads")) {
+ // Get number of threads
+ int n_threads = 1;
+ forwarder_config.lookupValue("n_threads", n_threads);
+ TRANSPORT_LOGD("Forwarder threads from config file: %u", n_threads);
+ config_.setThreadNumber(n_threads);
+ }
+
+ // listeners
+ if (forwarder_config.exists("listeners")) {
+ // get path where looking for modules
+ const Setting &listeners = forwarder_config.lookup("listeners");
+ auto count = listeners.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &listener = listeners[i];
+ ListenerConfig list;
+ unsigned port;
+
+ list.name = listener.getName();
+ listener.lookupValue("local_address", list.address);
+ listener.lookupValue("local_port", port);
+ list.port = (uint16_t)(port);
+
+ TRANSPORT_LOGD("Adding listener %s, (%s:%u)", list.name.c_str(),
+ list.address.c_str(), list.port);
+ config_.addListener(std::move(list));
+ }
+ }
+
+ // connectors
+ if (forwarder_config.exists("connectors")) {
+ // get path where looking for modules
+ const Setting &connectors = forwarder_config.lookup("connectors");
+ auto count = connectors.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &connector = connectors[i];
+ ConnectorConfig conn;
+
+ conn.name = connector.getName();
+ unsigned port = 0;
+
+ if (!connector.lookupValue("local_address", conn.local_address)) {
+ conn.local_address = "";
+ }
+
+ if (!connector.lookupValue("local_port", port)) {
+ port = 0;
+ }
+
+ conn.local_port = (uint16_t)(port);
+
+ if (!connector.lookupValue("remote_address", conn.remote_address)) {
+ throw errors::RuntimeException(
+ "Error in configuration file: remote_address is a mandatory field "
+ "of Connectors.");
+ }
+
+ if (!connector.lookupValue("remote_port", port)) {
+ throw errors::RuntimeException(
+ "Error in configuration file: remote_port is a mandatory field "
+ "of Connectors.");
+ }
+
+ conn.remote_port = (uint16_t)(port);
+
+ TRANSPORT_LOGD("Adding connector %s, (%s:%u %s:%u)", conn.name.c_str(),
+ conn.local_address.c_str(), conn.local_port,
+ conn.remote_address.c_str(), conn.remote_port);
+ config_.addConnector(std::move(conn));
+ }
+ }
+
+ // Routes
+ if (forwarder_config.exists("routes")) {
+ const Setting &routes = forwarder_config.lookup("routes");
+ auto count = routes.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &route = routes[i];
+ RouteConfig r;
+ unsigned weight;
+
+ r.name = route.getName();
+ route.lookupValue("prefix", r.prefix);
+ route.lookupValue("weight", weight);
+ route.lookupValue("connector", r.connector);
+ r.weight = (uint16_t)(weight);
+
+ TRANSPORT_LOGD("Adding route %s %s (%s %u)", r.name.c_str(),
+ r.prefix.c_str(), r.connector.c_str(), r.weight);
+ config_.addRoute(std::move(r));
+ }
+ }
+}
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h
new file mode 100644
index 000000000..5b564bb5e
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder.h
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/singleton.h>
+#include <hicn/transport/utils/spinlock.h>
+#include <io_modules/forwarder/configuration.h>
+#include <io_modules/forwarder/udp_tunnel_listener.h>
+
+#include <atomic>
+#include <libconfig.h++>
+#include <unordered_map>
+
+namespace transport {
+
+namespace core {
+
+class Forwarder : public utils::Singleton<Forwarder> {
+ static constexpr char forwarder_config_section[] = "forwarder";
+ friend class utils::Singleton<Forwarder>;
+
+ public:
+ Forwarder();
+
+ ~Forwarder();
+
+ void initThreads();
+ void initListeners();
+ void initConnectors();
+
+ Connector::Id registerLocalConnector(
+ asio::io_service &io_service,
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback);
+
+ Forwarder &deleteConnector(Connector::Id id);
+
+ Connector::Ptr getConnector(Connector::Id id);
+
+ void send(Packet &packet);
+
+ void stop();
+
+ private:
+ void onPacketFromListener(Connector *connector, utils::MemBuf &packet_buffer,
+ const std::error_code &ec);
+ void onPacketReceived(Connector *connector, utils::MemBuf &packet_buffer,
+ const std::error_code &ec);
+ void onPacketSent(Connector *connector, const std::error_code &ec);
+ void onConnectorClosed(Connector *connector);
+ void onConnectorReconnected(Connector *connector);
+
+ void parseForwarderConfiguration(const libconfig::Setting &io_config,
+ std::error_code &ec);
+
+ asio::io_service io_service_;
+ utils::SpinLock connector_lock_;
+
+ /**
+ * Connectors and listeners must be declares *before* thread_pool_, so that
+ * threads destructors will wait for them to gracefully close before being
+ * destroyed.
+ */
+ std::unordered_map<Connector::Id, Connector::Ptr> remote_connectors_;
+ std::unordered_map<Connector::Id, Connector::Ptr> local_connectors_;
+ std::vector<UdpTunnelListener::Ptr> listeners_;
+
+ std::vector<utils::EventThread> thread_pool_;
+
+ Configuration config_;
+};
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc
new file mode 100644
index 000000000..356b42d3b
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/forwarder/forwarder_module.h>
+
+namespace transport {
+
+namespace core {
+
+ForwarderModule::ForwarderModule()
+ : IoModule(),
+ name_(""),
+ connector_id_(Connector::invalid_connector),
+ forwarder_(Forwarder::getInstance()) {}
+
+ForwarderModule::~ForwarderModule() {
+ forwarder_.deleteConnector(connector_id_);
+}
+
+bool ForwarderModule::isConnected() { return true; }
+
+void ForwarderModule::send(Packet &packet) {
+ IoModule::send(packet);
+ forwarder_.send(packet);
+ // TRANSPORT_LOGD("ForwarderModule: sending from %u to %d", local_id_,
+ // 1 - local_id_);
+
+ // local_faces_.at(1 - local_id_).onPacket(packet);
+}
+
+void ForwarderModule::send(const uint8_t *packet, std::size_t len) {
+ // not supported
+ throw errors::NotImplementedException();
+}
+
+void ForwarderModule::registerRoute(const Prefix &prefix) {
+ // For the moment we route packets from one socket to the other.
+ // Next step will be to introduce a FIB
+ return;
+}
+
+void ForwarderModule::closeConnection() {
+ forwarder_.deleteConnector(connector_id_);
+}
+
+void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name) {
+ connector_id_ = forwarder_.registerLocalConnector(
+ io_service, std::move(receive_callback), std::move(reconnect_callback));
+ name_ = app_name;
+}
+
+void ForwarderModule::processControlMessageReply(utils::MemBuf &packet_buffer) {
+ return;
+}
+
+void ForwarderModule::connect(bool is_consumer) {
+ forwarder_.getConnector(connector_id_)
+ ->setRole(is_consumer ? Connector::Role::CONSUMER
+ : Connector::Role::PRODUCER);
+}
+
+std::uint32_t ForwarderModule::getMtu() { return interface_mtu; }
+
+bool ForwarderModule::isControlMessage(const uint8_t *message) { return false; }
+
+extern "C" IoModule *create_module(void) { return new ForwarderModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h
new file mode 100644
index 000000000..58bfb7996
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+#include <io_modules/forwarder/forwarder.h>
+
+#include <atomic>
+
+namespace transport {
+
+namespace core {
+
+class Forwarder;
+
+class ForwarderModule : public IoModule {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ ForwarderModule();
+
+ ~ForwarderModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ std::string name_;
+ Connector::Id connector_id_;
+ Forwarder &forwarder_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/global_id_counter.h b/libtransport/src/io_modules/forwarder/global_id_counter.h
new file mode 100644
index 000000000..fe8d76730
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/global_id_counter.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <mutex>
+
+namespace transport {
+
+namespace core {
+
+template <typename T = uint64_t>
+class GlobalCounter {
+ public:
+ static GlobalCounter& getInstance() {
+ std::lock_guard<std::mutex> lock(global_mutex_);
+
+ if (!instance_) {
+ instance_.reset(new GlobalCounter());
+ }
+
+ return *instance_;
+ }
+
+ T getNext() { return counter_++; }
+
+ private:
+ GlobalCounter() : counter_(0) {}
+ static std::unique_ptr<GlobalCounter<T>> instance_;
+ static std::mutex global_mutex_;
+ std::atomic<T> counter_;
+};
+
+template <typename T>
+std::unique_ptr<GlobalCounter<T>> GlobalCounter<T>::instance_ = nullptr;
+
+template <typename T>
+std::mutex GlobalCounter<T>::global_mutex_;
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.cc b/libtransport/src/io_modules/forwarder/udp_tunnel.cc
new file mode 100644
index 000000000..dc725fc4e
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel.cc
@@ -0,0 +1,288 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#include <hicn/transport/utils/branch_prediction.h>
+#include <io_modules/forwarder/errors.h>
+#include <io_modules/forwarder/udp_tunnel.h>
+
+#include <iostream>
+#include <thread>
+#include <vector>
+
+namespace transport {
+namespace core {
+
+UdpTunnelConnector::~UdpTunnelConnector() {}
+
+void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port,
+ const std::string &bind_address,
+ uint16_t bind_port) {
+ if (state_ == State::CLOSED) {
+ state_ = State::CONNECTING;
+ endpoint_iterator_ = resolver_.resolve({hostname, std::to_string(port)});
+ remote_endpoint_send_ = *endpoint_iterator_;
+ socket_->open(remote_endpoint_send_.protocol());
+
+ if (!bind_address.empty() && bind_port != 0) {
+ using namespace asio::ip;
+ socket_->bind(
+ udp::endpoint(address::from_string(bind_address), bind_port));
+ }
+
+ state_ = State::CONNECTED;
+
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = Endpoint(socket_->local_endpoint());
+
+ doRecvPacket();
+
+#ifdef LINUX
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
+ std::placeholders::_1));
+#endif
+ }
+}
+
+void UdpTunnelConnector::send(Packet &packet) {
+ strand_->post([this, pkt{packet.shared_from_this()}]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(pkt));
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ if (!write_in_progress) {
+ doSendPacket();
+ }
+ } else {
+ data_available_ = true;
+ }
+ });
+}
+
+void UdpTunnelConnector::send(const uint8_t *packet, std::size_t len) {}
+
+void UdpTunnelConnector::close() {
+ TRANSPORT_LOGD("UDPTunnelConnector::close");
+ state_ = State::CLOSED;
+ bool is_socket_owned = socket_.use_count() == 1;
+ if (is_socket_owned) {
+ io_service_.dispatch([this]() {
+ this->socket_->close();
+ // on_close_callback_(shared_from_this());
+ });
+ }
+}
+
+void UdpTunnelConnector::doSendPacket() {
+#ifdef LINUX
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
+ std::placeholders::_1));
+#else
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const ::utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_->async_send_to(
+ std::move(array), remote_endpoint_send_,
+ strand_->wrap([this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ sent_callback_(this, make_error_code(forwarder_error::success));
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ sendFailed();
+ sent_callback_(this, ec);
+ }
+
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doSendPacket();
+ }
+ }));
+#endif
+}
+
+#ifdef LINUX
+void UdpTunnelConnector::writeHandler(std::error_code ec) {
+ if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) {
+ return;
+ }
+
+ auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst));
+
+ if (len) {
+ int m = 0;
+ for (auto &p : output_buffer_) {
+ auto packet = p.get();
+ ::utils::MemBuf *current = packet;
+ int b = 0;
+ do {
+ // array.push_back(asio::const_buffer(current->data(),
+ // current->length()));
+ tx_iovecs_[m][b].iov_base = current->writableData();
+ tx_iovecs_[m][b].iov_len = current->length();
+ current = current->next();
+ b++;
+ } while (current != packet);
+
+ tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m];
+ tx_msgs_[m].msg_hdr.msg_iovlen = b;
+ tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data();
+ tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size();
+ m++;
+
+ if (--len == 0) {
+ break;
+ }
+ }
+
+ int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT);
+ if (retval > 0) {
+ while (retval--) {
+ output_buffer_.pop_front();
+ }
+ } else if (retval != EWOULDBLOCK && retval != EAGAIN) {
+ TRANSPORT_LOGE("Error sending messages! %s %d\n", strerror(errno),
+ retval);
+ return;
+ }
+ }
+
+ if (!output_buffer_.empty()) {
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
+ std::placeholders::_1));
+ }
+}
+
+void UdpTunnelConnector::readHandler(std::error_code ec) {
+ TRANSPORT_LOGD("UdpTunnelConnector receive packet");
+
+ // TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length);
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < max_burst; i++) {
+ auto read_buffer = getRawBuffer();
+ rx_iovecs_[i][0].iov_base = read_buffer.first;
+ rx_iovecs_[i][0].iov_len = read_buffer.second;
+ rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i];
+ rx_msgs_[i].msg_hdr.msg_iovlen = 1;
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_,
+ max_burst - current_position_, MSG_DONTWAIT, nullptr);
+ if (res < 0) {
+ TRANSPORT_LOGE("Error receiving messages! %s %d\n", strerror(errno),
+ res);
+ return;
+ }
+
+ for (int i = 0; i < res; i++) {
+ auto packet = getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ rx_msgs_[current_position_].msg_len);
+ receiveSuccess(*packet);
+ receive_callback_(this, *packet,
+ make_error_code(forwarder_error::success));
+ ++current_position_;
+ }
+
+ doRecvPacket();
+ } else {
+ TRANSPORT_LOGE(
+ "Error in UDP: Receiving packets from a not connected socket.");
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ // receive_callback_(this, *read_msg_, ec);
+ TRANSPORT_LOGE("Error in UDP connector: %d %s", ec.value(),
+ ec.message().c_str());
+ } else {
+ TRANSPORT_LOGE("Error while not connector");
+ }
+ }
+}
+#endif
+
+void UdpTunnelConnector::doRecvPacket() {
+#ifdef LINUX
+ if (state_ == State::CONNECTED) {
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(asio::null_buffers(),
+#else
+ socket_->async_wait(asio::ip::tcp::socket::wait_read,
+#endif
+ std::bind(&UdpTunnelConnector::readHandler, this,
+ std::placeholders::_1));
+ }
+#else
+ TRANSPORT_LOGD("UdpTunnelConnector receive packet");
+ read_msg_ = getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_,
+ [this](std::error_code ec, std::size_t length) {
+ TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length);
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ auto packet = getPacketFromBuffer(read_msg_.first, length);
+ receiveSuccess(*packet);
+ receive_callback_(this, *packet,
+ make_error_code(forwarder_error::success));
+ doRecvPacket();
+ } else {
+ TRANSPORT_LOGE(
+ "Error in UDP: Receiving packets from a not connected socket.");
+ }
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ TRANSPORT_LOGE("Error in UDP connector: %d %s", ec.value(),
+ ec.message().c_str());
+ } else {
+ TRANSPORT_LOGE("Error while not connector");
+ }
+ }
+ });
+#endif
+}
+
+void UdpTunnelConnector::doConnect() {
+ asio::async_connect(
+ *socket_, endpoint_iterator_,
+ [this](std::error_code ec, asio::ip::udp::resolver::iterator) {
+ if (!ec) {
+ state_ = State::CONNECTED;
+ doRecvPacket();
+
+ if (data_available_) {
+ data_available_ = false;
+ doSendPacket();
+ }
+ } else {
+ TRANSPORT_LOGE("[Hproxy] - UDP Connection failed!!!");
+ timer_.expires_from_now(std::chrono::milliseconds(500));
+ timer_.async_wait(std::bind(&UdpTunnelConnector::doConnect, this));
+ }
+ });
+}
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.h b/libtransport/src/io_modules/forwarder/udp_tunnel.h
new file mode 100644
index 000000000..df472af91
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel.h
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+#include <io_modules/forwarder/errors.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <iostream>
+#include <memory>
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener;
+
+class UdpTunnelConnector : public Connector {
+ friend class UdpTunnelListener;
+
+ public:
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect>
+ UdpTunnelConnector(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+ io_service_(io_service),
+ strand_(std::make_shared<asio::io_service::strand>(io_service_)),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_)),
+ resolver_(io_service_),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{0},
+ tx_msgs_{0},
+ rx_iovecs_{0},
+ rx_msgs_{0},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ }
+
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect, typename EndpointType>
+ UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket,
+ std::shared_ptr<asio::io_service::strand> &strand,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect, EndpointType &&remote_endpoint)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ io_service_(socket->get_io_service()),
+#else
+ io_service_((asio::io_context &)(socket->get_executor().context())),
+#endif
+ strand_(strand),
+ socket_(socket),
+ resolver_(io_service_),
+ remote_endpoint_send_(std::forward<EndpointType &&>(remote_endpoint)),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{0},
+ tx_msgs_{0},
+ rx_iovecs_{0},
+ rx_msgs_{0},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ if (socket_->is_open()) {
+ state_ = State::CONNECTED;
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = socket_->local_endpoint();
+ }
+ }
+
+ ~UdpTunnelConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ void close() override;
+
+ void connect(const std::string &hostname, std::uint16_t port,
+ const std::string &bind_address = "",
+ std::uint16_t bind_port = 0);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ void doConnect();
+ void doRecvPacket();
+
+ void doRecvPacket(utils::MemBuf &buffer) {
+ receive_callback_(this, buffer, make_error_code(forwarder_error::success));
+ }
+
+#ifdef LINUX
+ void readHandler(std::error_code ec);
+ void writeHandler(std::error_code ec);
+#endif
+
+ void setConnected() { state_ = State::CONNECTED; }
+
+ void doSendPacket();
+ void doClose();
+
+ private:
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::io_service::strand> strand_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::ip::udp::endpoint remote_endpoint_send_;
+ asio::ip::udp::endpoint remote_endpoint_recv_;
+
+ asio::steady_timer timer_;
+
+#ifdef LINUX
+ asio::steady_timer send_timer_;
+ struct iovec tx_iovecs_[max_burst][8];
+ struct mmsghdr tx_msgs_[max_burst];
+ struct iovec rx_iovecs_[max_burst][8];
+ struct mmsghdr rx_msgs_[max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+
+ bool data_available_;
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc
new file mode 100644
index 000000000..12246c3cf
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#include <hicn/transport/utils/hash.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/forwarder/udp_tunnel.h>
+#include <io_modules/forwarder/udp_tunnel_listener.h>
+
+#ifndef LINUX
+namespace std {
+size_t hash<asio::ip::udp::endpoint>::operator()(
+ const asio::ip::udp::endpoint &endpoint) const {
+ auto hash_ip = endpoint.address().is_v4()
+ ? endpoint.address().to_v4().to_ulong()
+ : utils::hash::fnv32_buf(
+ endpoint.address().to_v6().to_bytes().data(), 16);
+ uint16_t port = endpoint.port();
+ return utils::hash::fnv32_buf(&port, 2, hash_ip);
+}
+} // namespace std
+#endif
+
+namespace transport {
+namespace core {
+
+UdpTunnelListener::~UdpTunnelListener() {}
+
+void UdpTunnelListener::close() {
+ strand_->post([this]() {
+ if (socket_->is_open()) {
+ socket_->close();
+ }
+ });
+}
+
+#ifdef LINUX
+void UdpTunnelListener::readHandler(std::error_code ec) {
+ TRANSPORT_LOGD("UdpTunnelConnector receive packet");
+
+ // TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length);
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < Connector::max_burst; i++) {
+ auto read_buffer = Connector::getRawBuffer();
+ iovecs_[i][0].iov_base = read_buffer.first;
+ iovecs_[i][0].iov_len = read_buffer.second;
+ msgs_[i].msg_hdr.msg_iov = iovecs_[i];
+ msgs_[i].msg_hdr.msg_iovlen = 1;
+ msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i];
+ msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]);
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_,
+ Connector::max_burst - current_position_, MSG_DONTWAIT,
+ nullptr);
+ if (res < 0) {
+ TRANSPORT_LOGE("Error in recvmmsg.");
+ }
+
+ for (int i = 0; i < res; i++) {
+ auto packet = Connector::getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ msgs_[current_position_].msg_len);
+ auto connector_id =
+ utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name,
+ msgs_[current_position_].msg_hdr.msg_namelen);
+
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+
+ /*
+ * Get the remote endpoint for this particular message
+ */
+ using namespace asio::ip;
+ if (local_endpoint_.address().is_v4()) {
+ auto addr = reinterpret_cast<struct sockaddr_in *>(
+ &remote_endpoints_[current_position_]);
+ address_v4::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v4 address(address_bytes);
+ remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin_port));
+ } else {
+ auto addr = reinterpret_cast<struct sockaddr_in6 *>(
+ &remote_endpoints_[current_position_]);
+ address_v6::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v6 address(address_bytes);
+ remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin6_port));
+ }
+
+ /**
+ * Create new connector sharing the same socket of this listener.
+ */
+ auto ret = connectors_.emplace(
+ connector_id,
+ std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {}, [](Connector *) {},
+ [](Connector *) {}, std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ /**
+ * Use connector callback to process incoming message.
+ */
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(*packet);
+
+ ++current_position_;
+ }
+
+ doRecvPacket();
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+}
+#endif
+
+void UdpTunnelListener::doRecvPacket() {
+#ifdef LINUX
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(
+ asio::null_buffers(),
+#else
+ socket_->async_wait(
+ asio::ip::tcp::socket::wait_read,
+#endif
+ std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1));
+#else
+ read_msg_ = Connector::getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_,
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ auto packet = Connector::getPacketFromBuffer(read_msg_.first, length);
+ auto connector_id =
+ std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_);
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+ auto ret = connectors_.emplace(
+ connector_id, std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {},
+ [](Connector *) {}, [](Connector *) {},
+ std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(*packet);
+ doRecvPacket();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ TRANSPORT_LOGE("The connection has been closed by the application.");
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+ });
+#endif
+}
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h
new file mode 100644
index 000000000..0ee40a400
--- /dev/null
+++ b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <unordered_map>
+
+namespace std {
+template <>
+struct hash<asio::ip::udp::endpoint> {
+ size_t operator()(const asio::ip::udp::endpoint &endpoint) const;
+};
+} // namespace std
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener
+ : public std::enable_shared_from_this<UdpTunnelListener> {
+ using PacketReceivedCallback = Connector::PacketReceivedCallback;
+ using EndpointId = std::pair<uint32_t, uint16_t>;
+
+ static constexpr uint16_t default_port = 5004;
+
+ public:
+ using Ptr = std::shared_ptr<UdpTunnelListener>;
+
+ template <typename ReceiveCallback>
+ UdpTunnelListener(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint(
+ asio::ip::udp::v4(), default_port))
+ : io_service_(io_service),
+ strand_(std::make_shared<asio::io_service::strand>(io_service_)),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_,
+ endpoint.protocol())),
+ local_endpoint_(endpoint),
+ receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
+#ifndef LINUX
+ read_msg_(nullptr, 0)
+#else
+ iovecs_{0},
+ msgs_{0},
+ current_position_(0)
+#endif
+ {
+ if (endpoint.protocol() == asio::ip::udp::v6()) {
+ std::error_code ec;
+ socket_->set_option(asio::ip::v6_only(false), ec);
+ // Call succeeds only on dual stack systems.
+ }
+ socket_->bind(local_endpoint_);
+ io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this));
+ }
+
+ ~UdpTunnelListener();
+
+ void close();
+
+ int deleteConnector(Connector *connector) {
+ return connectors_.erase(connector->getConnectorId());
+ }
+
+ template <typename ReceiveCallback>
+ void setReceiveCallback(ReceiveCallback &&callback) {
+ receive_callback_ = std::forward<ReceiveCallback &&>(callback);
+ }
+
+ Connector *findConnector(Connector::Id connId) {
+ auto it = connectors_.find(connId);
+ if (it != connectors_.end()) {
+ return it->second.get();
+ }
+
+ return nullptr;
+ }
+
+ private:
+ void doRecvPacket();
+
+ void readHandler(std::error_code ec);
+
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::io_service::strand> strand_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::endpoint local_endpoint_;
+ asio::ip::udp::endpoint remote_endpoint_;
+ std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_;
+
+ PacketReceivedCallback receive_callback_;
+
+#ifdef LINUX
+ struct iovec iovecs_[Connector::max_burst][8];
+ struct mmsghdr msgs_[Connector::max_burst];
+ struct sockaddr_storage remote_endpoints_[Connector::max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/loopback/CMakeLists.txt b/libtransport/src/io_modules/loopback/CMakeLists.txt
new file mode 100644
index 000000000..ac6dc8068
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/CMakeLists.txt
@@ -0,0 +1,34 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/loopback_module.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/loopback_module.cc
+)
+
+build_module(loopback_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ # LIBRARY_ROOT_DIR "vpp_plugins"
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/loopback/local_face.cc b/libtransport/src/io_modules/loopback/local_face.cc
new file mode 100644
index 000000000..a59dab235
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/local_face.cc
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/loopback/local_face.h>
+
+#include <asio/io_service.hpp>
+
+namespace transport {
+namespace core {
+
+Face::Face(Connector::PacketReceivedCallback &&receive_callback,
+ asio::io_service &io_service, const std::string &app_name)
+ : receive_callback_(std::move(receive_callback)),
+ io_service_(io_service),
+ name_(app_name) {}
+
+Face::Face(const Face &other)
+ : receive_callback_(other.receive_callback_),
+ io_service_(other.io_service_),
+ name_(other.name_) {}
+
+Face::Face(Face &&other)
+ : receive_callback_(std::move(other.receive_callback_)),
+ io_service_(other.io_service_),
+ name_(std::move(other.name_)) {}
+
+Face &Face::operator=(const Face &other) {
+ receive_callback_ = other.receive_callback_;
+ io_service_ = other.io_service_;
+ name_ = other.name_;
+
+ return *this;
+}
+
+Face &Face::operator=(Face &&other) {
+ receive_callback_ = std::move(other.receive_callback_);
+ io_service_ = std::move(other.io_service_);
+ name_ = std::move(other.name_);
+
+ return *this;
+}
+
+void Face::onPacket(const Packet &packet) {
+ TRANSPORT_LOGD("Sending content to local socket.");
+
+ if (Packet::isInterest(packet.data())) {
+ rescheduleOnIoService<Interest>(packet);
+ } else {
+ rescheduleOnIoService<ContentObject>(packet);
+ }
+}
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/loopback/local_face.h b/libtransport/src/io_modules/loopback/local_face.h
new file mode 100644
index 000000000..1cbcc2c72
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/local_face.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/utils/move_wrapper.h>
+
+#include <asio/io_service.hpp>
+
+namespace transport {
+namespace core {
+
+class Face {
+ public:
+ Face(Connector::PacketReceivedCallback &&receive_callback,
+ asio::io_service &io_service, const std::string &app_name);
+
+ Face(const Face &other);
+ Face(Face &&other);
+ void onPacket(const Packet &packet);
+ Face &operator=(Face &&other);
+ Face &operator=(const Face &other);
+
+ private:
+ template <typename T>
+ void rescheduleOnIoService(const Packet &packet) {
+ auto p = core::PacketManager<T>::getInstance().getPacket();
+ p->replace(packet.data(), packet.length());
+ io_service_.get().post([this, p]() mutable {
+ receive_callback_(nullptr, *p, make_error_code(0));
+ });
+ }
+
+ Connector::PacketReceivedCallback receive_callback_;
+ std::reference_wrapper<asio::io_service> io_service_;
+ std::string name_;
+};
+
+} // namespace core
+} // namespace transport
diff --git a/libtransport/src/io_modules/loopback/loopback_module.cc b/libtransport/src/io_modules/loopback/loopback_module.cc
new file mode 100644
index 000000000..0bdbf8c8e
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/loopback_module.cc
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/loopback/loopback_module.h>
+
+namespace transport {
+
+namespace core {
+
+std::vector<std::unique_ptr<LocalConnector>> LoopbackModule::local_faces_;
+std::atomic<uint32_t> LoopbackModule::global_counter_(0);
+
+LoopbackModule::LoopbackModule() : IoModule(), local_id_(~0) {}
+
+LoopbackModule::~LoopbackModule() {}
+
+void LoopbackModule::connect(bool is_consumer) {}
+
+bool LoopbackModule::isConnected() { return true; }
+
+void LoopbackModule::send(Packet &packet) {
+ IoModule::send(packet);
+
+ TRANSPORT_LOGD("LoopbackModule: sending from %u to %d", local_id_,
+ 1 - local_id_);
+
+ local_faces_.at(1 - local_id_)->send(packet);
+}
+
+void LoopbackModule::send(const uint8_t *packet, std::size_t len) {
+ // not supported
+ throw errors::NotImplementedException();
+}
+
+void LoopbackModule::registerRoute(const Prefix &prefix) {
+ // For the moment we route packets from one socket to the other.
+ // Next step will be to introduce a FIB
+ return;
+}
+
+void LoopbackModule::closeConnection() {
+ local_faces_.erase(local_faces_.begin() + local_id_);
+}
+
+void LoopbackModule::init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name) {
+ if (local_id_ == uint32_t(~0) && global_counter_ < 2) {
+ local_id_ = global_counter_++;
+ local_faces_.emplace(
+ local_faces_.begin() + local_id_,
+ new LocalConnector(io_service, std::move(receive_callback), nullptr,
+ nullptr, std::move(reconnect_callback)));
+ }
+}
+
+void LoopbackModule::processControlMessageReply(utils::MemBuf &packet_buffer) {
+ return;
+}
+
+std::uint32_t LoopbackModule::getMtu() { return interface_mtu; }
+
+bool LoopbackModule::isControlMessage(const uint8_t *message) { return false; }
+
+extern "C" IoModule *create_module(void) { return new LoopbackModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/loopback/loopback_module.h b/libtransport/src/io_modules/loopback/loopback_module.h
new file mode 100644
index 000000000..219fa8841
--- /dev/null
+++ b/libtransport/src/io_modules/loopback/loopback_module.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/local_connector.h>
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+
+#include <atomic>
+
+namespace transport {
+
+namespace core {
+
+class LoopbackModule : public IoModule {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ LoopbackModule();
+
+ ~LoopbackModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ static std::vector<std::unique_ptr<LocalConnector>> local_faces_;
+ static std::atomic<uint32_t> global_counter_;
+
+ private:
+ uint32_t local_id_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/memif/CMakeLists.txt b/libtransport/src/io_modules/memif/CMakeLists.txt
new file mode 100644
index 000000000..c8a930e7b
--- /dev/null
+++ b/libtransport/src/io_modules/memif/CMakeLists.txt
@@ -0,0 +1,56 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+find_package(Vpp REQUIRED)
+find_package(Libmemif REQUIRED)
+
+if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR)
+ find_package(HicnPlugin REQUIRED)
+ find_package(SafeVapi REQUIRED)
+else()
+ list(APPEND DEPENDENCIES
+ ${SAFE_VAPI_SHARED}
+ )
+endif()
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_module.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_module.cc
+)
+
+build_module(memif_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ LINK_LIBRARIES ${LIBMEMIF_LIBRARIES} ${SAFE_VAPI_LIBRARIES}
+ INCLUDE_DIRS
+ ${LIBTRANSPORT_INCLUDE_DIRS}
+ ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ ${VPP_INCLUDE_DIRS}
+ ${LIBMEMIF_INCLUDE_DIRS}
+ ${SAFE_VAPI_INCLUDE_DIRS}
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/memif/hicn_vapi.c b/libtransport/src/io_modules/memif/hicn_vapi.c
new file mode 100644
index 000000000..b83a36b47
--- /dev/null
+++ b/libtransport/src/io_modules/memif/hicn_vapi.c
@@ -0,0 +1,229 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/utils/log.h>
+#include <io_modules/memif/hicn_vapi.h>
+
+#define HICN_VPP_PLUGIN
+#include <hicn/name.h>
+#undef HICN_VPP_PLUGIN
+
+#include <vapi/hicn.api.vapi.h>
+#include <vapi/ip.api.vapi.h>
+#include <vapi/vapi_safe.h>
+#include <vlib/vlib.h>
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vnet/ip/format.h>
+#include <vnet/ip/ip4_packet.h>
+#include <vnet/ip/ip6_packet.h>
+#include <vpp_plugins/hicn/error.h>
+#include <vppinfra/error.h>
+
+/////////////////////////////////////////////////////
+const char *HICN_ERROR_STRING[] = {
+#define _(a, b, c) c,
+ foreach_hicn_error
+#undef _
+};
+/////////////////////////////////////////////////////
+
+/*********************** Missing Symbol in vpp libraries
+ * *************************/
+u8 *format_vl_api_address_union(u8 *s, va_list *args) { return NULL; }
+
+/*********************************************************************************/
+
+DEFINE_VAPI_MSG_IDS_HICN_API_JSON
+DEFINE_VAPI_MSG_IDS_IP_API_JSON
+
+static vapi_error_e register_prod_app_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_register_prod_app_reply *reply) {
+ hicn_producer_output_params *output_params =
+ (hicn_producer_output_params *)callback_ctx;
+
+ if (reply == NULL) return rv;
+
+ output_params->cs_reserved = reply->cs_reserved;
+ output_params->prod_addr = (ip_address_t *)malloc(sizeof(ip_address_t));
+ memset(output_params->prod_addr, 0, sizeof(ip_address_t));
+ if (reply->prod_addr.af == ADDRESS_IP6)
+ memcpy(&output_params->prod_addr->v6, reply->prod_addr.un.ip6,
+ sizeof(ip6_address_t));
+ else
+ memcpy(&output_params->prod_addr->v4, reply->prod_addr.un.ip4,
+ sizeof(ip4_address_t));
+ output_params->face_id = reply->faceid;
+
+ return reply->retval;
+}
+
+int hicn_vapi_register_prod_app(vapi_ctx_t ctx,
+ hicn_producer_input_params *input_params,
+ hicn_producer_output_params *output_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_register_prod_app *msg =
+ vapi_alloc_hicn_api_register_prod_app(ctx);
+
+ if (ip46_address_is_ip4((ip46_address_t *)&input_params->prefix->address)) {
+ memcpy(&msg->payload.prefix.address.un.ip4, &input_params->prefix->address,
+ sizeof(ip4_address_t));
+ msg->payload.prefix.address.af = ADDRESS_IP4;
+ } else {
+ memcpy(&msg->payload.prefix.address.un.ip6, &input_params->prefix->address,
+ sizeof(ip6_address_t));
+ msg->payload.prefix.address.af = ADDRESS_IP6;
+ }
+ msg->payload.prefix.len = input_params->prefix->len;
+
+ msg->payload.swif = input_params->swif;
+ msg->payload.cs_reserved = input_params->cs_reserved;
+
+ int ret = vapi_hicn_api_register_prod_app(ctx, msg, register_prod_app_cb,
+ output_params);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e face_prod_del_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_face_prod_del_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int hicn_vapi_face_prod_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params *input_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_face_prod_del *msg = vapi_alloc_hicn_api_face_prod_del(ctx);
+
+ msg->payload.faceid = input_params->face_id;
+
+ int ret = vapi_hicn_api_face_prod_del(ctx, msg, face_prod_del_cb, NULL);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e register_cons_app_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_register_cons_app_reply *reply) {
+ hicn_consumer_output_params *output_params =
+ (hicn_consumer_output_params *)callback_ctx;
+
+ if (reply == NULL) return rv;
+
+ output_params->src6 = (ip_address_t *)malloc(sizeof(ip_address_t));
+ output_params->src4 = (ip_address_t *)malloc(sizeof(ip_address_t));
+ memset(output_params->src6, 0, sizeof(ip_address_t));
+ memset(output_params->src4, 0, sizeof(ip_address_t));
+ memcpy(&output_params->src6->v6, &reply->src_addr6.un.ip6,
+ sizeof(ip6_address_t));
+ memcpy(&output_params->src4->v4, &reply->src_addr4.un.ip4,
+ sizeof(ip4_address_t));
+
+ output_params->face_id1 = reply->faceid1;
+ output_params->face_id2 = reply->faceid2;
+
+ return reply->retval;
+}
+
+int hicn_vapi_register_cons_app(vapi_ctx_t ctx,
+ hicn_consumer_input_params *input_params,
+ hicn_consumer_output_params *output_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_register_cons_app *msg =
+ vapi_alloc_hicn_api_register_cons_app(ctx);
+
+ msg->payload.swif = input_params->swif;
+
+ int ret = vapi_hicn_api_register_cons_app(ctx, msg, register_cons_app_cb,
+ output_params);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e face_cons_del_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_hicn_api_face_cons_del_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int hicn_vapi_face_cons_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params *input_params) {
+ vapi_lock();
+ vapi_msg_hicn_api_face_cons_del *msg = vapi_alloc_hicn_api_face_cons_del(ctx);
+
+ msg->payload.faceid = input_params->face_id;
+
+ int ret = vapi_hicn_api_face_cons_del(ctx, msg, face_cons_del_cb, NULL);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e reigster_route_cb(
+ vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+ vapi_payload_ip_route_add_del_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int hicn_vapi_register_route(vapi_ctx_t ctx,
+ hicn_producer_set_route_params *input_params) {
+ vapi_lock();
+ vapi_msg_ip_route_add_del *msg = vapi_alloc_ip_route_add_del(ctx, 1);
+
+ msg->payload.is_add = 1;
+ if (ip46_address_is_ip4((ip46_address_t *)(input_params->prod_addr))) {
+ memcpy(&msg->payload.route.prefix.address.un.ip4,
+ &input_params->prefix->address.v4, sizeof(ip4_address_t));
+ msg->payload.route.prefix.address.af = ADDRESS_IP4;
+ msg->payload.route.prefix.len = input_params->prefix->len;
+ } else {
+ memcpy(&msg->payload.route.prefix.address.un.ip6,
+ &input_params->prefix->address.v6, sizeof(ip6_address_t));
+ msg->payload.route.prefix.address.af = ADDRESS_IP6;
+ msg->payload.route.prefix.len = input_params->prefix->len;
+ }
+
+ msg->payload.route.paths[0].sw_if_index = ~0;
+ msg->payload.route.paths[0].table_id = 0;
+ if (ip46_address_is_ip4((ip46_address_t *)(input_params->prod_addr))) {
+ memcpy(&(msg->payload.route.paths[0].nh.address.ip4),
+ input_params->prod_addr->v4.as_u8, sizeof(ip4_address_t));
+ msg->payload.route.paths[0].proto = FIB_API_PATH_NH_PROTO_IP4;
+ } else {
+ memcpy(&(msg->payload.route.paths[0].nh.address.ip6),
+ input_params->prod_addr->v6.as_u8, sizeof(ip6_address_t));
+ msg->payload.route.paths[0].proto = FIB_API_PATH_NH_PROTO_IP6;
+ }
+
+ msg->payload.route.paths[0].type = FIB_API_PATH_FLAG_NONE;
+ msg->payload.route.paths[0].flags = FIB_API_PATH_FLAG_NONE;
+
+ int ret = vapi_ip_route_add_del(ctx, msg, reigster_route_cb, NULL);
+
+ vapi_unlock();
+ return ret;
+}
+
+char *hicn_vapi_get_error_string(int ret_val) {
+ return get_error_string(ret_val);
+}
diff --git a/libtransport/src/io_modules/memif/hicn_vapi.h b/libtransport/src/io_modules/memif/hicn_vapi.h
new file mode 100644
index 000000000..e94c97749
--- /dev/null
+++ b/libtransport/src/io_modules/memif/hicn_vapi.h
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/util/ip_address.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <vapi/vapi.h>
+
+#include "stdint.h"
+
+typedef struct {
+ ip_prefix_t* prefix;
+ uint32_t swif;
+ uint32_t cs_reserved;
+} hicn_producer_input_params;
+
+typedef struct {
+ uint32_t swif;
+} hicn_consumer_input_params;
+
+typedef struct {
+ uint32_t face_id;
+} hicn_del_face_app_input_params;
+
+typedef struct {
+ uint32_t cs_reserved;
+ ip_address_t* prod_addr;
+ uint32_t face_id;
+} hicn_producer_output_params;
+
+typedef struct {
+ ip_address_t* src4;
+ ip_address_t* src6;
+ uint32_t face_id1;
+ uint32_t face_id2;
+} hicn_consumer_output_params;
+
+typedef struct {
+ ip_prefix_t* prefix;
+ ip_address_t* prod_addr;
+} hicn_producer_set_route_params;
+
+int hicn_vapi_register_prod_app(vapi_ctx_t ctx,
+ hicn_producer_input_params* input_params,
+ hicn_producer_output_params* output_params);
+
+int hicn_vapi_register_cons_app(vapi_ctx_t ctx,
+ hicn_consumer_input_params* input_params,
+ hicn_consumer_output_params* output_params);
+
+int hicn_vapi_register_route(vapi_ctx_t ctx,
+ hicn_producer_set_route_params* input_params);
+
+int hicn_vapi_face_cons_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params* input_params);
+
+int hicn_vapi_face_prod_del(vapi_ctx_t ctx,
+ hicn_del_face_app_input_params* input_params);
+
+char* hicn_vapi_get_error_string(int ret_val);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/libtransport/src/io_modules/memif/memif_connector.cc b/libtransport/src/io_modules/memif/memif_connector.cc
new file mode 100644
index 000000000..4a688d68f
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_connector.cc
@@ -0,0 +1,493 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <io_modules/memif/memif_connector.h>
+#include <sys/epoll.h>
+
+#include <cstdlib>
+
+extern "C" {
+#include <memif/libmemif.h>
+};
+
+#define CANCEL_TIMER 1
+
+namespace transport {
+
+namespace core {
+
+struct memif_connection {
+ uint16_t index;
+ /* memif conenction handle */
+ memif_conn_handle_t conn;
+ /* transmit queue id */
+ uint16_t tx_qid;
+ /* tx buffers */
+ memif_buffer_t *tx_bufs;
+ /* allocated tx buffers counter */
+ /* number of tx buffers pointing to shared memory */
+ uint16_t tx_buf_num;
+ /* rx buffers */
+ memif_buffer_t *rx_bufs;
+ /* allcoated rx buffers counter */
+ /* number of rx buffers pointing to shared memory */
+ uint16_t rx_buf_num;
+ /* interface ip address */
+ uint8_t ip_addr[4];
+};
+
+std::once_flag MemifConnector::flag_;
+utils::EpollEventReactor MemifConnector::main_event_reactor_;
+
+MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(packet_sent),
+ std::move(close_callback), std::move(on_reconnect)),
+ memif_worker_(nullptr),
+ timer_set_(false),
+ send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+ disconnect_timer_(
+ std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+ io_service_(io_service),
+ memif_connection_(std::make_unique<memif_connection_t>()),
+ tx_buf_counter_(0),
+ is_reconnection_(false),
+ data_available_(false),
+ app_name_(app_name),
+ socket_filename_("") {
+ std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
+}
+
+MemifConnector::~MemifConnector() { close(); }
+
+void MemifConnector::init() {
+ /* initialize memory interface */
+ int err = memif_init(controlFdUpdate, const_cast<char *>(app_name_.c_str()),
+ nullptr, nullptr, nullptr);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_init: %s", memif_strerror(err));
+ }
+}
+
+void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
+ state_ = State::CONNECTING;
+
+ memif_id_ = memif_id;
+ socket_filename_ = "/run/vpp/memif.sock";
+
+ createMemif(memif_id, memif_mode, nullptr);
+
+ work_ = std::make_unique<asio::io_service::work>(io_service_);
+
+ while (state_ != State::CONNECTED) {
+ MemifConnector::main_event_reactor_.runOneEvent();
+ }
+
+ int err;
+
+ /* get interrupt queue id */
+ int fd = -1;
+ err = memif_get_queue_efd(memif_connection_->conn, 0, &fd);
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_get_queue_efd: %s", memif_strerror(err));
+ return;
+ }
+
+ // Remove fd from main epoll
+ main_event_reactor_.delFileDescriptor(fd);
+
+ // Add fd to epoll of instance
+ event_reactor_.addFileDescriptor(
+ fd, EPOLLIN, [this](const utils::Event &evt) -> int {
+ return onInterrupt(memif_connection_->conn, this, 0);
+ });
+
+ memif_worker_ = std::make_unique<std::thread>(
+ std::bind(&MemifConnector::threadMain, this));
+}
+
+int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) {
+ memif_connection_t *c = memif_connection_.get();
+
+ /* setting memif connection arguments */
+ memif_conn_args_t args;
+ memset(&args, 0, sizeof(args));
+
+ args.is_master = mode;
+ args.log2_ring_size = MEMIF_LOG2_RING_SIZE;
+ args.buffer_size = MEMIF_BUF_SIZE;
+ args.num_s2m_rings = 1;
+ args.num_m2s_rings = 1;
+ strncpy((char *)args.interface_name, IF_NAME, strlen(IF_NAME) + 1);
+ args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
+
+ int err;
+
+ err = memif_create_socket(&args.socket, socket_filename_.c_str(), nullptr);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+
+ args.interface_id = index;
+ /* last argument for memif_create (void * private_ctx) is used by user
+ to identify connection. this context is returned with callbacks */
+
+ /* default interrupt */
+ if (s == nullptr) {
+ err = memif_create(&c->conn, &args, onConnect, onDisconnect, onInterrupt,
+ this);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+ }
+
+ c->index = (uint16_t)index;
+ c->tx_qid = 0;
+ /* alloc memif buffers */
+ c->rx_buf_num = 0;
+ c->rx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS));
+ c->tx_buf_num = 0;
+ c->tx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS));
+
+ // memif_set_rx_mode (c->conn, MEMIF_RX_MODE_POLLING, 0);
+
+ return 0;
+}
+
+int MemifConnector::deleteMemif() {
+ memif_connection_t *c = memif_connection_.get();
+
+ if (c->rx_bufs) {
+ free(c->rx_bufs);
+ }
+
+ c->rx_bufs = nullptr;
+ c->rx_buf_num = 0;
+
+ if (c->tx_bufs) {
+ free(c->tx_bufs);
+ }
+
+ c->tx_bufs = nullptr;
+ c->tx_buf_num = 0;
+
+ int err;
+ /* disconenct then delete memif connection */
+ err = memif_delete(&c->conn);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_delete: %s", memif_strerror(err));
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(c->conn != nullptr)) {
+ TRANSPORT_LOGE("memif delete fail");
+ }
+
+ return 0;
+}
+
+int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) {
+ /* convert memif event definitions to epoll events */
+ if (events & MEMIF_FD_EVENT_DEL) {
+ return MemifConnector::main_event_reactor_.delFileDescriptor(fd);
+ }
+
+ uint32_t evt = 0;
+
+ if (events & MEMIF_FD_EVENT_READ) {
+ evt |= EPOLLIN;
+ }
+
+ if (events & MEMIF_FD_EVENT_WRITE) {
+ evt |= EPOLLOUT;
+ }
+
+ if (events & MEMIF_FD_EVENT_MOD) {
+ return MemifConnector::main_event_reactor_.modFileDescriptor(fd, evt);
+ }
+
+ return MemifConnector::main_event_reactor_.addFileDescriptor(
+ fd, evt, [](const utils::Event &evt) -> int {
+ uint32_t event = 0;
+ int memif_err = 0;
+
+ if (evt.events & EPOLLIN) {
+ event |= MEMIF_FD_EVENT_READ;
+ }
+
+ if (evt.events & EPOLLOUT) {
+ event |= MEMIF_FD_EVENT_WRITE;
+ }
+
+ if (evt.events & EPOLLERR) {
+ event |= MEMIF_FD_EVENT_ERROR;
+ }
+
+ memif_err = memif_control_fd_handler(evt.data.fd, event);
+
+ if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_control_fd_handler: %s",
+ memif_strerror(memif_err));
+ }
+
+ return 0;
+ });
+}
+
+int MemifConnector::bufferAlloc(long n, uint16_t qid) {
+ memif_connection_t *c = memif_connection_.get();
+ int err;
+ uint16_t r;
+ /* set data pointer to shared memory and set buffer_len to shared mmeory
+ * buffer len */
+ err = memif_buffer_alloc(c->conn, qid, c->tx_bufs, n, &r, 2000);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_buffer_alloc: %s", memif_strerror(err));
+ return -1;
+ }
+
+ c->tx_buf_num += r;
+ return r;
+}
+
+int MemifConnector::txBurst(uint16_t qid) {
+ memif_connection_t *c = memif_connection_.get();
+ int err;
+ uint16_t r;
+ /* inform peer memif interface about data in shared memory buffers */
+ /* mark memif buffers as free */
+ err = memif_tx_burst(c->conn, qid, c->tx_bufs, c->tx_buf_num, &r);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err));
+ }
+
+ // err = memif_refill_queue(c->conn, qid, r, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err));
+ c->tx_buf_num -= r;
+ return -1;
+ }
+
+ c->tx_buf_num -= r;
+ return 0;
+}
+
+void MemifConnector::sendCallback(const std::error_code &ec) {
+ timer_set_ = false;
+
+ if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) {
+ doSend();
+ }
+}
+
+void MemifConnector::processInputBuffer(std::uint16_t total_packets) {
+ utils::MemBuf::Ptr ptr;
+
+ for (; total_packets > 0; total_packets--) {
+ if (input_buffer_.pop(ptr)) {
+ receive_callback_(this, *ptr, std::make_error_code(std::errc(0)));
+ }
+ }
+}
+
+/* informs user about connected status. private_ctx is used by user to identify
+ connection (multiple connections WIP) */
+int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+ connector->state_ = State::CONNECTED;
+ memif_refill_queue(conn, 0, -1, 0);
+
+ return 0;
+}
+
+/* informs user about disconnected status. private_ctx is used by user to
+ identify connection (multiple connections WIP) */
+int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+ connector->state_ = State::CLOSED;
+ return 0;
+}
+
+void MemifConnector::threadMain() { event_reactor_.runEventLoop(1000); }
+
+int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+
+ memif_connection_t *c = connector->memif_connection_.get();
+ int err = MEMIF_ERR_SUCCESS, ret_val;
+ uint16_t total_packets = 0;
+ uint16_t rx;
+
+ do {
+ err = memif_rx_burst(conn, qid, c->rx_bufs, MAX_MEMIF_BUFS, &rx);
+ ret_val = err;
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS &&
+ err != MEMIF_ERR_NOBUF)) {
+ TRANSPORT_LOGE("memif_rx_burst: %s", memif_strerror(err));
+ goto error;
+ }
+
+ c->rx_buf_num += rx;
+
+ if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
+ TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx);
+ goto error;
+ }
+
+ std::size_t packet_length;
+ for (int i = 0; i < rx; i++) {
+ auto buffer = connector->getRawBuffer();
+ packet_length = (c->rx_bufs + i)->len;
+ std::memcpy(buffer.first, (c->rx_bufs + i)->data, packet_length);
+ auto packet = connector->getPacketFromBuffer(buffer.first, packet_length);
+
+ if (!connector->input_buffer_.push(std::move(packet))) {
+ TRANSPORT_LOGE("Error pushing packet. Ring buffer full.");
+
+ // TODO Here we should consider the possibility to signal the congestion
+ // to the application, that would react properly (e.g. slow down
+ // message)
+ }
+ }
+
+ /* mark memif buffers and shared memory buffers as free */
+ /* free processed buffers */
+
+ err = memif_refill_queue(conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err));
+ }
+
+ c->rx_buf_num -= rx;
+ total_packets += rx;
+
+ } while (ret_val == MEMIF_ERR_NOBUF);
+
+ connector->io_service_.post(
+ std::bind(&MemifConnector::processInputBuffer, connector, total_packets));
+
+ return 0;
+
+error:
+ err = memif_refill_queue(c->conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err));
+ }
+ c->rx_buf_num -= rx;
+
+ return 0;
+}
+
+void MemifConnector::close() {
+ if (state_ != State::CLOSED) {
+ disconnect_timer_->expiresFromNow(std::chrono::microseconds(50));
+ disconnect_timer_->asyncWait([this](const std::error_code &ec) {
+ deleteMemif();
+ event_reactor_.stop();
+ work_.reset();
+ });
+
+ if (memif_worker_ && memif_worker_->joinable()) {
+ memif_worker_->join();
+ }
+ }
+}
+
+void MemifConnector::send(Packet &packet) {
+ {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ output_buffer_.push_back(packet.shared_from_this());
+ }
+#if CANCEL_TIMER
+ if (!timer_set_) {
+ timer_set_ = true;
+ send_timer_->expiresFromNow(std::chrono::microseconds(50));
+ send_timer_->asyncWait(
+ std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
+ }
+#endif
+}
+
+int MemifConnector::doSend() {
+ std::size_t max = 0;
+ int32_t n = 0;
+ std::size_t size = 0;
+
+ {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ size = output_buffer_.size();
+ }
+
+ do {
+ max = size < MAX_MEMIF_BUFS ? size : MAX_MEMIF_BUFS;
+ n = bufferAlloc(max, memif_connection_->tx_qid);
+
+ if (TRANSPORT_EXPECT_FALSE(n < 0)) {
+ TRANSPORT_LOGE("Error allocating buffers.");
+ return -1;
+ }
+
+ for (uint16_t i = 0; i < n; i++) {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+
+ auto packet = output_buffer_.front().get();
+ const utils::MemBuf *current = packet;
+ std::size_t offset = 0;
+ uint8_t *shared_buffer =
+ reinterpret_cast<uint8_t *>(memif_connection_->tx_bufs[i].data);
+ do {
+ std::memcpy(shared_buffer + offset, current->data(), current->length());
+ offset += current->length();
+ current = current->next();
+ } while (current != packet);
+
+ memif_connection_->tx_bufs[i].len = uint32_t(offset);
+
+ output_buffer_.pop_front();
+ }
+
+ txBurst(memif_connection_->tx_qid);
+
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ size = output_buffer_.size();
+ } while (size > 0);
+
+ return 0;
+}
+
+void MemifConnector::send(const uint8_t *packet, std::size_t len) {
+ throw errors::NotImplementedException();
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/memif/memif_connector.h b/libtransport/src/io_modules/memif/memif_connector.h
new file mode 100644
index 000000000..bed3516dc
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_connector.h
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/ring_buffer.h>
+//#include <hicn/transport/core/hicn_vapi.h>
+#include <utils/epoll_event_reactor.h>
+#include <utils/fd_deadline_timer.h>
+
+#include <asio.hpp>
+#include <deque>
+#include <mutex>
+#include <thread>
+
+#define _Static_assert static_assert
+
+namespace transport {
+
+namespace core {
+
+typedef struct memif_connection memif_connection_t;
+
+#define APP_NAME "libtransport"
+#define IF_NAME "vpp_connection"
+
+#define MEMIF_BUF_SIZE 2048
+#define MEMIF_LOG2_RING_SIZE 13
+#define MAX_MEMIF_BUFS (1 << MEMIF_LOG2_RING_SIZE)
+
+class MemifConnector : public Connector {
+ using memif_conn_handle_t = void *;
+ using PacketRing = utils::CircularFifo<utils::MemBuf::Ptr, queue_size>;
+
+ public:
+ MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~MemifConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ void close() override;
+
+ void connect(uint32_t memif_id, long memif_mode);
+
+ TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
+
+ private:
+ void init();
+
+ int doSend();
+
+ int createMemif(uint32_t index, uint8_t mode, char *s);
+
+ uint32_t getMemifConfiguration();
+
+ int deleteMemif();
+
+ static int controlFdUpdate(int fd, uint8_t events, void *private_ctx);
+
+ static int onConnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onDisconnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid);
+
+ void threadMain();
+
+ int txBurst(uint16_t qid);
+
+ int bufferAlloc(long n, uint16_t qid);
+
+ void sendCallback(const std::error_code &ec);
+
+ void processInputBuffer(std::uint16_t total_packets);
+
+ private:
+ static utils::EpollEventReactor main_event_reactor_;
+ static std::unique_ptr<std::thread> main_worker_;
+
+ int epfd;
+ std::unique_ptr<std::thread> memif_worker_;
+ utils::EpollEventReactor event_reactor_;
+ std::atomic_bool timer_set_;
+ std::unique_ptr<utils::FdDeadlineTimer> send_timer_;
+ std::unique_ptr<utils::FdDeadlineTimer> disconnect_timer_;
+ asio::io_service &io_service_;
+ std::unique_ptr<asio::io_service::work> work_;
+ std::unique_ptr<memif_connection_t> memif_connection_;
+ uint16_t tx_buf_counter_;
+
+ PacketRing input_buffer_;
+ bool is_reconnection_;
+ bool data_available_;
+ uint32_t memif_id_;
+ uint8_t memif_mode_;
+ std::string app_name_;
+ uint16_t transmission_index_;
+ utils::SpinLock write_msgs_lock_;
+ std::string socket_filename_;
+
+ static std::once_flag flag_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/memif/memif_vapi.c b/libtransport/src/io_modules/memif/memif_vapi.c
new file mode 100644
index 000000000..b3da2b012
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_vapi.c
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <fcntl.h>
+#include <hicn/transport/config.h>
+#include <inttypes.h>
+#include <io_modules/memif/memif_vapi.h>
+#include <semaphore.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <vapi/vapi_safe.h>
+#include <vppinfra/clib.h>
+
+DEFINE_VAPI_MSG_IDS_MEMIF_API_JSON
+
+static vapi_error_e memif_details_cb(vapi_ctx_t ctx, void *callback_ctx,
+ vapi_error_e rv, bool is_last,
+ vapi_payload_memif_details *reply) {
+ uint32_t *last_memif_id = (uint32_t *)callback_ctx;
+ uint32_t current_memif_id = 0;
+ if (reply != NULL) {
+ current_memif_id = reply->id;
+ } else {
+ return rv;
+ }
+
+ if (current_memif_id >= *last_memif_id) {
+ *last_memif_id = current_memif_id + 1;
+ }
+
+ return rv;
+}
+
+int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, uint32_t *memif_id) {
+ vapi_lock();
+ vapi_msg_memif_dump *msg = vapi_alloc_memif_dump(ctx);
+ int ret = vapi_memif_dump(ctx, msg, memif_details_cb, memif_id);
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e memif_create_cb(vapi_ctx_t ctx, void *callback_ctx,
+ vapi_error_e rv, bool is_last,
+ vapi_payload_memif_create_reply *reply) {
+ memif_output_params_t *output_params = (memif_output_params_t *)callback_ctx;
+
+ if (reply == NULL) return rv;
+
+ output_params->sw_if_index = reply->sw_if_index;
+
+ return rv;
+}
+
+int memif_vapi_create_memif(vapi_ctx_t ctx, memif_create_params_t *input_params,
+ memif_output_params_t *output_params) {
+ vapi_lock();
+ vapi_msg_memif_create *msg = vapi_alloc_memif_create(ctx);
+
+ int ret = 0;
+ if (input_params->socket_id == ~0) {
+ // invalid socket-id
+ ret = -1;
+ goto END;
+ }
+
+ if (!is_pow2(input_params->ring_size)) {
+ // ring size must be power of 2
+ ret = -1;
+ goto END;
+ }
+
+ if (input_params->rx_queues > 255 || input_params->rx_queues < 1) {
+ // rx queue must be between 1 - 255
+ ret = -1;
+ goto END;
+ }
+
+ if (input_params->tx_queues > 255 || input_params->tx_queues < 1) {
+ // tx queue must be between 1 - 255
+ ret = -1;
+ goto END;
+ }
+
+ msg->payload.role = input_params->role;
+ msg->payload.mode = input_params->mode;
+ msg->payload.rx_queues = input_params->rx_queues;
+ msg->payload.tx_queues = input_params->tx_queues;
+ msg->payload.id = input_params->id;
+ msg->payload.socket_id = input_params->socket_id;
+ msg->payload.ring_size = input_params->ring_size;
+ msg->payload.buffer_size = input_params->buffer_size;
+
+ ret = vapi_memif_create(ctx, msg, memif_create_cb, output_params);
+END:
+ vapi_unlock();
+ return ret;
+}
+
+static vapi_error_e memif_delete_cb(vapi_ctx_t ctx, void *callback_ctx,
+ vapi_error_e rv, bool is_last,
+ vapi_payload_memif_delete_reply *reply) {
+ if (reply == NULL) return rv;
+
+ return reply->retval;
+}
+
+int memif_vapi_delete_memif(vapi_ctx_t ctx, uint32_t sw_if_index) {
+ vapi_lock();
+ vapi_msg_memif_delete *msg = vapi_alloc_memif_delete(ctx);
+
+ msg->payload.sw_if_index = sw_if_index;
+
+ int ret = vapi_memif_delete(ctx, msg, memif_delete_cb, NULL);
+ vapi_unlock();
+ return ret;
+}
diff --git a/libtransport/src/io_modules/memif/memif_vapi.h b/libtransport/src/io_modules/memif/memif_vapi.h
new file mode 100644
index 000000000..bcf06ed43
--- /dev/null
+++ b/libtransport/src/io_modules/memif/memif_vapi.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <vapi/memif.api.vapi.h>
+
+#include "stdint.h"
+
+typedef struct memif_create_params_s {
+ uint8_t role;
+ uint8_t mode;
+ uint8_t rx_queues;
+ uint8_t tx_queues;
+ uint32_t id;
+ uint32_t socket_id;
+ uint8_t secret[24];
+ uint32_t ring_size;
+ uint16_t buffer_size;
+ uint8_t hw_addr[6];
+} memif_create_params_t;
+
+typedef struct memif_output_params_s {
+ uint32_t sw_if_index;
+} memif_output_params_t;
+
+int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, uint32_t *memif_id);
+
+int memif_vapi_create_memif(vapi_ctx_t ctx, memif_create_params_t *input_params,
+ memif_output_params_t *output_params);
+
+int memif_vapi_delete_memif(vapi_ctx_t ctx, uint32_t sw_if_index);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/libtransport/src/io_modules/memif/vpp_forwarder_module.cc b/libtransport/src/io_modules/memif/vpp_forwarder_module.cc
new file mode 100644
index 000000000..dcbcd7ed0
--- /dev/null
+++ b/libtransport/src/io_modules/memif/vpp_forwarder_module.cc
@@ -0,0 +1,263 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <io_modules/memif/hicn_vapi.h>
+#include <io_modules/memif/memif_connector.h>
+#include <io_modules/memif/memif_vapi.h>
+#include <io_modules/memif/vpp_forwarder_module.h>
+
+extern "C" {
+#include <memif/libmemif.h>
+};
+
+typedef enum { MASTER = 0, SLAVE = 1 } memif_role_t;
+
+#define MEMIF_DEFAULT_RING_SIZE 2048
+#define MEMIF_DEFAULT_RX_QUEUES 1
+#define MEMIF_DEFAULT_TX_QUEUES 1
+#define MEMIF_DEFAULT_BUFFER_SIZE 2048
+
+namespace transport {
+
+namespace core {
+
+VPPForwarderModule::VPPForwarderModule()
+ : IoModule(),
+ connector_(nullptr),
+ sw_if_index_(~0),
+ face_id1_(~0),
+ face_id2_(~0),
+ is_consumer_(false) {}
+
+VPPForwarderModule::~VPPForwarderModule() { delete connector_; }
+
+void VPPForwarderModule::init(
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service, const std::string &app_name) {
+ if (!connector_) {
+ connector_ =
+ new MemifConnector(std::move(receive_callback), 0, 0,
+ std::move(reconnect_callback), io_service, app_name);
+ }
+}
+
+void VPPForwarderModule::processControlMessageReply(
+ utils::MemBuf &packet_buffer) {
+ throw errors::NotImplementedException();
+}
+
+bool VPPForwarderModule::isControlMessage(const uint8_t *message) {
+ return false;
+}
+
+bool VPPForwarderModule::isConnected() { return connector_->isConnected(); };
+
+void VPPForwarderModule::send(Packet &packet) {
+ IoModule::send(packet);
+ connector_->send(packet);
+}
+
+void VPPForwarderModule::send(const uint8_t *packet, std::size_t len) {
+ counters_.tx_packets++;
+ counters_.tx_bytes += len;
+
+ // Perfect forwarding
+ connector_->send(packet, len);
+}
+
+std::uint32_t VPPForwarderModule::getMtu() { return interface_mtu; }
+
+/**
+ * @brief Create a memif interface in the local VPP forwarder.
+ */
+uint32_t VPPForwarderModule::getMemifConfiguration() {
+ memif_create_params_t input_params = {0};
+
+ int ret = memif_vapi_get_next_memif_id(VPPForwarderModule::sock_, &memif_id_);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(
+ "Error getting next memif id. Could not create memif interface.");
+ }
+
+ input_params.id = memif_id_;
+ input_params.role = memif_role_t::MASTER;
+ input_params.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
+ input_params.rx_queues = MEMIF_DEFAULT_RX_QUEUES;
+ input_params.tx_queues = MEMIF_DEFAULT_TX_QUEUES;
+ input_params.ring_size = MEMIF_DEFAULT_RING_SIZE;
+ input_params.buffer_size = MEMIF_DEFAULT_BUFFER_SIZE;
+
+ memif_output_params_t output_params = {0};
+
+ ret = memif_vapi_create_memif(VPPForwarderModule::sock_, &input_params,
+ &output_params);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(
+ "Error creating memif interface in the local VPP forwarder.");
+ }
+
+ return output_params.sw_if_index;
+}
+
+void VPPForwarderModule::consumerConnection() {
+ hicn_consumer_input_params input = {0};
+ hicn_consumer_output_params output = {0};
+ ip_address_t ip4_address;
+ ip_address_t ip6_address;
+
+ output.src4 = &ip4_address;
+ output.src6 = &ip6_address;
+ input.swif = sw_if_index_;
+
+ int ret =
+ hicn_vapi_register_cons_app(VPPForwarderModule::sock_, &input, &output);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(hicn_vapi_get_error_string(ret));
+ }
+
+ face_id1_ = output.face_id1;
+ face_id2_ = output.face_id2;
+
+ std::memcpy(inet_address_.v4.as_u8, output.src4->v4.as_u8, IPV4_ADDR_LEN);
+
+ std::memcpy(inet6_address_.v6.as_u8, output.src6->v6.as_u8, IPV6_ADDR_LEN);
+}
+
+void VPPForwarderModule::producerConnection() {
+ // Producer connection will be set when we set the first route.
+}
+
+void VPPForwarderModule::connect(bool is_consumer) {
+ int retry = 20;
+
+ TRANSPORT_LOGI("Connecting to VPP through vapi.");
+ vapi_error_e ret = vapi_connect_safe(&sock_, 0);
+
+ while (ret != VAPI_OK && retry > 0) {
+ TRANSPORT_LOGE("Error connecting to VPP through vapi. Retrying..");
+ --retry;
+ ret = vapi_connect_safe(&sock_, 0);
+ }
+
+ if (ret != VAPI_OK) {
+ throw std::runtime_error(
+ "Impossible to connect to forwarder. Is VPP running?");
+ }
+
+ TRANSPORT_LOGI("Connected to VPP through vapi.");
+
+ sw_if_index_ = getMemifConfiguration();
+
+ is_consumer_ = is_consumer;
+ if (is_consumer_) {
+ consumerConnection();
+ }
+
+ connector_->connect(memif_id_, 0);
+ connector_->setRole(is_consumer_ ? Connector::Role::CONSUMER
+ : Connector::Role::PRODUCER);
+}
+
+void VPPForwarderModule::registerRoute(const Prefix &prefix) {
+ const ip_prefix_t &addr = prefix.toIpPrefixStruct();
+
+ ip_prefix_t producer_prefix;
+ ip_address_t producer_locator;
+
+ if (face_id1_ == uint32_t(~0)) {
+ hicn_producer_input_params input;
+ std::memset(&input, 0, sizeof(input));
+
+ hicn_producer_output_params output;
+ std::memset(&output, 0, sizeof(output));
+
+ input.prefix = &producer_prefix;
+ output.prod_addr = &producer_locator;
+
+ // Here we have to ask to the actual connector what is the
+ // memif_id, since this function should be called after the
+ // memif creation.n
+ input.swif = sw_if_index_;
+ input.prefix->address = addr.address;
+ input.prefix->family = addr.family;
+ input.prefix->len = addr.len;
+ input.cs_reserved = content_store_reserved_;
+
+ int ret =
+ hicn_vapi_register_prod_app(VPPForwarderModule::sock_, &input, &output);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(hicn_vapi_get_error_string(ret));
+ }
+
+ inet6_address_ = *output.prod_addr;
+
+ face_id1_ = output.face_id;
+ } else {
+ hicn_producer_set_route_params params;
+ params.prefix = &producer_prefix;
+ params.prefix->address = addr.address;
+ params.prefix->family = addr.family;
+ params.prefix->len = addr.len;
+ params.prod_addr = &producer_locator;
+
+ int ret = hicn_vapi_register_route(VPPForwarderModule::sock_, &params);
+
+ if (ret < 0) {
+ throw errors::RuntimeException(hicn_vapi_get_error_string(ret));
+ }
+ }
+}
+
+void VPPForwarderModule::closeConnection() {
+ if (VPPForwarderModule::sock_) {
+ connector_->close();
+
+ if (is_consumer_) {
+ hicn_del_face_app_input_params params;
+ params.face_id = face_id1_;
+ hicn_vapi_face_cons_del(VPPForwarderModule::sock_, &params);
+ params.face_id = face_id2_;
+ hicn_vapi_face_cons_del(VPPForwarderModule::sock_, &params);
+ } else {
+ hicn_del_face_app_input_params params;
+ params.face_id = face_id1_;
+ hicn_vapi_face_prod_del(VPPForwarderModule::sock_, &params);
+ }
+
+ if (sw_if_index_ != uint32_t(~0)) {
+ int ret =
+ memif_vapi_delete_memif(VPPForwarderModule::sock_, sw_if_index_);
+ if (ret < 0) {
+ TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_);
+ }
+ }
+
+ vapi_disconnect_safe();
+ VPPForwarderModule::sock_ = nullptr;
+ }
+}
+
+extern "C" IoModule *create_module(void) { return new VPPForwarderModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/memif/vpp_forwarder_module.h b/libtransport/src/io_modules/memif/vpp_forwarder_module.h
new file mode 100644
index 000000000..8c4114fed
--- /dev/null
+++ b/libtransport/src/io_modules/memif/vpp_forwarder_module.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+
+#ifdef always_inline
+#undef always_inline
+#endif
+extern "C" {
+#include <vapi/vapi_safe.h>
+};
+
+namespace transport {
+
+namespace core {
+
+class MemifConnector;
+
+class VPPForwarderModule : public IoModule {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ VPPForwarderModule();
+ ~VPPForwarderModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ uint32_t getMemifConfiguration();
+ void consumerConnection();
+ void producerConnection();
+
+ private:
+ MemifConnector *connector_;
+ uint32_t memif_id_;
+ uint32_t sw_if_index_;
+ // A consumer socket in vpp has two faces (ipv4 and ipv6)
+ uint32_t face_id1_;
+ uint32_t face_id2_;
+ bool is_consumer_;
+ vapi_ctx_t sock_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc b/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc
new file mode 100644
index 000000000..0bfcc2a58
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc
@@ -0,0 +1,201 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/raw_socket_connector.h>
+#include <hicn/transport/utils/conversions.h>
+#include <hicn/transport/utils/log.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+
+#define MY_DEST_MAC0 0x0a
+#define MY_DEST_MAC1 0x7b
+#define MY_DEST_MAC2 0x7c
+#define MY_DEST_MAC3 0x1c
+#define MY_DEST_MAC4 0x4a
+#define MY_DEST_MAC5 0x14
+
+namespace transport {
+
+namespace core {
+
+RawSocketConnector::RawSocketConnector(
+ PacketReceivedCallback &&receive_callback,
+ OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
+ io_service_(io_service),
+ socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)),
+ // resolver_(io_service_),
+ timer_(io_service_),
+ read_msg_(packet_pool_.makePtr(nullptr)),
+ data_available_(false),
+ app_name_(app_name) {
+ memset(&link_layer_address_, 0, sizeof(link_layer_address_));
+}
+
+RawSocketConnector::~RawSocketConnector() {}
+
+void RawSocketConnector::connect(const std::string &interface_name,
+ const std::string &mac_address_str) {
+ state_ = ConnectorState::CONNECTING;
+ memset(&ethernet_header_, 0, sizeof(ethernet_header_));
+ struct ifreq ifr;
+ struct ifreq if_mac;
+ uint8_t mac_address[6];
+
+ utils::convertStringToMacAddress(mac_address_str, mac_address);
+
+ // Get interface mac address
+ int fd = static_cast<int>(socket_.native_handle());
+
+ /* Get the index of the interface to send on */
+ memset(&ifr, 0, sizeof(struct ifreq));
+ strncpy(ifr.ifr_name, interface_name.c_str(), interface_name.size());
+
+ // if (ioctl(fd, SIOCGIFINDEX, &if_idx) < 0) {
+ // perror("SIOCGIFINDEX");
+ // }
+
+ /* Get the MAC address of the interface to send on */
+ memset(&if_mac, 0, sizeof(struct ifreq));
+ strncpy(if_mac.ifr_name, interface_name.c_str(), interface_name.size());
+ if (ioctl(fd, SIOCGIFHWADDR, &if_mac) < 0) {
+ perror("SIOCGIFHWADDR");
+ throw errors::RuntimeException("Interface does not exist");
+ }
+
+ /* Ethernet header */
+ for (int i = 0; i < 6; i++) {
+ ethernet_header_.ether_shost[i] =
+ ((uint8_t *)&if_mac.ifr_hwaddr.sa_data)[i];
+ ethernet_header_.ether_dhost[i] = mac_address[i];
+ }
+
+ /* Ethertype field */
+ ethernet_header_.ether_type = htons(ETH_P_IPV6);
+
+ strcpy(ifr.ifr_name, interface_name.c_str());
+
+ if (0 == ioctl(fd, SIOCGIFHWADDR, &ifr)) {
+ memcpy(link_layer_address_.sll_addr, ifr.ifr_hwaddr.sa_data, 6);
+ }
+
+ // memset(&ifr, 0, sizeof(ifr));
+ // ioctl(fd, SIOCGIFFLAGS, &ifr);
+ // ifr.ifr_flags |= IFF_PROMISC;
+ // ioctl(fd, SIOCSIFFLAGS, &ifr);
+
+ link_layer_address_.sll_family = AF_PACKET;
+ link_layer_address_.sll_protocol = htons(ETH_P_ALL);
+ link_layer_address_.sll_ifindex = if_nametoindex(interface_name.c_str());
+ link_layer_address_.sll_hatype = 1;
+ link_layer_address_.sll_halen = 6;
+
+ // startConnectionTimer();
+ doConnect();
+ doRecvPacket();
+}
+
+void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent) {
+ if (packet_sent != 0) {
+ socket_.async_send(
+ asio::buffer(packet, len),
+ [packet_sent](std::error_code ec, std::size_t /*length*/) {
+ packet_sent();
+ });
+ } else {
+ if (state_ == ConnectorState::CONNECTED) {
+ socket_.send(asio::buffer(packet, len));
+ }
+ }
+}
+
+void RawSocketConnector::send(const Packet::MemBufPtr &packet) {
+ io_service_.post([this, packet]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(packet));
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
+ if (!write_in_progress) {
+ doSendPacket();
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
+ }
+ });
+}
+
+void RawSocketConnector::close() {
+ io_service_.post([this]() { socket_.close(); });
+}
+
+void RawSocketConnector::doSendPacket() {
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_.async_send(
+ std::move(array),
+ [this /*, packet*/](std::error_code ec, std::size_t bytes_transferred) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doSendPacket();
+ }
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+ });
+}
+
+void RawSocketConnector::doRecvPacket() {
+ read_msg_ = getPacket();
+ socket_.async_receive(
+ asio::buffer(read_msg_->writableData(), packet_size),
+ [this](std::error_code ec, std::size_t bytes_transferred) mutable {
+ if (!ec) {
+ // Ignore packets that are not for us
+ uint8_t *dst_mac_address = const_cast<uint8_t *>(read_msg_->data());
+ if (!std::memcmp(dst_mac_address, ethernet_header_.ether_shost,
+ ETHER_ADDR_LEN)) {
+ read_msg_->append(bytes_transferred);
+ read_msg_->trimStart(sizeof(struct ether_header));
+ receive_callback_(std::move(read_msg_));
+ }
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ }
+ doRecvPacket();
+ });
+}
+
+void RawSocketConnector::doConnect() {
+ state_ = ConnectorState::CONNECTED;
+ socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_)));
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_connector.h b/libtransport/src/io_modules/raw_socket/raw_socket_connector.h
new file mode 100644
index 000000000..aba4b1105
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_connector.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/connector.h>
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/name.h>
+#include <linux/if_packet.h>
+#include <net/ethernet.h>
+#include <sys/socket.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <deque>
+
+namespace transport {
+
+namespace core {
+
+using asio::generic::raw_protocol;
+using raw_endpoint = asio::generic::basic_endpoint<raw_protocol>;
+
+class RawSocketConnector : public Connector {
+ public:
+ RawSocketConnector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~RawSocketConnector() override;
+
+ void send(const Packet::MemBufPtr &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent = 0) override;
+
+ void close() override;
+
+ void connect(const std::string &interface_name,
+ const std::string &mac_address_str);
+
+ private:
+ void doConnect();
+
+ void doRecvPacket();
+
+ void doSendPacket();
+
+ private:
+ asio::io_service &io_service_;
+ raw_protocol::socket socket_;
+
+ struct ether_header ethernet_header_;
+
+ struct sockaddr_ll link_layer_address_;
+
+ asio::steady_timer timer_;
+
+ utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
+
+ bool data_available_;
+ std::string app_name_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc b/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc
new file mode 100644
index 000000000..dcf489f59
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/raw_socket_interface.h>
+#include <hicn/transport/utils/linux.h>
+
+#include <fstream>
+
+namespace transport {
+
+namespace core {
+
+static std::string config_folder_path = "/etc/transport/interface.conf.d";
+
+RawSocketInterface::RawSocketInterface(RawSocketConnector &connector)
+ : ForwarderInterface<RawSocketInterface, RawSocketConnector>(connector) {}
+
+RawSocketInterface::~RawSocketInterface() {}
+
+void RawSocketInterface::connect(bool is_consumer) {
+ std::string complete_filename =
+ config_folder_path + std::string("/") + output_interface_;
+
+ std::ifstream is(complete_filename);
+ std::string interface;
+
+ if (is) {
+ is >> remote_mac_address_;
+ }
+
+ // Get interface ip address
+ struct sockaddr_in6 address = {0};
+ utils::retrieveInterfaceAddress(output_interface_, &address);
+
+ std::memcpy(&inet6_address_.v6.as_u8, &address.sin6_addr,
+ sizeof(address.sin6_addr));
+ connector_.connect(output_interface_, remote_mac_address_);
+}
+
+void RawSocketInterface::registerRoute(Prefix &prefix) { return; }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_interface.h b/libtransport/src/io_modules/raw_socket/raw_socket_interface.h
new file mode 100644
index 000000000..7036cac7e
--- /dev/null
+++ b/libtransport/src/io_modules/raw_socket/raw_socket_interface.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/forwarder_interface.h>
+#include <core/raw_socket_connector.h>
+#include <hicn/transport/core/prefix.h>
+
+#include <atomic>
+#include <deque>
+
+namespace transport {
+
+namespace core {
+
+class RawSocketInterface
+ : public ForwarderInterface<RawSocketInterface, RawSocketConnector> {
+ public:
+ typedef RawSocketConnector ConnectorType;
+
+ RawSocketInterface(RawSocketConnector &connector);
+
+ ~RawSocketInterface();
+
+ void connect(bool is_consumer);
+
+ void registerRoute(Prefix &prefix);
+
+ std::uint16_t getMtu() { return interface_mtu; }
+
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return false;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {}
+
+ TRANSPORT_ALWAYS_INLINE void closeConnection(){};
+
+ private:
+ static constexpr std::uint16_t interface_mtu = 1500;
+ std::string remote_mac_address_;
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/udp/CMakeLists.txt b/libtransport/src/io_modules/udp/CMakeLists.txt
new file mode 100644
index 000000000..1a43492dc
--- /dev/null
+++ b/libtransport/src/io_modules/udp/CMakeLists.txt
@@ -0,0 +1,47 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+
+list(APPEND MODULE_HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h
+)
+
+list(APPEND MODULE_SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc
+)
+
+# add_executable(hicnlight_module MACOSX_BUNDLE ${MODULE_SOURCE_FILES})
+# target_include_directories(hicnlight_module PRIVATE ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS})
+# set_target_properties(hicnlight_module PROPERTIES
+# BUNDLE True
+# MACOSX_BUNDLE_GUI_IDENTIFIER my.domain.style.identifier.hicnlight_module
+# MACOSX_BUNDLE_BUNDLE_NAME hicnlight_module
+# MACOSX_BUNDLE_BUNDLE_VERSION "0.1"
+# MACOSX_BUNDLE_SHORT_VERSION_STRING "0.1"
+# # MACOSX_BUNDLE_INFO_PLIST ${CMAKE_SOURCE_DIR}/cmake/customtemplate.plist.in
+# )
+
+build_module(hicnlight_module
+ SHARED
+ SOURCES ${MODULE_SOURCE_FILES}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT lib${LIBTRANSPORT}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ # LIBRARY_ROOT_DIR "vpp_plugins"
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILE_FLAGS}
+)
diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.cc b/libtransport/src/io_modules/udp/hicn_forwarder_module.cc
new file mode 100644
index 000000000..ba08dd8c0
--- /dev/null
+++ b/libtransport/src/io_modules/udp/hicn_forwarder_module.cc
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <io_modules/udp/hicn_forwarder_module.h>
+#include <io_modules/udp/udp_socket_connector.h>
+
+union AddressLight {
+ uint32_t ipv4;
+ struct in6_addr ipv6;
+};
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+} CommandHeader;
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+ char symbolic_or_connid[16];
+ union AddressLight address;
+ uint16_t cost;
+ uint8_t address_type;
+ uint8_t len;
+} RouteToSelfCommand;
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+ char symbolic_or_connid[16];
+} DeleteSelfConnectionCommand;
+
+namespace {
+static constexpr uint8_t addr_inet = 1;
+static constexpr uint8_t addr_inet6 = 2;
+static constexpr uint8_t add_route_command = 3;
+static constexpr uint8_t delete_connection_command = 5;
+static constexpr uint8_t request_light = 0xc0;
+static constexpr char identifier[] = "SELF";
+
+void fillCommandHeader(CommandHeader *header) {
+ // Allocate and fill the header
+ header->message_type = request_light;
+ header->length = 1;
+}
+
+RouteToSelfCommand createCommandRoute(std::unique_ptr<sockaddr> &&addr,
+ uint8_t prefix_length) {
+ RouteToSelfCommand command = {0};
+
+ // check and set IP address
+ if (addr->sa_family == AF_INET) {
+ command.address_type = addr_inet;
+ command.address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr;
+ } else if (addr->sa_family == AF_INET6) {
+ command.address_type = addr_inet6;
+ command.address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr;
+ }
+
+ // Fill remaining payload fields
+#ifndef _WIN32
+ strcpy(command.symbolic_or_connid, identifier);
+#else
+ strcpy_s(command.symbolic_or_connid, 16, identifier);
+#endif
+ command.cost = 1;
+ command.len = (uint8_t)prefix_length;
+
+ // Allocate and fill the header
+ command.command_id = add_route_command;
+ fillCommandHeader((CommandHeader *)&command);
+
+ return command;
+}
+
+DeleteSelfConnectionCommand createCommandDeleteConnection() {
+ DeleteSelfConnectionCommand command = {0};
+ fillCommandHeader((CommandHeader *)&command);
+ command.command_id = delete_connection_command;
+
+#ifndef _WIN32
+ strcpy(command.symbolic_or_connid, identifier);
+#else
+ strcpy_s(command.symbolic_or_connid, 16, identifier);
+#endif
+
+ return command;
+}
+
+} // namespace
+
+namespace transport {
+
+namespace core {
+
+HicnForwarderModule::HicnForwarderModule() : IoModule(), connector_(nullptr) {}
+
+HicnForwarderModule::~HicnForwarderModule() {}
+
+void HicnForwarderModule::connect(bool is_consumer) {
+ connector_->connect();
+ connector_->setRole(is_consumer ? Connector::Role::CONSUMER
+ : Connector::Role::PRODUCER);
+}
+
+bool HicnForwarderModule::isConnected() { return connector_->isConnected(); }
+
+void HicnForwarderModule::send(Packet &packet) {
+ IoModule::send(packet);
+ packet.setChecksum();
+ connector_->send(packet);
+}
+
+void HicnForwarderModule::send(const uint8_t *packet, std::size_t len) {
+ counters_.tx_packets++;
+ counters_.tx_bytes += len;
+
+ // Perfect forwarding
+ connector_->send(packet, len);
+}
+
+void HicnForwarderModule::registerRoute(const Prefix &prefix) {
+ auto command = createCommandRoute(prefix.toSockaddr(),
+ (uint8_t)prefix.getPrefixLength());
+ send((uint8_t *)&command, sizeof(RouteToSelfCommand));
+}
+
+void HicnForwarderModule::closeConnection() {
+ auto command = createCommandDeleteConnection();
+ send((uint8_t *)&command, sizeof(DeleteSelfConnectionCommand));
+ connector_->close();
+}
+
+void HicnForwarderModule::init(
+ Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service, const std::string &app_name) {
+ if (!connector_) {
+ connector_ = new UdpSocketConnector(std::move(receive_callback), nullptr,
+ nullptr, std::move(reconnect_callback),
+ io_service, app_name);
+ }
+}
+
+void HicnForwarderModule::processControlMessageReply(
+ utils::MemBuf &packet_buffer) {
+ if (packet_buffer.data()[0] == nack_code) {
+ throw errors::RuntimeException(
+ "Received Nack message from hicn light forwarder.");
+ }
+}
+
+std::uint32_t HicnForwarderModule::getMtu() { return interface_mtu; }
+
+bool HicnForwarderModule::isControlMessage(const uint8_t *message) {
+ return message[0] == ack_code || message[0] == nack_code;
+}
+
+extern "C" IoModule *create_module(void) { return new HicnForwarderModule(); }
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.h b/libtransport/src/io_modules/udp/hicn_forwarder_module.h
new file mode 100644
index 000000000..845db73bf
--- /dev/null
+++ b/libtransport/src/io_modules/udp/hicn_forwarder_module.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/io_module.h>
+#include <hicn/transport/core/prefix.h>
+
+namespace transport {
+
+namespace core {
+
+class UdpSocketConnector;
+
+class HicnForwarderModule : public IoModule {
+ static constexpr uint8_t ack_code = 0xc2;
+ static constexpr uint8_t nack_code = 0xc3;
+ static constexpr std::uint16_t interface_mtu = 1500;
+
+ public:
+ union addressLight {
+ uint32_t ipv4;
+ struct in6_addr ipv6;
+ };
+
+ struct route_to_self_command {
+ uint8_t messageType;
+ uint8_t commandID;
+ uint16_t length;
+ uint32_t seqNum;
+ char symbolicOrConnid[16];
+ union addressLight address;
+ uint16_t cost;
+ uint8_t addressType;
+ uint8_t len;
+ };
+
+ using route_to_self_command = struct route_to_self_command;
+
+ HicnForwarderModule();
+
+ ~HicnForwarderModule();
+
+ void connect(bool is_consumer) override;
+
+ void send(Packet &packet) override;
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ bool isConnected() override;
+
+ void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::OnReconnectCallback &&reconnect_callback,
+ asio::io_service &io_service,
+ const std::string &app_name = "Libtransport") override;
+
+ void registerRoute(const Prefix &prefix) override;
+
+ std::uint32_t getMtu() override;
+
+ bool isControlMessage(const uint8_t *message) override;
+
+ void processControlMessageReply(utils::MemBuf &packet_buffer) override;
+
+ void closeConnection() override;
+
+ private:
+ UdpSocketConnector *connector_;
+};
+
+extern "C" IoModule *create_module(void);
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.cc b/libtransport/src/io_modules/udp/udp_socket_connector.cc
new file mode 100644
index 000000000..456886a54
--- /dev/null
+++ b/libtransport/src/io_modules/udp/udp_socket_connector.cc
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <hicn/transport/portability/win_portability.h>
+#endif
+
+#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/object_pool.h>
+#include <io_modules/udp/udp_socket_connector.h>
+
+#include <thread>
+#include <vector>
+
+namespace transport {
+
+namespace core {
+
+UdpSocketConnector::UdpSocketConnector(
+ PacketReceivedCallback &&receive_callback, PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback, OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service, std::string app_name)
+ : Connector(std::move(receive_callback), std::move(packet_sent),
+ std::move(close_callback), std::move(on_reconnect)),
+ io_service_(io_service),
+ socket_(io_service_),
+ resolver_(io_service_),
+ connection_timer_(io_service_),
+ read_msg_(std::make_pair(nullptr, 0)),
+ is_reconnection_(false),
+ data_available_(false),
+ app_name_(app_name) {}
+
+UdpSocketConnector::~UdpSocketConnector() {}
+
+void UdpSocketConnector::connect(std::string ip_address, std::string port) {
+ endpoint_iterator_ = resolver_.resolve(
+ {ip_address, port, asio::ip::resolver_query_base::numeric_service});
+
+ state_ = Connector::State::CONNECTING;
+ doConnect();
+}
+
+void UdpSocketConnector::send(const uint8_t *packet, std::size_t len) {
+ socket_.async_send(asio::buffer(packet, len),
+ [this](std::error_code ec, std::size_t /*length*/) {
+ if (sent_callback_) {
+ sent_callback_(this, ec);
+ }
+ });
+}
+
+void UdpSocketConnector::send(Packet &packet) {
+ io_service_.post([this, _packet{packet.shared_from_this()}]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(_packet));
+ if (TRANSPORT_EXPECT_TRUE(state_ == Connector::State::CONNECTED)) {
+ if (!write_in_progress) {
+ doWrite();
+ }
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
+ });
+}
+
+void UdpSocketConnector::close() {
+ if (io_service_.stopped()) {
+ doClose();
+ } else {
+ io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this));
+ }
+}
+
+void UdpSocketConnector::doClose() {
+ if (state_ != Connector::State::CLOSED) {
+ state_ = Connector::State::CLOSED;
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ }
+}
+
+void UdpSocketConnector::doWrite() {
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_.async_send(std::move(array), [this](std::error_code ec,
+ std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doWrite();
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::doRead() {
+ read_msg_ = getRawBuffer();
+ socket_.async_receive(
+ asio::buffer(read_msg_.first, read_msg_.second),
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ auto packet = getPacketFromBuffer(read_msg_.first, length);
+ receive_callback_(this, *packet, std::make_error_code(std::errc(0)));
+ doRead();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::tryReconnect() {
+ if (state_ == Connector::State::CONNECTED) {
+ TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
+ state_ = Connector::State::CONNECTING;
+ is_reconnection_ = true;
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+
+ doConnect();
+ startConnectionTimer();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ });
+ }
+}
+
+void UdpSocketConnector::doConnect() {
+ asio::async_connect(
+ socket_, endpoint_iterator_,
+ [this](std::error_code ec, udp::resolver::iterator) {
+ if (!ec) {
+ connection_timer_.cancel();
+ state_ = Connector::State::CONNECTED;
+ doRead();
+
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ }
+
+ on_reconnect_callback_(this);
+ } else {
+ doConnect();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ });
+}
+
+bool UdpSocketConnector::checkConnected() {
+ return state_ == Connector::State::CONNECTED;
+}
+
+void UdpSocketConnector::startConnectionTimer() {
+ connection_timer_.expires_from_now(std::chrono::seconds(60));
+ connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
+ this, std::placeholders::_1));
+}
+
+void UdpSocketConnector::handleDeadline(const std::error_code &ec) {
+ if (!ec) {
+ io_service_.post([this]() {
+ socket_.close();
+ TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n");
+ });
+ }
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.h b/libtransport/src/io_modules/udp/udp_socket_connector.h
new file mode 100644
index 000000000..8ab08e17a
--- /dev/null
+++ b/libtransport/src/io_modules/udp/udp_socket_connector.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/core/name.h>
+#include <hicn/transport/core/packet.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <deque>
+
+namespace transport {
+namespace core {
+
+using asio::ip::udp;
+
+class UdpSocketConnector : public Connector {
+ public:
+ UdpSocketConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~UdpSocketConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len) override;
+
+ void close() override;
+
+ void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
+
+ private:
+ void doConnect();
+
+ void doRead();
+
+ void doWrite();
+
+ void doClose();
+
+ bool checkConnected();
+
+ private:
+ void handleDeadline(const std::error_code &ec);
+
+ void startConnectionTimer();
+
+ void tryReconnect();
+
+ asio::io_service &io_service_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::steady_timer connection_timer_;
+
+ std::pair<uint8_t *, std::size_t> read_msg_;
+
+ bool is_reconnection_;
+ bool data_available_;
+
+ std::string app_name_;
+};
+
+} // end namespace core
+
+} // end namespace transport