diff options
Diffstat (limited to 'libtransport/src/io_modules/forwarder')
-rw-r--r-- | libtransport/src/io_modules/forwarder/CMakeLists.txt | 44 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/configuration.h | 89 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/errors.cc | 52 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/errors.h | 91 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder.cc | 296 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder.h | 90 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder_module.cc | 87 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder_module.h | 70 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/global_id_counter.h | 54 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/udp_tunnel.cc | 288 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/udp_tunnel.h | 147 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc | 177 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/udp_tunnel_listener.h | 110 |
13 files changed, 1595 insertions, 0 deletions
diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt new file mode 100644 index 000000000..92662bc4c --- /dev/null +++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt @@ -0,0 +1,44 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + + +list(APPEND MODULE_HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/endpoint.h + ${CMAKE_CURRENT_SOURCE_DIR}/errors.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.h + ${CMAKE_CURRENT_SOURCE_DIR}/global_counter.h +) + +list(APPEND MODULE_SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.cc + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.cc +) + +build_module(forwarder_module + SHARED + SOURCES ${MODULE_SOURCE_FILES} + DEPENDS ${DEPENDENCIES} + COMPONENT lib${LIBTRANSPORT} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} + DEFINITIONS ${COMPILER_DEFINITIONS} + COMPILE_OPTIONS ${COMPILE_FLAGS} +) diff --git a/libtransport/src/io_modules/forwarder/configuration.h b/libtransport/src/io_modules/forwarder/configuration.h new file mode 100644 index 000000000..fcaa5530d --- /dev/null +++ b/libtransport/src/io_modules/forwarder/configuration.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace transport { +namespace core { + +struct ListenerConfig { + std::string address; + std::uint16_t port; + std::string name; +}; + +struct ConnectorConfig { + std::string local_address; + std::uint16_t local_port; + std::string remote_address; + std::uint16_t remote_port; + std::string name; +}; + +struct RouteConfig { + std::string prefix; + uint16_t weight; + std::string connector; + std::string name; +}; + +class Configuration { + public: + Configuration() : n_threads_(1) {} + + bool empty() { + return listeners_.empty() && connectors_.empty() && routes_.empty(); + } + + Configuration& setThreadNumber(std::size_t threads) { + n_threads_ = threads; + return *this; + } + + std::size_t getThreadNumber() { return n_threads_; } + + template <typename... Args> + Configuration& addListener(Args&&... args) { + listeners_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + template <typename... Args> + Configuration& addConnector(Args&&... args) { + connectors_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + template <typename... Args> + Configuration& addRoute(Args&&... args) { + routes_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + std::vector<ListenerConfig>& getListeners() { return listeners_; } + + std::vector<ConnectorConfig>& getConnectors() { return connectors_; } + + std::vector<RouteConfig>& getRoutes() { return routes_; } + + private: + std::vector<ListenerConfig> listeners_; + std::vector<ConnectorConfig> connectors_; + std::vector<RouteConfig> routes_; + std::size_t n_threads_; +}; + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/errors.cc b/libtransport/src/io_modules/forwarder/errors.cc new file mode 100644 index 000000000..b5f131499 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/errors.cc @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + */ + +#include <io_modules/forwarder/errors.h> + +namespace transport { +namespace core { + +const std::error_category& forwarder_category() { + static forwarder_category_impl instance; + + return instance; +} + +const char* forwarder_category_impl::name() const throw() { + return "proxy::connector::error"; +} + +std::string forwarder_category_impl::message(int ev) const { + switch (static_cast<forwarder_error>(ev)) { + case forwarder_error::success: { + return "Success"; + } + case forwarder_error::disconnected: { + return "Connector is disconnected"; + } + case forwarder_error::receive_failed: { + return "Packet reception failed"; + } + case forwarder_error::send_failed: { + return "Packet send failed"; + } + case forwarder_error::memory_allocation_error: { + return "Impossible to allocate memory for packet pool"; + } + case forwarder_error::invalid_connector_type: { + return "Invalid type specified for connector."; + } + case forwarder_error::invalid_connector: { + return "Created connector was invalid."; + } + case forwarder_error::interest_cache_miss: { + return "interest cache miss."; + } + default: { + return "Unknown connector error"; + } + } +} +} // namespace core +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/errors.h b/libtransport/src/io_modules/forwarder/errors.h new file mode 100644 index 000000000..dd5cc8fe7 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/errors.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <system_error> + +namespace transport { +namespace core { +/** + * @brief Get the default server error category. + * @return The default server error category instance. + * + * @warning The first call to this function is thread-safe only starting with + * C++11. + */ +const std::error_category& forwarder_category(); + +/** + * The list of errors. + */ +enum class forwarder_error { + success = 0, + send_failed, + receive_failed, + disconnected, + memory_allocation_error, + invalid_connector_type, + invalid_connector, + interest_cache_miss +}; + +/** + * @brief Create an error_code instance for the given error. + * @param error The error. + * @return The error_code instance. + */ +inline std::error_code make_error_code(forwarder_error error) { + return std::error_code(static_cast<int>(error), forwarder_category()); +} + +/** + * @brief Create an error_condition instance for the given error. + * @param error The error. + * @return The error_condition instance. + */ +inline std::error_condition make_error_condition(forwarder_error error) { + return std::error_condition(static_cast<int>(error), forwarder_category()); +} + +/** + * @brief A server error category. + */ +class forwarder_category_impl : public std::error_category { + public: + /** + * @brief Get the name of the category. + * @return The name of the category. + */ + virtual const char* name() const throw(); + + /** + * @brief Get the error message for a given error. + * @param ev The error numeric value. + * @return The message associated to the error. + */ + virtual std::string message(int ev) const; +}; +} // namespace core +} // namespace transport + +namespace std { +// namespace system { +template <> +struct is_error_code_enum<::transport::core::forwarder_error> + : public std::true_type {}; +// } // namespace system +} // namespace std diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc new file mode 100644 index 000000000..7e89e2f9f --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder.cc @@ -0,0 +1,296 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/global_configuration.h> +#include <core/local_connector.h> +#include <io_modules/forwarder/forwarder.h> +#include <io_modules/forwarder/global_id_counter.h> +#include <io_modules/forwarder/udp_tunnel.h> +#include <io_modules/forwarder/udp_tunnel_listener.h> + +namespace transport { + +namespace core { + +constexpr char Forwarder::forwarder_config_section[]; + +Forwarder::Forwarder() : config_() { + using namespace std::placeholders; + GlobalConfiguration::getInstance().registerConfigurationParser( + forwarder_config_section, + std::bind(&Forwarder::parseForwarderConfiguration, this, _1, _2)); + + if (!config_.empty()) { + initThreads(); + initListeners(); + initConnectors(); + } +} + +Forwarder::~Forwarder() { + for (auto &l : listeners_) { + l->close(); + } + + for (auto &c : remote_connectors_) { + c.second->close(); + } + + GlobalConfiguration::getInstance().unregisterConfigurationParser( + forwarder_config_section); +} + +void Forwarder::initThreads() { + for (unsigned i = 0; i < config_.getThreadNumber(); i++) { + thread_pool_.emplace_back(io_service_, /* detached */ false); + } +} + +void Forwarder::initListeners() { + using namespace std::placeholders; + for (auto &l : config_.getListeners()) { + listeners_.emplace_back(std::make_shared<UdpTunnelListener>( + io_service_, + std::bind(&Forwarder::onPacketFromListener, this, _1, _2, _3), + asio::ip::udp::endpoint(asio::ip::address::from_string(l.address), + l.port))); + } +} + +void Forwarder::initConnectors() { + using namespace std::placeholders; + for (auto &c : config_.getConnectors()) { + auto id = GlobalCounter<Connector::Id>::getInstance().getNext(); + auto conn = new UdpTunnelConnector( + io_service_, std::bind(&Forwarder::onPacketReceived, this, _1, _2, _3), + std::bind(&Forwarder::onPacketSent, this, _1, _2), + std::bind(&Forwarder::onConnectorClosed, this, _1), + std::bind(&Forwarder::onConnectorReconnected, this, _1)); + conn->setConnectorId(id); + remote_connectors_.emplace(id, conn); + conn->connect(c.remote_address, c.remote_port, c.local_address, + c.local_port); + } +} + +Connector::Id Forwarder::registerLocalConnector( + asio::io_service &io_service, + Connector::PacketReceivedCallback &&receive_callback, + Connector::OnReconnectCallback &&reconnect_callback) { + utils::SpinLock::Acquire locked(connector_lock_); + auto id = GlobalCounter<Connector::Id>::getInstance().getNext(); + auto connector = std::make_shared<LocalConnector>( + io_service, receive_callback, nullptr, nullptr, reconnect_callback); + connector->setConnectorId(id); + local_connectors_.emplace(id, std::move(connector)); + return id; +} + +Forwarder &Forwarder::deleteConnector(Connector::Id id) { + utils::SpinLock::Acquire locked(connector_lock_); + auto it = local_connectors_.find(id); + if (it != local_connectors_.end()) { + it->second->close(); + local_connectors_.erase(it); + } + + return *this; +} + +Connector::Ptr Forwarder::getConnector(Connector::Id id) { + utils::SpinLock::Acquire locked(connector_lock_); + auto it = local_connectors_.find(id); + if (it != local_connectors_.end()) { + return it->second; + } + + return nullptr; +} + +void Forwarder::onPacketFromListener(Connector *connector, + utils::MemBuf &packet_buffer, + const std::error_code &ec) { + // Create connector + connector->setReceiveCallback( + std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + + TRANSPORT_LOGD("Packet received from listener."); + + { + utils::SpinLock::Acquire locked(connector_lock_); + remote_connectors_.emplace(connector->getConnectorId(), + connector->shared_from_this()); + } + // TODO Check if control packet or not. For the moment it is not. + onPacketReceived(connector, packet_buffer, ec); +} + +void Forwarder::onPacketReceived(Connector *connector, + utils::MemBuf &packet_buffer, + const std::error_code &ec) { + // Figure out the type of packet we received + bool is_interest = Packet::isInterest(packet_buffer.data()); + + Packet *packet = nullptr; + if (is_interest) { + packet = static_cast<Interest *>(&packet_buffer); + } else { + packet = static_cast<ContentObject *>(&packet_buffer); + } + + for (auto &c : local_connectors_) { + auto role = c.second->getRole(); + auto is_producer = role == Connector::Role::PRODUCER; + if ((is_producer && is_interest) || (!is_producer && !is_interest)) { + c.second->send(*packet); + } else { + TRANSPORT_LOGD( + "Error sending packet to local connector. is_interest = %d - " + "is_producer = %d", + (int)is_interest, (int)is_producer); + } + } + + // PCS Lookup + FIB lookup. Skip for now + + // Forward packet to local connectors +} + +void Forwarder::send(Packet &packet) { + // TODo Here a nice PIT/CS / FIB would be required:) + // For now let's just forward the packet on the remote connector we get + if (remote_connectors_.begin() == remote_connectors_.end()) { + return; + } + + auto remote_endpoint = + remote_connectors_.begin()->second->getRemoteEndpoint(); + TRANSPORT_LOGD("Sending packet to: %s:%u", + remote_endpoint.getAddress().to_string().c_str(), + remote_endpoint.getPort()); + remote_connectors_.begin()->second->send(packet); +} + +void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {} + +void Forwarder::onConnectorClosed(Connector *connector) {} + +void Forwarder::onConnectorReconnected(Connector *connector) {} + +void Forwarder::parseForwarderConfiguration( + const libconfig::Setting &forwarder_config, std::error_code &ec) { + using namespace libconfig; + + // n_thread + if (forwarder_config.exists("n_threads")) { + // Get number of threads + int n_threads = 1; + forwarder_config.lookupValue("n_threads", n_threads); + TRANSPORT_LOGD("Forwarder threads from config file: %u", n_threads); + config_.setThreadNumber(n_threads); + } + + // listeners + if (forwarder_config.exists("listeners")) { + // get path where looking for modules + const Setting &listeners = forwarder_config.lookup("listeners"); + auto count = listeners.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &listener = listeners[i]; + ListenerConfig list; + unsigned port; + + list.name = listener.getName(); + listener.lookupValue("local_address", list.address); + listener.lookupValue("local_port", port); + list.port = (uint16_t)(port); + + TRANSPORT_LOGD("Adding listener %s, (%s:%u)", list.name.c_str(), + list.address.c_str(), list.port); + config_.addListener(std::move(list)); + } + } + + // connectors + if (forwarder_config.exists("connectors")) { + // get path where looking for modules + const Setting &connectors = forwarder_config.lookup("connectors"); + auto count = connectors.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &connector = connectors[i]; + ConnectorConfig conn; + + conn.name = connector.getName(); + unsigned port = 0; + + if (!connector.lookupValue("local_address", conn.local_address)) { + conn.local_address = ""; + } + + if (!connector.lookupValue("local_port", port)) { + port = 0; + } + + conn.local_port = (uint16_t)(port); + + if (!connector.lookupValue("remote_address", conn.remote_address)) { + throw errors::RuntimeException( + "Error in configuration file: remote_address is a mandatory field " + "of Connectors."); + } + + if (!connector.lookupValue("remote_port", port)) { + throw errors::RuntimeException( + "Error in configuration file: remote_port is a mandatory field " + "of Connectors."); + } + + conn.remote_port = (uint16_t)(port); + + TRANSPORT_LOGD("Adding connector %s, (%s:%u %s:%u)", conn.name.c_str(), + conn.local_address.c_str(), conn.local_port, + conn.remote_address.c_str(), conn.remote_port); + config_.addConnector(std::move(conn)); + } + } + + // Routes + if (forwarder_config.exists("routes")) { + const Setting &routes = forwarder_config.lookup("routes"); + auto count = routes.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &route = routes[i]; + RouteConfig r; + unsigned weight; + + r.name = route.getName(); + route.lookupValue("prefix", r.prefix); + route.lookupValue("weight", weight); + route.lookupValue("connector", r.connector); + r.weight = (uint16_t)(weight); + + TRANSPORT_LOGD("Adding route %s %s (%s %u)", r.name.c_str(), + r.prefix.c_str(), r.connector.c_str(), r.weight); + config_.addRoute(std::move(r)); + } + } +} + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h new file mode 100644 index 000000000..5b564bb5e --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder.h @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/io_module.h> +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/utils/event_thread.h> +#include <hicn/transport/utils/singleton.h> +#include <hicn/transport/utils/spinlock.h> +#include <io_modules/forwarder/configuration.h> +#include <io_modules/forwarder/udp_tunnel_listener.h> + +#include <atomic> +#include <libconfig.h++> +#include <unordered_map> + +namespace transport { + +namespace core { + +class Forwarder : public utils::Singleton<Forwarder> { + static constexpr char forwarder_config_section[] = "forwarder"; + friend class utils::Singleton<Forwarder>; + + public: + Forwarder(); + + ~Forwarder(); + + void initThreads(); + void initListeners(); + void initConnectors(); + + Connector::Id registerLocalConnector( + asio::io_service &io_service, + Connector::PacketReceivedCallback &&receive_callback, + Connector::OnReconnectCallback &&reconnect_callback); + + Forwarder &deleteConnector(Connector::Id id); + + Connector::Ptr getConnector(Connector::Id id); + + void send(Packet &packet); + + void stop(); + + private: + void onPacketFromListener(Connector *connector, utils::MemBuf &packet_buffer, + const std::error_code &ec); + void onPacketReceived(Connector *connector, utils::MemBuf &packet_buffer, + const std::error_code &ec); + void onPacketSent(Connector *connector, const std::error_code &ec); + void onConnectorClosed(Connector *connector); + void onConnectorReconnected(Connector *connector); + + void parseForwarderConfiguration(const libconfig::Setting &io_config, + std::error_code &ec); + + asio::io_service io_service_; + utils::SpinLock connector_lock_; + + /** + * Connectors and listeners must be declares *before* thread_pool_, so that + * threads destructors will wait for them to gracefully close before being + * destroyed. + */ + std::unordered_map<Connector::Id, Connector::Ptr> remote_connectors_; + std::unordered_map<Connector::Id, Connector::Ptr> local_connectors_; + std::vector<UdpTunnelListener::Ptr> listeners_; + + std::vector<utils::EventThread> thread_pool_; + + Configuration config_; +}; + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc new file mode 100644 index 000000000..356b42d3b --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/errors/not_implemented_exception.h> +#include <hicn/transport/utils/log.h> +#include <io_modules/forwarder/forwarder_module.h> + +namespace transport { + +namespace core { + +ForwarderModule::ForwarderModule() + : IoModule(), + name_(""), + connector_id_(Connector::invalid_connector), + forwarder_(Forwarder::getInstance()) {} + +ForwarderModule::~ForwarderModule() { + forwarder_.deleteConnector(connector_id_); +} + +bool ForwarderModule::isConnected() { return true; } + +void ForwarderModule::send(Packet &packet) { + IoModule::send(packet); + forwarder_.send(packet); + // TRANSPORT_LOGD("ForwarderModule: sending from %u to %d", local_id_, + // 1 - local_id_); + + // local_faces_.at(1 - local_id_).onPacket(packet); +} + +void ForwarderModule::send(const uint8_t *packet, std::size_t len) { + // not supported + throw errors::NotImplementedException(); +} + +void ForwarderModule::registerRoute(const Prefix &prefix) { + // For the moment we route packets from one socket to the other. + // Next step will be to introduce a FIB + return; +} + +void ForwarderModule::closeConnection() { + forwarder_.deleteConnector(connector_id_); +} + +void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback, + Connector::OnReconnectCallback &&reconnect_callback, + asio::io_service &io_service, + const std::string &app_name) { + connector_id_ = forwarder_.registerLocalConnector( + io_service, std::move(receive_callback), std::move(reconnect_callback)); + name_ = app_name; +} + +void ForwarderModule::processControlMessageReply(utils::MemBuf &packet_buffer) { + return; +} + +void ForwarderModule::connect(bool is_consumer) { + forwarder_.getConnector(connector_id_) + ->setRole(is_consumer ? Connector::Role::CONSUMER + : Connector::Role::PRODUCER); +} + +std::uint32_t ForwarderModule::getMtu() { return interface_mtu; } + +bool ForwarderModule::isControlMessage(const uint8_t *message) { return false; } + +extern "C" IoModule *create_module(void) { return new ForwarderModule(); } + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h new file mode 100644 index 000000000..58bfb7996 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder_module.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/io_module.h> +#include <hicn/transport/core/prefix.h> +#include <io_modules/forwarder/forwarder.h> + +#include <atomic> + +namespace transport { + +namespace core { + +class Forwarder; + +class ForwarderModule : public IoModule { + static constexpr std::uint16_t interface_mtu = 1500; + + public: + ForwarderModule(); + + ~ForwarderModule(); + + void connect(bool is_consumer) override; + + void send(Packet &packet) override; + void send(const uint8_t *packet, std::size_t len) override; + + bool isConnected() override; + + void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::OnReconnectCallback &&reconnect_callback, + asio::io_service &io_service, + const std::string &app_name = "Libtransport") override; + + void registerRoute(const Prefix &prefix) override; + + std::uint32_t getMtu() override; + + bool isControlMessage(const uint8_t *message) override; + + void processControlMessageReply(utils::MemBuf &packet_buffer) override; + + void closeConnection() override; + + private: + std::string name_; + Connector::Id connector_id_; + Forwarder &forwarder_; +}; + +extern "C" IoModule *create_module(void); + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/global_id_counter.h b/libtransport/src/io_modules/forwarder/global_id_counter.h new file mode 100644 index 000000000..fe8d76730 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/global_id_counter.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <atomic> +#include <mutex> + +namespace transport { + +namespace core { + +template <typename T = uint64_t> +class GlobalCounter { + public: + static GlobalCounter& getInstance() { + std::lock_guard<std::mutex> lock(global_mutex_); + + if (!instance_) { + instance_.reset(new GlobalCounter()); + } + + return *instance_; + } + + T getNext() { return counter_++; } + + private: + GlobalCounter() : counter_(0) {} + static std::unique_ptr<GlobalCounter<T>> instance_; + static std::mutex global_mutex_; + std::atomic<T> counter_; +}; + +template <typename T> +std::unique_ptr<GlobalCounter<T>> GlobalCounter<T>::instance_ = nullptr; + +template <typename T> +std::mutex GlobalCounter<T>::global_mutex_; + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.cc b/libtransport/src/io_modules/forwarder/udp_tunnel.cc new file mode 100644 index 000000000..dc725fc4e --- /dev/null +++ b/libtransport/src/io_modules/forwarder/udp_tunnel.cc @@ -0,0 +1,288 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + */ + +#include <hicn/transport/utils/branch_prediction.h> +#include <io_modules/forwarder/errors.h> +#include <io_modules/forwarder/udp_tunnel.h> + +#include <iostream> +#include <thread> +#include <vector> + +namespace transport { +namespace core { + +UdpTunnelConnector::~UdpTunnelConnector() {} + +void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port, + const std::string &bind_address, + uint16_t bind_port) { + if (state_ == State::CLOSED) { + state_ = State::CONNECTING; + endpoint_iterator_ = resolver_.resolve({hostname, std::to_string(port)}); + remote_endpoint_send_ = *endpoint_iterator_; + socket_->open(remote_endpoint_send_.protocol()); + + if (!bind_address.empty() && bind_port != 0) { + using namespace asio::ip; + socket_->bind( + udp::endpoint(address::from_string(bind_address), bind_port)); + } + + state_ = State::CONNECTED; + + remote_endpoint_ = Endpoint(remote_endpoint_send_); + local_endpoint_ = Endpoint(socket_->local_endpoint()); + + doRecvPacket(); + +#ifdef LINUX + send_timer_.expires_from_now(std::chrono::microseconds(50)); + send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, + std::placeholders::_1)); +#endif + } +} + +void UdpTunnelConnector::send(Packet &packet) { + strand_->post([this, pkt{packet.shared_from_this()}]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(pkt)); + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + if (!write_in_progress) { + doSendPacket(); + } + } else { + data_available_ = true; + } + }); +} + +void UdpTunnelConnector::send(const uint8_t *packet, std::size_t len) {} + +void UdpTunnelConnector::close() { + TRANSPORT_LOGD("UDPTunnelConnector::close"); + state_ = State::CLOSED; + bool is_socket_owned = socket_.use_count() == 1; + if (is_socket_owned) { + io_service_.dispatch([this]() { + this->socket_->close(); + // on_close_callback_(shared_from_this()); + }); + } +} + +void UdpTunnelConnector::doSendPacket() { +#ifdef LINUX + send_timer_.expires_from_now(std::chrono::microseconds(50)); + send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, + std::placeholders::_1)); +#else + auto packet = output_buffer_.front().get(); + auto array = std::vector<asio::const_buffer>(); + + const ::utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_->async_send_to( + std::move(array), remote_endpoint_send_, + strand_->wrap([this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + sent_callback_(this, make_error_code(forwarder_error::success)); + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + sendFailed(); + sent_callback_(this, ec); + } + + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doSendPacket(); + } + })); +#endif +} + +#ifdef LINUX +void UdpTunnelConnector::writeHandler(std::error_code ec) { + if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) { + return; + } + + auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst)); + + if (len) { + int m = 0; + for (auto &p : output_buffer_) { + auto packet = p.get(); + ::utils::MemBuf *current = packet; + int b = 0; + do { + // array.push_back(asio::const_buffer(current->data(), + // current->length())); + tx_iovecs_[m][b].iov_base = current->writableData(); + tx_iovecs_[m][b].iov_len = current->length(); + current = current->next(); + b++; + } while (current != packet); + + tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m]; + tx_msgs_[m].msg_hdr.msg_iovlen = b; + tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data(); + tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size(); + m++; + + if (--len == 0) { + break; + } + } + + int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT); + if (retval > 0) { + while (retval--) { + output_buffer_.pop_front(); + } + } else if (retval != EWOULDBLOCK && retval != EAGAIN) { + TRANSPORT_LOGE("Error sending messages! %s %d\n", strerror(errno), + retval); + return; + } + } + + if (!output_buffer_.empty()) { + send_timer_.expires_from_now(std::chrono::microseconds(50)); + send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, + std::placeholders::_1)); + } +} + +void UdpTunnelConnector::readHandler(std::error_code ec) { + TRANSPORT_LOGD("UdpTunnelConnector receive packet"); + + // TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length); + if (TRANSPORT_EXPECT_TRUE(!ec)) { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + if (current_position_ == 0) { + for (int i = 0; i < max_burst; i++) { + auto read_buffer = getRawBuffer(); + rx_iovecs_[i][0].iov_base = read_buffer.first; + rx_iovecs_[i][0].iov_len = read_buffer.second; + rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i]; + rx_msgs_[i].msg_hdr.msg_iovlen = 1; + } + } + + int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_, + max_burst - current_position_, MSG_DONTWAIT, nullptr); + if (res < 0) { + TRANSPORT_LOGE("Error receiving messages! %s %d\n", strerror(errno), + res); + return; + } + + for (int i = 0; i < res; i++) { + auto packet = getPacketFromBuffer( + reinterpret_cast<uint8_t *>( + rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), + rx_msgs_[current_position_].msg_len); + receiveSuccess(*packet); + receive_callback_(this, *packet, + make_error_code(forwarder_error::success)); + ++current_position_; + } + + doRecvPacket(); + } else { + TRANSPORT_LOGE( + "Error in UDP: Receiving packets from a not connected socket."); + } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + TRANSPORT_LOGE("The connection has been closed by the application."); + return; + } else { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + // receive_callback_(this, *read_msg_, ec); + TRANSPORT_LOGE("Error in UDP connector: %d %s", ec.value(), + ec.message().c_str()); + } else { + TRANSPORT_LOGE("Error while not connector"); + } + } +} +#endif + +void UdpTunnelConnector::doRecvPacket() { +#ifdef LINUX + if (state_ == State::CONNECTED) { +#if ((ASIO_VERSION / 100 % 1000) < 11) + socket_->async_receive(asio::null_buffers(), +#else + socket_->async_wait(asio::ip::tcp::socket::wait_read, +#endif + std::bind(&UdpTunnelConnector::readHandler, this, + std::placeholders::_1)); + } +#else + TRANSPORT_LOGD("UdpTunnelConnector receive packet"); + read_msg_ = getRawBuffer(); + socket_->async_receive_from( + asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_, + [this](std::error_code ec, std::size_t length) { + TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length); + if (TRANSPORT_EXPECT_TRUE(!ec)) { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + auto packet = getPacketFromBuffer(read_msg_.first, length); + receiveSuccess(*packet); + receive_callback_(this, *packet, + make_error_code(forwarder_error::success)); + doRecvPacket(); + } else { + TRANSPORT_LOGE( + "Error in UDP: Receiving packets from a not connected socket."); + } + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + TRANSPORT_LOGE("The connection has been closed by the application."); + return; + } else { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + TRANSPORT_LOGE("Error in UDP connector: %d %s", ec.value(), + ec.message().c_str()); + } else { + TRANSPORT_LOGE("Error while not connector"); + } + } + }); +#endif +} + +void UdpTunnelConnector::doConnect() { + asio::async_connect( + *socket_, endpoint_iterator_, + [this](std::error_code ec, asio::ip::udp::resolver::iterator) { + if (!ec) { + state_ = State::CONNECTED; + doRecvPacket(); + + if (data_available_) { + data_available_ = false; + doSendPacket(); + } + } else { + TRANSPORT_LOGE("[Hproxy] - UDP Connection failed!!!"); + timer_.expires_from_now(std::chrono::milliseconds(500)); + timer_.async_wait(std::bind(&UdpTunnelConnector::doConnect, this)); + } + }); +} + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.h b/libtransport/src/io_modules/forwarder/udp_tunnel.h new file mode 100644 index 000000000..df472af91 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/udp_tunnel.h @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + */ + +#pragma once + +#include <hicn/transport/core/connector.h> +#include <hicn/transport/portability/platform.h> +#include <io_modules/forwarder/errors.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <iostream> +#include <memory> + +namespace transport { +namespace core { + +class UdpTunnelListener; + +class UdpTunnelConnector : public Connector { + friend class UdpTunnelListener; + + public: + template <typename ReceiveCallback, typename SentCallback, typename OnClose, + typename OnReconnect> + UdpTunnelConnector(asio::io_service &io_service, + ReceiveCallback &&receive_callback, + SentCallback &&packet_sent, OnClose &&on_close_callback, + OnReconnect &&on_reconnect) + : Connector(receive_callback, packet_sent, on_close_callback, + on_reconnect), + io_service_(io_service), + strand_(std::make_shared<asio::io_service::strand>(io_service_)), + socket_(std::make_shared<asio::ip::udp::socket>(io_service_)), + resolver_(io_service_), + timer_(io_service_), +#ifdef LINUX + send_timer_(io_service_), + tx_iovecs_{0}, + tx_msgs_{0}, + rx_iovecs_{0}, + rx_msgs_{0}, + current_position_(0), +#else + read_msg_(nullptr, 0), +#endif + data_available_(false) { + } + + template <typename ReceiveCallback, typename SentCallback, typename OnClose, + typename OnReconnect, typename EndpointType> + UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket, + std::shared_ptr<asio::io_service::strand> &strand, + ReceiveCallback &&receive_callback, + SentCallback &&packet_sent, OnClose &&on_close_callback, + OnReconnect &&on_reconnect, EndpointType &&remote_endpoint) + : Connector(receive_callback, packet_sent, on_close_callback, + on_reconnect), +#if ((ASIO_VERSION / 100 % 1000) < 12) + io_service_(socket->get_io_service()), +#else + io_service_((asio::io_context &)(socket->get_executor().context())), +#endif + strand_(strand), + socket_(socket), + resolver_(io_service_), + remote_endpoint_send_(std::forward<EndpointType &&>(remote_endpoint)), + timer_(io_service_), +#ifdef LINUX + send_timer_(io_service_), + tx_iovecs_{0}, + tx_msgs_{0}, + rx_iovecs_{0}, + rx_msgs_{0}, + current_position_(0), +#else + read_msg_(nullptr, 0), +#endif + data_available_(false) { + if (socket_->is_open()) { + state_ = State::CONNECTED; + remote_endpoint_ = Endpoint(remote_endpoint_send_); + local_endpoint_ = socket_->local_endpoint(); + } + } + + ~UdpTunnelConnector() override; + + void send(Packet &packet) override; + + void send(const uint8_t *packet, std::size_t len) override; + + void close() override; + + void connect(const std::string &hostname, std::uint16_t port, + const std::string &bind_address = "", + std::uint16_t bind_port = 0); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + void doConnect(); + void doRecvPacket(); + + void doRecvPacket(utils::MemBuf &buffer) { + receive_callback_(this, buffer, make_error_code(forwarder_error::success)); + } + +#ifdef LINUX + void readHandler(std::error_code ec); + void writeHandler(std::error_code ec); +#endif + + void setConnected() { state_ = State::CONNECTED; } + + void doSendPacket(); + void doClose(); + + private: + asio::io_service &io_service_; + std::shared_ptr<asio::io_service::strand> strand_; + std::shared_ptr<asio::ip::udp::socket> socket_; + asio::ip::udp::resolver resolver_; + asio::ip::udp::resolver::iterator endpoint_iterator_; + asio::ip::udp::endpoint remote_endpoint_send_; + asio::ip::udp::endpoint remote_endpoint_recv_; + + asio::steady_timer timer_; + +#ifdef LINUX + asio::steady_timer send_timer_; + struct iovec tx_iovecs_[max_burst][8]; + struct mmsghdr tx_msgs_[max_burst]; + struct iovec rx_iovecs_[max_burst][8]; + struct mmsghdr rx_msgs_[max_burst]; + std::uint8_t current_position_; +#else + std::pair<uint8_t *, std::size_t> read_msg_; +#endif + + bool data_available_; +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc new file mode 100644 index 000000000..12246c3cf --- /dev/null +++ b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + */ + +#include <hicn/transport/utils/hash.h> +#include <hicn/transport/utils/log.h> +#include <io_modules/forwarder/udp_tunnel.h> +#include <io_modules/forwarder/udp_tunnel_listener.h> + +#ifndef LINUX +namespace std { +size_t hash<asio::ip::udp::endpoint>::operator()( + const asio::ip::udp::endpoint &endpoint) const { + auto hash_ip = endpoint.address().is_v4() + ? endpoint.address().to_v4().to_ulong() + : utils::hash::fnv32_buf( + endpoint.address().to_v6().to_bytes().data(), 16); + uint16_t port = endpoint.port(); + return utils::hash::fnv32_buf(&port, 2, hash_ip); +} +} // namespace std +#endif + +namespace transport { +namespace core { + +UdpTunnelListener::~UdpTunnelListener() {} + +void UdpTunnelListener::close() { + strand_->post([this]() { + if (socket_->is_open()) { + socket_->close(); + } + }); +} + +#ifdef LINUX +void UdpTunnelListener::readHandler(std::error_code ec) { + TRANSPORT_LOGD("UdpTunnelConnector receive packet"); + + // TRANSPORT_LOGD("UdpTunnelConnector received packet length=%lu", length); + if (TRANSPORT_EXPECT_TRUE(!ec)) { + if (current_position_ == 0) { + for (int i = 0; i < Connector::max_burst; i++) { + auto read_buffer = Connector::getRawBuffer(); + iovecs_[i][0].iov_base = read_buffer.first; + iovecs_[i][0].iov_len = read_buffer.second; + msgs_[i].msg_hdr.msg_iov = iovecs_[i]; + msgs_[i].msg_hdr.msg_iovlen = 1; + msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i]; + msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]); + } + } + + int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_, + Connector::max_burst - current_position_, MSG_DONTWAIT, + nullptr); + if (res < 0) { + TRANSPORT_LOGE("Error in recvmmsg."); + } + + for (int i = 0; i < res; i++) { + auto packet = Connector::getPacketFromBuffer( + reinterpret_cast<uint8_t *>( + msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), + msgs_[current_position_].msg_len); + auto connector_id = + utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name, + msgs_[current_position_].msg_hdr.msg_namelen); + + auto connector = connectors_.find(connector_id); + if (connector == connectors_.end()) { + // Create new connector corresponding to new client + + /* + * Get the remote endpoint for this particular message + */ + using namespace asio::ip; + if (local_endpoint_.address().is_v4()) { + auto addr = reinterpret_cast<struct sockaddr_in *>( + &remote_endpoints_[current_position_]); + address_v4::bytes_type address_bytes; + std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr), + address_bytes.size(), address_bytes.begin()); + address_v4 address(address_bytes); + remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin_port)); + } else { + auto addr = reinterpret_cast<struct sockaddr_in6 *>( + &remote_endpoints_[current_position_]); + address_v6::bytes_type address_bytes; + std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr), + address_bytes.size(), address_bytes.begin()); + address_v6 address(address_bytes); + remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin6_port)); + } + + /** + * Create new connector sharing the same socket of this listener. + */ + auto ret = connectors_.emplace( + connector_id, + std::make_shared<UdpTunnelConnector>( + socket_, strand_, receive_callback_, + [](Connector *, const std::error_code &) {}, [](Connector *) {}, + [](Connector *) {}, std::move(remote_endpoint_))); + connector = ret.first; + connector->second->setConnectorId(connector_id); + } + + /** + * Use connector callback to process incoming message. + */ + UdpTunnelConnector *c = + dynamic_cast<UdpTunnelConnector *>(connector->second.get()); + c->doRecvPacket(*packet); + + ++current_position_; + } + + doRecvPacket(); + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + TRANSPORT_LOGE("The connection has been closed by the application."); + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + } +} +#endif + +void UdpTunnelListener::doRecvPacket() { +#ifdef LINUX +#if ((ASIO_VERSION / 100 % 1000) < 11) + socket_->async_receive( + asio::null_buffers(), +#else + socket_->async_wait( + asio::ip::tcp::socket::wait_read, +#endif + std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1)); +#else + read_msg_ = Connector::getRawBuffer(); + socket_->async_receive_from( + asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_, + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + auto packet = Connector::getPacketFromBuffer(read_msg_.first, length); + auto connector_id = + std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_); + auto connector = connectors_.find(connector_id); + if (connector == connectors_.end()) { + // Create new connector corresponding to new client + auto ret = connectors_.emplace( + connector_id, std::make_shared<UdpTunnelConnector>( + socket_, strand_, receive_callback_, + [](Connector *, const std::error_code &) {}, + [](Connector *) {}, [](Connector *) {}, + std::move(remote_endpoint_))); + connector = ret.first; + connector->second->setConnectorId(connector_id); + } + + UdpTunnelConnector *c = + dynamic_cast<UdpTunnelConnector *>(connector->second.get()); + c->doRecvPacket(*packet); + doRecvPacket(); + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + TRANSPORT_LOGE("The connection has been closed by the application."); + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + } + }); +#endif +} +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h new file mode 100644 index 000000000..0ee40a400 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + */ + +#pragma once + +#include <hicn/transport/core/connector.h> +#include <hicn/transport/portability/platform.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <unordered_map> + +namespace std { +template <> +struct hash<asio::ip::udp::endpoint> { + size_t operator()(const asio::ip::udp::endpoint &endpoint) const; +}; +} // namespace std + +namespace transport { +namespace core { + +class UdpTunnelListener + : public std::enable_shared_from_this<UdpTunnelListener> { + using PacketReceivedCallback = Connector::PacketReceivedCallback; + using EndpointId = std::pair<uint32_t, uint16_t>; + + static constexpr uint16_t default_port = 5004; + + public: + using Ptr = std::shared_ptr<UdpTunnelListener>; + + template <typename ReceiveCallback> + UdpTunnelListener(asio::io_service &io_service, + ReceiveCallback &&receive_callback, + asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint( + asio::ip::udp::v4(), default_port)) + : io_service_(io_service), + strand_(std::make_shared<asio::io_service::strand>(io_service_)), + socket_(std::make_shared<asio::ip::udp::socket>(io_service_, + endpoint.protocol())), + local_endpoint_(endpoint), + receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)), +#ifndef LINUX + read_msg_(nullptr, 0) +#else + iovecs_{0}, + msgs_{0}, + current_position_(0) +#endif + { + if (endpoint.protocol() == asio::ip::udp::v6()) { + std::error_code ec; + socket_->set_option(asio::ip::v6_only(false), ec); + // Call succeeds only on dual stack systems. + } + socket_->bind(local_endpoint_); + io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this)); + } + + ~UdpTunnelListener(); + + void close(); + + int deleteConnector(Connector *connector) { + return connectors_.erase(connector->getConnectorId()); + } + + template <typename ReceiveCallback> + void setReceiveCallback(ReceiveCallback &&callback) { + receive_callback_ = std::forward<ReceiveCallback &&>(callback); + } + + Connector *findConnector(Connector::Id connId) { + auto it = connectors_.find(connId); + if (it != connectors_.end()) { + return it->second.get(); + } + + return nullptr; + } + + private: + void doRecvPacket(); + + void readHandler(std::error_code ec); + + asio::io_service &io_service_; + std::shared_ptr<asio::io_service::strand> strand_; + std::shared_ptr<asio::ip::udp::socket> socket_; + asio::ip::udp::endpoint local_endpoint_; + asio::ip::udp::endpoint remote_endpoint_; + std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_; + + PacketReceivedCallback receive_callback_; + +#ifdef LINUX + struct iovec iovecs_[Connector::max_burst][8]; + struct mmsghdr msgs_[Connector::max_burst]; + struct sockaddr_storage remote_endpoints_[Connector::max_burst]; + std::uint8_t current_position_; +#else + std::pair<uint8_t *, std::size_t> read_msg_; +#endif +}; + +} // namespace core + +} // namespace transport |