diff options
Diffstat (limited to 'libtransport/src/io_modules/forwarder')
-rw-r--r-- | libtransport/src/io_modules/forwarder/CMakeLists.txt | 35 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/configuration.h | 89 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/errors.cc | 52 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/errors.h | 91 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder.cc | 293 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder.h | 108 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder_module.cc | 89 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder_module.h | 76 |
8 files changed, 833 insertions, 0 deletions
diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt new file mode 100644 index 000000000..2235d842e --- /dev/null +++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt @@ -0,0 +1,35 @@ +# Copyright (c) 2021-2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +list(APPEND MODULE_HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/endpoint.h + ${CMAKE_CURRENT_SOURCE_DIR}/errors.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h +) + +list(APPEND MODULE_SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.cc + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.cc +) + +build_module(forwarder_module + SOURCES ${MODULE_SOURCE_FILES} + DEPENDS ${DEPENDENCIES} + COMPONENT ${LIBTRANSPORT_COMPONENT}-io-modules + INCLUDE_DIRS ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} ${Libhicn_INCLUDE_DIRS} + DEFINITIONS ${COMPILER_DEFINITIONS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +) diff --git a/libtransport/src/io_modules/forwarder/configuration.h b/libtransport/src/io_modules/forwarder/configuration.h new file mode 100644 index 000000000..fcaa5530d --- /dev/null +++ b/libtransport/src/io_modules/forwarder/configuration.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace transport { +namespace core { + +struct ListenerConfig { + std::string address; + std::uint16_t port; + std::string name; +}; + +struct ConnectorConfig { + std::string local_address; + std::uint16_t local_port; + std::string remote_address; + std::uint16_t remote_port; + std::string name; +}; + +struct RouteConfig { + std::string prefix; + uint16_t weight; + std::string connector; + std::string name; +}; + +class Configuration { + public: + Configuration() : n_threads_(1) {} + + bool empty() { + return listeners_.empty() && connectors_.empty() && routes_.empty(); + } + + Configuration& setThreadNumber(std::size_t threads) { + n_threads_ = threads; + return *this; + } + + std::size_t getThreadNumber() { return n_threads_; } + + template <typename... Args> + Configuration& addListener(Args&&... args) { + listeners_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + template <typename... Args> + Configuration& addConnector(Args&&... args) { + connectors_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + template <typename... Args> + Configuration& addRoute(Args&&... args) { + routes_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + std::vector<ListenerConfig>& getListeners() { return listeners_; } + + std::vector<ConnectorConfig>& getConnectors() { return connectors_; } + + std::vector<RouteConfig>& getRoutes() { return routes_; } + + private: + std::vector<ListenerConfig> listeners_; + std::vector<ConnectorConfig> connectors_; + std::vector<RouteConfig> routes_; + std::size_t n_threads_; +}; + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/errors.cc b/libtransport/src/io_modules/forwarder/errors.cc new file mode 100644 index 000000000..6e93d0453 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/errors.cc @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +#include <io_modules/forwarder/errors.h> + +namespace transport { +namespace core { + +const std::error_category& forwarder_category() { + static forwarder_category_impl instance; + + return instance; +} + +const char* forwarder_category_impl::name() const throw() { + return "proxy::connector::error"; +} + +std::string forwarder_category_impl::message(int ev) const { + switch (static_cast<forwarder_error>(ev)) { + case forwarder_error::success: { + return "Success"; + } + case forwarder_error::disconnected: { + return "Connector is disconnected"; + } + case forwarder_error::receive_failed: { + return "Packet reception failed"; + } + case forwarder_error::send_failed: { + return "Packet send failed"; + } + case forwarder_error::memory_allocation_error: { + return "Impossible to allocate memory for packet pool"; + } + case forwarder_error::invalid_connector_type: { + return "Invalid type specified for connector."; + } + case forwarder_error::invalid_connector: { + return "Created connector was invalid."; + } + case forwarder_error::interest_cache_miss: { + return "interest cache miss."; + } + default: { + return "Unknown connector error"; + } + } +} +} // namespace core +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/errors.h b/libtransport/src/io_modules/forwarder/errors.h new file mode 100644 index 000000000..dd5cc8fe7 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/errors.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <system_error> + +namespace transport { +namespace core { +/** + * @brief Get the default server error category. + * @return The default server error category instance. + * + * @warning The first call to this function is thread-safe only starting with + * C++11. + */ +const std::error_category& forwarder_category(); + +/** + * The list of errors. + */ +enum class forwarder_error { + success = 0, + send_failed, + receive_failed, + disconnected, + memory_allocation_error, + invalid_connector_type, + invalid_connector, + interest_cache_miss +}; + +/** + * @brief Create an error_code instance for the given error. + * @param error The error. + * @return The error_code instance. + */ +inline std::error_code make_error_code(forwarder_error error) { + return std::error_code(static_cast<int>(error), forwarder_category()); +} + +/** + * @brief Create an error_condition instance for the given error. + * @param error The error. + * @return The error_condition instance. + */ +inline std::error_condition make_error_condition(forwarder_error error) { + return std::error_condition(static_cast<int>(error), forwarder_category()); +} + +/** + * @brief A server error category. + */ +class forwarder_category_impl : public std::error_category { + public: + /** + * @brief Get the name of the category. + * @return The name of the category. + */ + virtual const char* name() const throw(); + + /** + * @brief Get the error message for a given error. + * @param ev The error numeric value. + * @return The message associated to the error. + */ + virtual std::string message(int ev) const; +}; +} // namespace core +} // namespace transport + +namespace std { +// namespace system { +template <> +struct is_error_code_enum<::transport::core::forwarder_error> + : public std::true_type {}; +// } // namespace system +} // namespace std diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc new file mode 100644 index 000000000..d5f0b589e --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder.cc @@ -0,0 +1,293 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/global_configuration.h> +#include <core/global_id_counter.h> +#include <core/local_connector.h> +#include <core/udp_connector.h> +#include <core/udp_listener.h> +#include <glog/logging.h> +#include <io_modules/forwarder/forwarder.h> + +namespace transport { + +namespace core { + +constexpr char Forwarder::forwarder_config_section[]; + +Forwarder::Forwarder() : config_() { + using namespace std::placeholders; + GlobalConfiguration::getInstance().registerConfigurationParser( + forwarder_config_section, + std::bind(&Forwarder::parseForwarderConfiguration, this, _1, _2)); + + if (!config_.empty()) { + initThreads(); + initListeners(); + initConnectors(); + } +} + +Forwarder::~Forwarder() { + for (auto &l : listeners_) { + l->close(); + } + + for (auto &c : remote_connectors_) { + c.second->close(); + } + + GlobalConfiguration::getInstance().unregisterConfigurationParser( + forwarder_config_section); +} + +void Forwarder::initThreads() { + for (unsigned i = 0; i < config_.getThreadNumber(); i++) { + thread_pool_.emplace_back(io_service_, /* detached */ false); + } +} + +void Forwarder::initListeners() { + using namespace std::placeholders; + for (auto &l : config_.getListeners()) { + listeners_.emplace_back(std::make_shared<UdpTunnelListener>( + io_service_, + std::bind(&Forwarder::onPacketFromListener, this, _1, _2, _3), + asio::ip::udp::endpoint(asio::ip::address::from_string(l.address), + l.port))); + } +} + +void Forwarder::initConnectors() { + using namespace std::placeholders; + for (auto &c : config_.getConnectors()) { + auto id = GlobalCounter<Connector::Id>::getInstance().getNext(); + auto conn = new UdpTunnelConnector( + io_service_, std::bind(&Forwarder::onPacketReceived, this, _1, _2, _3), + std::bind(&Forwarder::onPacketSent, this, _1, _2), + std::bind(&Forwarder::onConnectorClosed, this, _1), + std::bind(&Forwarder::onConnectorReconnected, this, _1)); + conn->setConnectorId(id); + remote_connectors_.emplace(id, conn); + conn->connect(c.remote_address, c.remote_port, c.local_address, + c.local_port); + } +} + +Connector::Id Forwarder::registerLocalConnector( + asio::io_service &io_service, + Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, + Connector::OnReconnectCallback &&reconnect_callback) { + utils::SpinLock::Acquire locked(connector_lock_); + auto id = GlobalCounter<Connector::Id>::getInstance().getNext(); + auto connector = std::make_shared<LocalConnector>( + io_service, std::move(receive_callback), std::move(sent_callback), + std::move(close_callback), std::move(reconnect_callback)); + connector->setConnectorId(id); + local_connectors_.emplace(id, std::move(connector)); + return id; +} + +Forwarder &Forwarder::deleteConnector(Connector::Id id) { + utils::SpinLock::Acquire locked(connector_lock_); + auto it = local_connectors_.find(id); + if (it != local_connectors_.end()) { + it->second->close(); + local_connectors_.erase(it); + } else { + } + + return *this; +} + +Connector::Ptr Forwarder::getConnector(Connector::Id id) { + utils::SpinLock::Acquire locked(connector_lock_); + auto it = local_connectors_.find(id); + if (it != local_connectors_.end()) { + return it->second; + } + + return nullptr; +} + +void Forwarder::onPacketFromListener( + Connector *connector, const std::vector<utils::MemBuf::Ptr> &packets, + const std::error_code &ec) { + // Create connector + connector->setReceiveCallback( + std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Packet received from listener."; + + { + utils::SpinLock::Acquire locked(connector_lock_); + remote_connectors_.emplace(connector->getConnectorId(), + connector->shared_from_this()); + } + + // TODO Check if control packet or not. For the moment it is not. + onPacketReceived(connector, packets, ec); +} + +void Forwarder::onPacketReceived(Connector *connector, + const std::vector<utils::MemBuf::Ptr> &packets, + const std::error_code &ec) { + if (ec) { + LOG(ERROR) << "Error receiving packet: " << ec.message(); + return; + } + + for (auto &c : local_connectors_) { + c.second->receive(packets); + } + + // PCS Lookup + FIB lookup. Skip for now + + // Forward packet to local connectors +} + +void Forwarder::send(Packet &packet, Connector::Id connector_id) { + // TODo Here a nice PIT/CS / FIB would be required:) + // For now let's just forward the packet on the remote connector we get + for (auto &c : remote_connectors_) { + auto remote_endpoint = c.second->getRemoteEndpoint(); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending packet to: " << remote_endpoint.getAddress() << ":" + << remote_endpoint.getPort(); + c.second->send(packet); + } + + for (auto &c : local_connectors_) { + if (c.first != connector_id) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending packet to local connector " << c.first << std::endl; + c.second->receive({packet.shared_from_this()}); + } + } +} + +void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {} + +void Forwarder::onConnectorClosed(Connector *connector) {} + +void Forwarder::onConnectorReconnected(Connector *connector) {} + +void Forwarder::parseForwarderConfiguration( + const libconfig::Setting &forwarder_config, std::error_code &ec) { + using namespace libconfig; + + // n_thread + if (forwarder_config.exists("n_threads")) { + // Get number of threads + int n_threads = 1; + forwarder_config.lookupValue("n_threads", n_threads); + VLOG(1) << "Forwarder threads from config file: " << n_threads; + config_.setThreadNumber(n_threads); + } + + // listeners + if (forwarder_config.exists("listeners")) { + // get path where looking for modules + const Setting &listeners = forwarder_config.lookup("listeners"); + auto count = listeners.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &listener = listeners[i]; + ListenerConfig list; + unsigned port; + + list.name = listener.getName(); + listener.lookupValue("local_address", list.address); + listener.lookupValue("local_port", port); + list.port = (uint16_t)(port); + + VLOG(1) << "Adding listener " << list.name << ", ( " << list.address + << ":" << list.port << ")"; + config_.addListener(std::move(list)); + } + } + + // connectors + if (forwarder_config.exists("connectors")) { + // get path where looking for modules + const Setting &connectors = forwarder_config.lookup("connectors"); + auto count = connectors.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &connector = connectors[i]; + ConnectorConfig conn; + + conn.name = connector.getName(); + unsigned port = 0; + + if (!connector.lookupValue("local_address", conn.local_address)) { + conn.local_address = ""; + } + + if (!connector.lookupValue("local_port", port)) { + port = 0; + } + + conn.local_port = (uint16_t)(port); + + if (!connector.lookupValue("remote_address", conn.remote_address)) { + throw errors::RuntimeException( + "Error in configuration file: remote_address is a mandatory field " + "of Connectors."); + } + + if (!connector.lookupValue("remote_port", port)) { + throw errors::RuntimeException( + "Error in configuration file: remote_port is a mandatory field " + "of Connectors."); + } + + conn.remote_port = (uint16_t)(port); + + VLOG(1) << "Adding connector " << conn.name << ", (" << conn.local_address + << ":" << conn.local_port << " " << conn.remote_address << ":" + << conn.remote_port << ")"; + config_.addConnector(std::move(conn)); + } + } + + // Routes + if (forwarder_config.exists("routes")) { + const Setting &routes = forwarder_config.lookup("routes"); + auto count = routes.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &route = routes[i]; + RouteConfig r; + unsigned weight; + + r.name = route.getName(); + route.lookupValue("prefix", r.prefix); + route.lookupValue("weight", weight); + route.lookupValue("connector", r.connector); + r.weight = (uint16_t)(weight); + + VLOG(1) << "Adding route " << r.name << " " << r.prefix << " (" + << r.connector << " " << r.weight << ")"; + config_.addRoute(std::move(r)); + } + } +} + +} // namespace core +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h new file mode 100644 index 000000000..1022bf81b --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder.h @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/udp_listener.h> +#include <hicn/transport/core/io_module.h> +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/utils/event_thread.h> +#include <hicn/transport/utils/singleton.h> +#include <hicn/transport/utils/spinlock.h> +#include <io_modules/forwarder/configuration.h> + +#include <atomic> +#include <libconfig.h++> +#include <unordered_map> + +namespace transport { + +namespace core { + +class Forwarder { + static constexpr char forwarder_config_section[] = "forwarder"; + + public: + Forwarder(); + + ~Forwarder(); + + void initThreads(); + void initListeners(); + void initConnectors(); + + Connector::Id registerLocalConnector( + asio::io_service &io_service, + Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, + Connector::OnReconnectCallback &&reconnect_callback); + + Forwarder &deleteConnector(Connector::Id id); + + Connector::Ptr getConnector(Connector::Id id); + + void send(Packet &packet, Connector::Id id); + + void stop(); + + private: + void onPacketFromListener(Connector *connector, + const std::vector<utils::MemBuf::Ptr> &packets, + const std::error_code &ec); + void onPacketReceived(Connector *connector, + const std::vector<utils::MemBuf::Ptr> &packets, + const std::error_code &ec); + void onPacketSent(Connector *connector, const std::error_code &ec); + void onConnectorClosed(Connector *connector); + void onConnectorReconnected(Connector *connector); + + void parseForwarderConfiguration(const libconfig::Setting &io_config, + std::error_code &ec); + + asio::io_service io_service_; + utils::SpinLock connector_lock_; + + /** + * Connectors and listeners must be declares *before* thread_pool_, so that + * threads destructors will wait for them to gracefully close before being + * destroyed. + */ + std::unordered_map<Connector::Id, Connector::Ptr> remote_connectors_; + std::unordered_map<Connector::Id, Connector::Ptr> local_connectors_; + std::vector<UdpTunnelListener::Ptr> listeners_; + + std::vector<utils::EventThread> thread_pool_; + + Configuration config_; +}; + +class ForwarderGlobal : public ::utils::Singleton<ForwarderGlobal> { + friend class utils::Singleton<ForwarderGlobal>; + + public: + ~ForwarderGlobal() {} + std::shared_ptr<Forwarder> &getReference() { return forwarder_; } + + private: + ForwarderGlobal() : forwarder_(std::make_shared<Forwarder>()) {} + + private: + std::shared_ptr<Forwarder> forwarder_; +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc new file mode 100644 index 000000000..ca9466f01 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <hicn/transport/errors/not_implemented_exception.h> +#include <io_modules/forwarder/forwarder_module.h> + +namespace transport { + +namespace core { + +ForwarderModule::ForwarderModule() + : IoModule(), + name_(""), + connector_id_(Connector::invalid_connector), + forwarder_ptr_(ForwarderGlobal::getInstance().getReference()), + forwarder_(*forwarder_ptr_) {} + +ForwarderModule::~ForwarderModule() {} + +bool ForwarderModule::isConnected() { return true; } + +void ForwarderModule::send(Packet &packet) { + IoModule::send(packet); + forwarder_.send(packet, connector_id_); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending from " << connector_id_ << " to " << 1 - connector_id_; +} + +void ForwarderModule::send(const utils::MemBuf::Ptr &buffer) { + // not supported + throw errors::NotImplementedException(); +} + +void ForwarderModule::registerRoute(const Prefix &prefix) { + // For the moment we route packets from one socket to the other. + // Next step will be to introduce a FIB + return; +} + +void ForwarderModule::closeConnection() { + forwarder_.deleteConnector(connector_id_); +} + +void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, + Connector::OnReconnectCallback &&reconnect_callback, + asio::io_service &io_service, + const std::string &app_name) { + connector_id_ = forwarder_.registerLocalConnector( + io_service, std::move(receive_callback), std::move(sent_callback), + std::move(close_callback), std::move(reconnect_callback)); + name_ = app_name; +} + +void ForwarderModule::processControlMessageReply(utils::MemBuf &packet_buffer) { + return; +} + +void ForwarderModule::connect(bool is_consumer) { + forwarder_.getConnector(connector_id_) + ->setRole(is_consumer ? Connector::Role::CONSUMER + : Connector::Role::PRODUCER); +} + +std::uint32_t ForwarderModule::getMtu() { return interface_mtu; } + +bool ForwarderModule::isControlMessage(utils::MemBuf &packet_buffer) { + return false; +} + +extern "C" IoModule *create_module(void) { return new ForwarderModule(); } + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h new file mode 100644 index 000000000..a48701161 --- /dev/null +++ b/libtransport/src/io_modules/forwarder/forwarder_module.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/io_module.h> +#include <hicn/transport/core/prefix.h> +#include <io_modules/forwarder/forwarder.h> + +#include <atomic> + +namespace transport { + +namespace core { + +class Forwarder; + +class ForwarderModule : public IoModule { + static constexpr std::uint16_t interface_mtu = 1500; + + public: + ForwarderModule(); + + ~ForwarderModule(); + + void connect(bool is_consumer) override; + + void send(Packet &packet) override; + void send(const utils::MemBuf::Ptr &buffer) override; + + bool isConnected() override; + + void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, + Connector::OnReconnectCallback &&reconnect_callback, + asio::io_service &io_service, + const std::string &app_name = "Libtransport") override; + + void registerRoute(const Prefix &prefix) override; + + std::uint32_t getMtu() override; + + bool isControlMessage(utils::MemBuf &packet_buffer) override; + + void processControlMessageReply(utils::MemBuf &packet_buffer) override; + + void closeConnection() override; + + private: + static void initForwarder(); + + private: + std::string name_; + Connector::Id connector_id_; + std::shared_ptr<Forwarder> forwarder_ptr_; + Forwarder &forwarder_; +}; + +extern "C" IoModule *create_module(void); + +} // namespace core + +} // namespace transport |