diff options
Diffstat (limited to 'libtransport/src/io_modules/forwarder')
11 files changed, 83 insertions, 826 deletions
diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt index a1d0c5db5..2235d842e 100644 --- a/libtransport/src/io_modules/forwarder/CMakeLists.txt +++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -17,25 +17,19 @@ list(APPEND MODULE_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/errors.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.h - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.h - ${CMAKE_CURRENT_SOURCE_DIR}/global_counter.h ) list(APPEND MODULE_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.cc ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.cc - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.cc - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.cc ) build_module(forwarder_module - SHARED SOURCES ${MODULE_SOURCE_FILES} DEPENDS ${DEPENDENCIES} COMPONENT ${LIBTRANSPORT_COMPONENT}-io-modules - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} + INCLUDE_DIRS ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} ${Libhicn_INCLUDE_DIRS} DEFINITIONS ${COMPILER_DEFINITIONS} - COMPILE_OPTIONS ${COMPILE_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) diff --git a/libtransport/src/io_modules/forwarder/errors.cc b/libtransport/src/io_modules/forwarder/errors.cc index b5f131499..6e93d0453 100644 --- a/libtransport/src/io_modules/forwarder/errors.cc +++ b/libtransport/src/io_modules/forwarder/errors.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. */ #include <io_modules/forwarder/errors.h> diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc index 0546cb8b3..d5f0b589e 100644 --- a/libtransport/src/io_modules/forwarder/forwarder.cc +++ b/libtransport/src/io_modules/forwarder/forwarder.cc @@ -14,12 +14,12 @@ */ #include <core/global_configuration.h> +#include <core/global_id_counter.h> #include <core/local_connector.h> +#include <core/udp_connector.h> +#include <core/udp_listener.h> #include <glog/logging.h> #include <io_modules/forwarder/forwarder.h> -#include <io_modules/forwarder/global_id_counter.h> -#include <io_modules/forwarder/udp_tunnel.h> -#include <io_modules/forwarder/udp_tunnel_listener.h> namespace transport { @@ -89,11 +89,14 @@ void Forwarder::initConnectors() { Connector::Id Forwarder::registerLocalConnector( asio::io_service &io_service, Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, Connector::OnReconnectCallback &&reconnect_callback) { utils::SpinLock::Acquire locked(connector_lock_); auto id = GlobalCounter<Connector::Id>::getInstance().getNext(); auto connector = std::make_shared<LocalConnector>( - io_service, receive_callback, nullptr, nullptr, reconnect_callback); + io_service, std::move(receive_callback), std::move(sent_callback), + std::move(close_callback), std::move(reconnect_callback)); connector->setConnectorId(id); local_connectors_.emplace(id, std::move(connector)); return id; @@ -105,6 +108,7 @@ Forwarder &Forwarder::deleteConnector(Connector::Id id) { if (it != local_connectors_.end()) { it->second->close(); local_connectors_.erase(it); + } else { } return *this; @@ -120,9 +124,9 @@ Connector::Ptr Forwarder::getConnector(Connector::Id id) { return nullptr; } -void Forwarder::onPacketFromListener(Connector *connector, - utils::MemBuf &packet_buffer, - const std::error_code &ec) { +void Forwarder::onPacketFromListener( + Connector *connector, const std::vector<utils::MemBuf::Ptr> &packets, + const std::error_code &ec) { // Create connector connector->setReceiveCallback( std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1, @@ -135,32 +139,21 @@ void Forwarder::onPacketFromListener(Connector *connector, remote_connectors_.emplace(connector->getConnectorId(), connector->shared_from_this()); } + // TODO Check if control packet or not. For the moment it is not. - onPacketReceived(connector, packet_buffer, ec); + onPacketReceived(connector, packets, ec); } void Forwarder::onPacketReceived(Connector *connector, - utils::MemBuf &packet_buffer, + const std::vector<utils::MemBuf::Ptr> &packets, const std::error_code &ec) { - // Figure out the type of packet we received - bool is_interest = Packet::isInterest(packet_buffer.data()); - - Packet *packet = nullptr; - if (is_interest) { - packet = static_cast<Interest *>(&packet_buffer); - } else { - packet = static_cast<ContentObject *>(&packet_buffer); + if (ec) { + LOG(ERROR) << "Error receiving packet: " << ec.message(); + return; } for (auto &c : local_connectors_) { - auto role = c.second->getRole(); - auto is_producer = role == Connector::Role::PRODUCER; - if ((is_producer && is_interest) || (!is_producer && !is_interest)) { - c.second->send(*packet); - } else { - LOG(ERROR) << "Error sending packet to local connector. is_interest = " - << is_interest << " - is_producer = " << is_producer; - } + c.second->receive(packets); } // PCS Lookup + FIB lookup. Skip for now @@ -168,19 +161,24 @@ void Forwarder::onPacketReceived(Connector *connector, // Forward packet to local connectors } -void Forwarder::send(Packet &packet) { +void Forwarder::send(Packet &packet, Connector::Id connector_id) { // TODo Here a nice PIT/CS / FIB would be required:) // For now let's just forward the packet on the remote connector we get - if (remote_connectors_.begin() == remote_connectors_.end()) { - return; + for (auto &c : remote_connectors_) { + auto remote_endpoint = c.second->getRemoteEndpoint(); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending packet to: " << remote_endpoint.getAddress() << ":" + << remote_endpoint.getPort(); + c.second->send(packet); } - auto remote_endpoint = - remote_connectors_.begin()->second->getRemoteEndpoint(); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Sending packet to: " << remote_endpoint.getAddress() << ":" - << remote_endpoint.getPort(); - remote_connectors_.begin()->second->send(packet); + for (auto &c : local_connectors_) { + if (c.first != connector_id) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending packet to local connector " << c.first << std::endl; + c.second->receive({packet.shared_from_this()}); + } + } } void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {} @@ -292,4 +290,4 @@ void Forwarder::parseForwarderConfiguration( } } // namespace core -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h index 5b564bb5e..1022bf81b 100644 --- a/libtransport/src/io_modules/forwarder/forwarder.h +++ b/libtransport/src/io_modules/forwarder/forwarder.h @@ -15,13 +15,13 @@ #pragma once +#include <core/udp_listener.h> #include <hicn/transport/core/io_module.h> #include <hicn/transport/core/prefix.h> #include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/singleton.h> #include <hicn/transport/utils/spinlock.h> #include <io_modules/forwarder/configuration.h> -#include <io_modules/forwarder/udp_tunnel_listener.h> #include <atomic> #include <libconfig.h++> @@ -31,9 +31,8 @@ namespace transport { namespace core { -class Forwarder : public utils::Singleton<Forwarder> { +class Forwarder { static constexpr char forwarder_config_section[] = "forwarder"; - friend class utils::Singleton<Forwarder>; public: Forwarder(); @@ -47,20 +46,24 @@ class Forwarder : public utils::Singleton<Forwarder> { Connector::Id registerLocalConnector( asio::io_service &io_service, Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, Connector::OnReconnectCallback &&reconnect_callback); Forwarder &deleteConnector(Connector::Id id); Connector::Ptr getConnector(Connector::Id id); - void send(Packet &packet); + void send(Packet &packet, Connector::Id id); void stop(); private: - void onPacketFromListener(Connector *connector, utils::MemBuf &packet_buffer, + void onPacketFromListener(Connector *connector, + const std::vector<utils::MemBuf::Ptr> &packets, const std::error_code &ec); - void onPacketReceived(Connector *connector, utils::MemBuf &packet_buffer, + void onPacketReceived(Connector *connector, + const std::vector<utils::MemBuf::Ptr> &packets, const std::error_code &ec); void onPacketSent(Connector *connector, const std::error_code &ec); void onConnectorClosed(Connector *connector); @@ -86,5 +89,20 @@ class Forwarder : public utils::Singleton<Forwarder> { Configuration config_; }; +class ForwarderGlobal : public ::utils::Singleton<ForwarderGlobal> { + friend class utils::Singleton<ForwarderGlobal>; + + public: + ~ForwarderGlobal() {} + std::shared_ptr<Forwarder> &getReference() { return forwarder_; } + + private: + ForwarderGlobal() : forwarder_(std::make_shared<Forwarder>()) {} + + private: + std::shared_ptr<Forwarder> forwarder_; +}; + } // namespace core -} // namespace transport
\ No newline at end of file + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc index 4f95b9ca0..ca9466f01 100644 --- a/libtransport/src/io_modules/forwarder/forwarder_module.cc +++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -25,24 +25,21 @@ ForwarderModule::ForwarderModule() : IoModule(), name_(""), connector_id_(Connector::invalid_connector), - forwarder_(Forwarder::getInstance()) {} + forwarder_ptr_(ForwarderGlobal::getInstance().getReference()), + forwarder_(*forwarder_ptr_) {} -ForwarderModule::~ForwarderModule() { - forwarder_.deleteConnector(connector_id_); -} +ForwarderModule::~ForwarderModule() {} bool ForwarderModule::isConnected() { return true; } void ForwarderModule::send(Packet &packet) { IoModule::send(packet); - forwarder_.send(packet); + forwarder_.send(packet, connector_id_); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending from " << connector_id_ << " to " << 1 - connector_id_; - - // local_faces_.at(1 - local_id_).onPacket(packet); } -void ForwarderModule::send(const uint8_t *packet, std::size_t len) { +void ForwarderModule::send(const utils::MemBuf::Ptr &buffer) { // not supported throw errors::NotImplementedException(); } @@ -58,11 +55,14 @@ void ForwarderModule::closeConnection() { } void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name) { connector_id_ = forwarder_.registerLocalConnector( - io_service, std::move(receive_callback), std::move(reconnect_callback)); + io_service, std::move(receive_callback), std::move(sent_callback), + std::move(close_callback), std::move(reconnect_callback)); name_ = app_name; } @@ -78,7 +78,9 @@ void ForwarderModule::connect(bool is_consumer) { std::uint32_t ForwarderModule::getMtu() { return interface_mtu; } -bool ForwarderModule::isControlMessage(const uint8_t *message) { return false; } +bool ForwarderModule::isControlMessage(utils::MemBuf &packet_buffer) { + return false; +} extern "C" IoModule *create_module(void) { return new ForwarderModule(); } diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h index 58bfb7996..a48701161 100644 --- a/libtransport/src/io_modules/forwarder/forwarder_module.h +++ b/libtransport/src/io_modules/forwarder/forwarder_module.h @@ -38,11 +38,13 @@ class ForwarderModule : public IoModule { void connect(bool is_consumer) override; void send(Packet &packet) override; - void send(const uint8_t *packet, std::size_t len) override; + void send(const utils::MemBuf::Ptr &buffer) override; bool isConnected() override; void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnCloseCallback &&close_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name = "Libtransport") override; @@ -51,15 +53,19 @@ class ForwarderModule : public IoModule { std::uint32_t getMtu() override; - bool isControlMessage(const uint8_t *message) override; + bool isControlMessage(utils::MemBuf &packet_buffer) override; void processControlMessageReply(utils::MemBuf &packet_buffer) override; void closeConnection() override; private: + static void initForwarder(); + + private: std::string name_; Connector::Id connector_id_; + std::shared_ptr<Forwarder> forwarder_ptr_; Forwarder &forwarder_; }; diff --git a/libtransport/src/io_modules/forwarder/global_id_counter.h b/libtransport/src/io_modules/forwarder/global_id_counter.h deleted file mode 100644 index 0a67b76d5..000000000 --- a/libtransport/src/io_modules/forwarder/global_id_counter.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2021 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/utils/singleton.h> - -#include <atomic> -#include <mutex> - -namespace transport { - -namespace core { - -template <typename T = uint64_t> -class GlobalCounter : public utils::Singleton<GlobalCounter<T>> { - public: - friend class utils::Singleton<GlobalCounter>; - T getNext() { return counter_++; } - - private: - GlobalCounter() : counter_(0) {} - std::atomic<T> counter_; -}; - -} // namespace core -} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.cc b/libtransport/src/io_modules/forwarder/udp_tunnel.cc deleted file mode 100644 index bf6a69b92..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel.cc +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#include <glog/logging.h> -#include <hicn/transport/utils/branch_prediction.h> -#include <io_modules/forwarder/errors.h> -#include <io_modules/forwarder/udp_tunnel.h> - -#include <iostream> -#include <thread> -#include <vector> - -namespace transport { -namespace core { - -UdpTunnelConnector::~UdpTunnelConnector() {} - -void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port, - const std::string &bind_address, - uint16_t bind_port) { - if (state_ == State::CLOSED) { - state_ = State::CONNECTING; - endpoint_iterator_ = resolver_.resolve({hostname, std::to_string(port)}); - remote_endpoint_send_ = *endpoint_iterator_; - socket_->open(remote_endpoint_send_.protocol()); - - if (!bind_address.empty() && bind_port != 0) { - using namespace asio::ip; - socket_->bind( - udp::endpoint(address::from_string(bind_address), bind_port)); - } - - state_ = State::CONNECTED; - - remote_endpoint_ = Endpoint(remote_endpoint_send_); - local_endpoint_ = Endpoint(socket_->local_endpoint()); - - doRecvPacket(); - -#ifdef LINUX - send_timer_.expires_from_now(std::chrono::microseconds(50)); - send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, - std::placeholders::_1)); -#endif - } -} - -void UdpTunnelConnector::send(Packet &packet) { - strand_->post([this, pkt{packet.shared_from_this()}]() { - bool write_in_progress = !output_buffer_.empty(); - output_buffer_.push_back(std::move(pkt)); - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - if (!write_in_progress) { - doSendPacket(); - } - } else { - data_available_ = true; - } - }); -} - -void UdpTunnelConnector::send(const uint8_t *packet, std::size_t len) {} - -void UdpTunnelConnector::close() { - DLOG_IF(INFO, VLOG_IS_ON(2)) << "UDPTunnelConnector::close"; - state_ = State::CLOSED; - bool is_socket_owned = socket_.use_count() == 1; - if (is_socket_owned) { - io_service_.dispatch([this]() { - this->socket_->close(); - // on_close_callback_(shared_from_this()); - }); - } -} - -void UdpTunnelConnector::doSendPacket() { -#ifdef LINUX - send_timer_.expires_from_now(std::chrono::microseconds(50)); - send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, - std::placeholders::_1)); -#else - auto packet = output_buffer_.front().get(); - auto array = std::vector<asio::const_buffer>(); - - const ::utils::MemBuf *current = packet; - do { - array.push_back(asio::const_buffer(current->data(), current->length())); - current = current->next(); - } while (current != packet); - - socket_->async_send_to( - std::move(array), remote_endpoint_send_, - strand_->wrap([this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - sent_callback_(this, make_error_code(forwarder_error::success)); - } else if (ec.value() == - static_cast<int>(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - sendFailed(); - sent_callback_(this, ec); - } - - output_buffer_.pop_front(); - if (!output_buffer_.empty()) { - doSendPacket(); - } - })); -#endif -} - -#ifdef LINUX -void UdpTunnelConnector::writeHandler(std::error_code ec) { - if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) { - return; - } - - auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst)); - - if (len) { - int m = 0; - for (auto &p : output_buffer_) { - auto packet = p.get(); - ::utils::MemBuf *current = packet; - int b = 0; - do { - // array.push_back(asio::const_buffer(current->data(), - // current->length())); - tx_iovecs_[m][b].iov_base = current->writableData(); - tx_iovecs_[m][b].iov_len = current->length(); - current = current->next(); - b++; - } while (current != packet); - - tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m]; - tx_msgs_[m].msg_hdr.msg_iovlen = b; - tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data(); - tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size(); - m++; - - if (--len == 0) { - break; - } - } - - int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT); - if (retval > 0) { - while (retval--) { - output_buffer_.pop_front(); - } - } else if (retval != EWOULDBLOCK && retval != EAGAIN) { - LOG(ERROR) << "Error sending messages! " << strerror(errno) - << " << retval"; - return; - } - } - - if (!output_buffer_.empty()) { - send_timer_.expires_from_now(std::chrono::microseconds(50)); - send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, - std::placeholders::_1)); - } -} - -void UdpTunnelConnector::readHandler(std::error_code ec) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; - - if (TRANSPORT_EXPECT_TRUE(!ec)) { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - if (current_position_ == 0) { - for (int i = 0; i < max_burst; i++) { - auto read_buffer = getRawBuffer(); - rx_iovecs_[i][0].iov_base = read_buffer.first; - rx_iovecs_[i][0].iov_len = read_buffer.second; - rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i]; - rx_msgs_[i].msg_hdr.msg_iovlen = 1; - } - } - - int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_, - max_burst - current_position_, MSG_DONTWAIT, nullptr); - if (res < 0) { - LOG(ERROR) << "Error receiving messages! " << strerror(errno) << " " - << res; - return; - } - - for (int i = 0; i < res; i++) { - auto packet = getPacketFromBuffer( - reinterpret_cast<uint8_t *>( - rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), - rx_msgs_[current_position_].msg_len); - receiveSuccess(*packet); - receive_callback_(this, *packet, - make_error_code(forwarder_error::success)); - ++current_position_; - } - - doRecvPacket(); - } else { - LOG(ERROR) - << "Error in UDP: Receiving packets from a not connected socket."; - } - } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - // receive_callback_(this, *read_msg_, ec); - LOG(ERROR) << "Error in UDP connector: " << ec.value() << " " - << ec.message(); - } else { - LOG(ERROR) << "Error in connector while not connected. " << ec.value() - << " " << ec.message(); - } - } -} -#endif - -void UdpTunnelConnector::doRecvPacket() { -#ifdef LINUX - if (state_ == State::CONNECTED) { -#if ((ASIO_VERSION / 100 % 1000) < 11) - socket_->async_receive(asio::null_buffers(), -#else - socket_->async_wait(asio::ip::tcp::socket::wait_read, -#endif - std::bind(&UdpTunnelConnector::readHandler, this, - std::placeholders::_1)); - } -#else - DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; - read_msg_ = getRawBuffer(); - socket_->async_receive_from( - asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_, - [this](std::error_code ec, std::size_t length) { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "UdpTunnelConnector received packet length=" << length; - if (TRANSPORT_EXPECT_TRUE(!ec)) { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - auto packet = getPacketFromBuffer(read_msg_.first, length); - receiveSuccess(*packet); - receive_callback_(this, *packet, - make_error_code(forwarder_error::success)); - doRecvPacket(); - } else { - LOG(ERROR) << "Error in UDP: Receiving packets from a not " - "connected socket."; - } - } else if (ec.value() == - static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - LOG(ERROR) << "Error in UDP connector: " << ec.value() - << ec.message(); - } else { - LOG(ERROR) << "Error while not connected"; - } - } - }); -#endif -} - -void UdpTunnelConnector::doConnect() { - asio::async_connect( - *socket_, endpoint_iterator_, - [this](std::error_code ec, asio::ip::udp::resolver::iterator) { - if (!ec) { - state_ = State::CONNECTED; - doRecvPacket(); - - if (data_available_) { - data_available_ = false; - doSendPacket(); - } - } else { - LOG(ERROR) << "UDP Connection failed!!!"; - timer_.expires_from_now(std::chrono::milliseconds(500)); - timer_.async_wait(std::bind(&UdpTunnelConnector::doConnect, this)); - } - }); -} - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.h b/libtransport/src/io_modules/forwarder/udp_tunnel.h deleted file mode 100644 index 4f044f93f..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel.h +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#pragma once - -#include <hicn/transport/core/asio_wrapper.h> -#include <hicn/transport/core/connector.h> -#include <hicn/transport/portability/platform.h> -#include <io_modules/forwarder/errors.h> - -#include <iostream> -#include <memory> - -namespace transport { -namespace core { - -class UdpTunnelListener; - -class UdpTunnelConnector : public Connector { - friend class UdpTunnelListener; - - public: - template <typename ReceiveCallback, typename SentCallback, typename OnClose, - typename OnReconnect> - UdpTunnelConnector(asio::io_service &io_service, - ReceiveCallback &&receive_callback, - SentCallback &&packet_sent, OnClose &&on_close_callback, - OnReconnect &&on_reconnect) - : Connector(receive_callback, packet_sent, on_close_callback, - on_reconnect), - io_service_(io_service), - strand_(std::make_shared<asio::io_service::strand>(io_service_)), - socket_(std::make_shared<asio::ip::udp::socket>(io_service_)), - resolver_(io_service_), - timer_(io_service_), -#ifdef LINUX - send_timer_(io_service_), - tx_iovecs_{0}, - tx_msgs_{0}, - rx_iovecs_{0}, - rx_msgs_{0}, - current_position_(0), -#else - read_msg_(nullptr, 0), -#endif - data_available_(false) { - } - - template <typename ReceiveCallback, typename SentCallback, typename OnClose, - typename OnReconnect, typename EndpointType> - UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket, - std::shared_ptr<asio::io_service::strand> &strand, - ReceiveCallback &&receive_callback, - SentCallback &&packet_sent, OnClose &&on_close_callback, - OnReconnect &&on_reconnect, EndpointType &&remote_endpoint) - : Connector(receive_callback, packet_sent, on_close_callback, - on_reconnect), -#if ((ASIO_VERSION / 100 % 1000) < 12) - io_service_(socket->get_io_service()), -#else - io_service_((asio::io_context &)(socket->get_executor().context())), -#endif - strand_(strand), - socket_(socket), - resolver_(io_service_), - remote_endpoint_send_(std::forward<EndpointType &&>(remote_endpoint)), - timer_(io_service_), -#ifdef LINUX - send_timer_(io_service_), - tx_iovecs_{0}, - tx_msgs_{0}, - rx_iovecs_{0}, - rx_msgs_{0}, - current_position_(0), -#else - read_msg_(nullptr, 0), -#endif - data_available_(false) { - if (socket_->is_open()) { - state_ = State::CONNECTED; - remote_endpoint_ = Endpoint(remote_endpoint_send_); - local_endpoint_ = socket_->local_endpoint(); - } - } - - ~UdpTunnelConnector() override; - - void send(Packet &packet) override; - - void send(const uint8_t *packet, std::size_t len) override; - - void close() override; - - void connect(const std::string &hostname, std::uint16_t port, - const std::string &bind_address = "", - std::uint16_t bind_port = 0); - - auto shared_from_this() { return utils::shared_from(this); } - - private: - void doConnect(); - void doRecvPacket(); - - void doRecvPacket(utils::MemBuf &buffer) { - receive_callback_(this, buffer, make_error_code(forwarder_error::success)); - } - -#ifdef LINUX - void readHandler(std::error_code ec); - void writeHandler(std::error_code ec); -#endif - - void setConnected() { state_ = State::CONNECTED; } - - void doSendPacket(); - void doClose(); - - private: - asio::io_service &io_service_; - std::shared_ptr<asio::io_service::strand> strand_; - std::shared_ptr<asio::ip::udp::socket> socket_; - asio::ip::udp::resolver resolver_; - asio::ip::udp::resolver::iterator endpoint_iterator_; - asio::ip::udp::endpoint remote_endpoint_send_; - asio::ip::udp::endpoint remote_endpoint_recv_; - - asio::steady_timer timer_; - -#ifdef LINUX - asio::steady_timer send_timer_; - struct iovec tx_iovecs_[max_burst][8]; - struct mmsghdr tx_msgs_[max_burst]; - struct iovec rx_iovecs_[max_burst][8]; - struct mmsghdr rx_msgs_[max_burst]; - std::uint8_t current_position_; -#else - std::pair<uint8_t *, std::size_t> read_msg_; -#endif - - bool data_available_; -}; - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc deleted file mode 100644 index d047cc568..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#include <glog/logging.h> -#include <hicn/transport/utils/hash.h> -#include <io_modules/forwarder/udp_tunnel.h> -#include <io_modules/forwarder/udp_tunnel_listener.h> - -#ifndef LINUX -namespace std { -size_t hash<asio::ip::udp::endpoint>::operator()( - const asio::ip::udp::endpoint &endpoint) const { - auto hash_ip = endpoint.address().is_v4() - ? endpoint.address().to_v4().to_ulong() - : utils::hash::fnv32_buf( - endpoint.address().to_v6().to_bytes().data(), 16); - uint16_t port = endpoint.port(); - return utils::hash::fnv32_buf(&port, 2, hash_ip); -} -} // namespace std -#endif - -namespace transport { -namespace core { - -UdpTunnelListener::~UdpTunnelListener() {} - -void UdpTunnelListener::close() { - strand_->post([this]() { - if (socket_->is_open()) { - socket_->close(); - } - }); -} - -#ifdef LINUX -void UdpTunnelListener::readHandler(std::error_code ec) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; - - if (TRANSPORT_EXPECT_TRUE(!ec)) { - if (current_position_ == 0) { - for (int i = 0; i < Connector::max_burst; i++) { - auto read_buffer = Connector::getRawBuffer(); - iovecs_[i][0].iov_base = read_buffer.first; - iovecs_[i][0].iov_len = read_buffer.second; - msgs_[i].msg_hdr.msg_iov = iovecs_[i]; - msgs_[i].msg_hdr.msg_iovlen = 1; - msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i]; - msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]); - } - } - - int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_, - Connector::max_burst - current_position_, MSG_DONTWAIT, - nullptr); - if (res < 0) { - LOG(ERROR) << "Error in recvmmsg."; - return; - } - - for (int i = 0; i < res; i++) { - auto packet = Connector::getPacketFromBuffer( - reinterpret_cast<uint8_t *>( - msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), - msgs_[current_position_].msg_len); - auto connector_id = - utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name, - msgs_[current_position_].msg_hdr.msg_namelen); - - auto connector = connectors_.find(connector_id); - if (connector == connectors_.end()) { - // Create new connector corresponding to new client - - /* - * Get the remote endpoint for this particular message - */ - using namespace asio::ip; - if (local_endpoint_.address().is_v4()) { - auto addr = reinterpret_cast<struct sockaddr_in *>( - &remote_endpoints_[current_position_]); - address_v4::bytes_type address_bytes; - std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr), - address_bytes.size(), address_bytes.begin()); - address_v4 address(address_bytes); - remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin_port)); - } else { - auto addr = reinterpret_cast<struct sockaddr_in6 *>( - &remote_endpoints_[current_position_]); - address_v6::bytes_type address_bytes; - std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr), - address_bytes.size(), address_bytes.begin()); - address_v6 address(address_bytes); - remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin6_port)); - } - - /** - * Create new connector sharing the same socket of this listener. - */ - auto ret = connectors_.emplace( - connector_id, - std::make_shared<UdpTunnelConnector>( - socket_, strand_, receive_callback_, - [](Connector *, const std::error_code &) {}, [](Connector *) {}, - [](Connector *) {}, std::move(remote_endpoint_))); - connector = ret.first; - connector->second->setConnectorId(connector_id); - } - - /** - * Use connector callback to process incoming message. - */ - UdpTunnelConnector *c = - dynamic_cast<UdpTunnelConnector *>(connector->second.get()); - c->doRecvPacket(*packet); - - ++current_position_; - } - - doRecvPacket(); - } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - } -} -#endif - -void UdpTunnelListener::doRecvPacket() { -#ifdef LINUX -#if ((ASIO_VERSION / 100 % 1000) < 11) - socket_->async_receive( - asio::null_buffers(), -#else - socket_->async_wait( - asio::ip::tcp::socket::wait_read, -#endif - std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1)); -#else - read_msg_ = Connector::getRawBuffer(); - socket_->async_receive_from( - asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_, - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - auto packet = Connector::getPacketFromBuffer(read_msg_.first, length); - auto connector_id = - std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_); - auto connector = connectors_.find(connector_id); - if (connector == connectors_.end()) { - // Create new connector corresponding to new client - auto ret = connectors_.emplace( - connector_id, std::make_shared<UdpTunnelConnector>( - socket_, strand_, receive_callback_, - [](Connector *, const std::error_code &) {}, - [](Connector *) {}, [](Connector *) {}, - std::move(remote_endpoint_))); - connector = ret.first; - connector->second->setConnectorId(connector_id); - } - - UdpTunnelConnector *c = - dynamic_cast<UdpTunnelConnector *>(connector->second.get()); - c->doRecvPacket(*packet); - doRecvPacket(); - } else if (ec.value() == - static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - } - }); -#endif -} -} // namespace core -} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h deleted file mode 100644 index 5d197dcb0..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#pragma once - -#include <hicn/transport/core/asio_wrapper.h> -#include <hicn/transport/core/connector.h> -#include <hicn/transport/portability/platform.h> - -#include <unordered_map> - -namespace std { -template <> -struct hash<asio::ip::udp::endpoint> { - size_t operator()(const asio::ip::udp::endpoint &endpoint) const; -}; -} // namespace std - -namespace transport { -namespace core { - -class UdpTunnelListener - : public std::enable_shared_from_this<UdpTunnelListener> { - using PacketReceivedCallback = Connector::PacketReceivedCallback; - using EndpointId = std::pair<uint32_t, uint16_t>; - - static constexpr uint16_t default_port = 5004; - - public: - using Ptr = std::shared_ptr<UdpTunnelListener>; - - template <typename ReceiveCallback> - UdpTunnelListener(asio::io_service &io_service, - ReceiveCallback &&receive_callback, - asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint( - asio::ip::udp::v4(), default_port)) - : io_service_(io_service), - strand_(std::make_shared<asio::io_service::strand>(io_service_)), - socket_(std::make_shared<asio::ip::udp::socket>(io_service_, - endpoint.protocol())), - local_endpoint_(endpoint), - receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)), -#ifndef LINUX - read_msg_(nullptr, 0) -#else - iovecs_{0}, - msgs_{0}, - current_position_(0) -#endif - { - if (endpoint.protocol() == asio::ip::udp::v6()) { - std::error_code ec; - socket_->set_option(asio::ip::v6_only(false), ec); - // Call succeeds only on dual stack systems. - } - socket_->bind(local_endpoint_); - io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this)); - } - - ~UdpTunnelListener(); - - void close(); - - int deleteConnector(Connector *connector) { - return connectors_.erase(connector->getConnectorId()); - } - - template <typename ReceiveCallback> - void setReceiveCallback(ReceiveCallback &&callback) { - receive_callback_ = std::forward<ReceiveCallback &&>(callback); - } - - Connector *findConnector(Connector::Id connId) { - auto it = connectors_.find(connId); - if (it != connectors_.end()) { - return it->second.get(); - } - - return nullptr; - } - - private: - void doRecvPacket(); - - void readHandler(std::error_code ec); - - asio::io_service &io_service_; - std::shared_ptr<asio::io_service::strand> strand_; - std::shared_ptr<asio::ip::udp::socket> socket_; - asio::ip::udp::endpoint local_endpoint_; - asio::ip::udp::endpoint remote_endpoint_; - std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_; - - PacketReceivedCallback receive_callback_; - -#ifdef LINUX - struct iovec iovecs_[Connector::max_burst][8]; - struct mmsghdr msgs_[Connector::max_burst]; - struct sockaddr_storage remote_endpoints_[Connector::max_burst]; - std::uint8_t current_position_; -#else - std::pair<uint8_t *, std::size_t> read_msg_; -#endif -}; - -} // namespace core - -} // namespace transport |