aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/io_modules/forwarder
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/io_modules/forwarder')
-rw-r--r--libtransport/src/io_modules/forwarder/CMakeLists.txt12
-rw-r--r--libtransport/src/io_modules/forwarder/errors.cc2
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.cc68
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.h32
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.cc24
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.h10
-rw-r--r--libtransport/src/io_modules/forwarder/global_id_counter.h39
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel.cc290
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel.h146
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc177
-rw-r--r--libtransport/src/io_modules/forwarder/udp_tunnel_listener.h109
11 files changed, 83 insertions, 826 deletions
diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt
index a1d0c5db5..2235d842e 100644
--- a/libtransport/src/io_modules/forwarder/CMakeLists.txt
+++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2021 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -17,25 +17,19 @@ list(APPEND MODULE_HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/errors.h
${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h
${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h
- ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.h
- ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.h
- ${CMAKE_CURRENT_SOURCE_DIR}/global_counter.h
)
list(APPEND MODULE_SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.cc
${CMAKE_CURRENT_SOURCE_DIR}/forwarder.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.cc
)
build_module(forwarder_module
- SHARED
SOURCES ${MODULE_SOURCE_FILES}
DEPENDS ${DEPENDENCIES}
COMPONENT ${LIBTRANSPORT_COMPONENT}-io-modules
- INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
+ INCLUDE_DIRS ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} ${Libhicn_INCLUDE_DIRS}
DEFINITIONS ${COMPILER_DEFINITIONS}
- COMPILE_OPTIONS ${COMPILE_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
diff --git a/libtransport/src/io_modules/forwarder/errors.cc b/libtransport/src/io_modules/forwarder/errors.cc
index b5f131499..6e93d0453 100644
--- a/libtransport/src/io_modules/forwarder/errors.cc
+++ b/libtransport/src/io_modules/forwarder/errors.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
*/
#include <io_modules/forwarder/errors.h>
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc
index 0546cb8b3..d5f0b589e 100644
--- a/libtransport/src/io_modules/forwarder/forwarder.cc
+++ b/libtransport/src/io_modules/forwarder/forwarder.cc
@@ -14,12 +14,12 @@
*/
#include <core/global_configuration.h>
+#include <core/global_id_counter.h>
#include <core/local_connector.h>
+#include <core/udp_connector.h>
+#include <core/udp_listener.h>
#include <glog/logging.h>
#include <io_modules/forwarder/forwarder.h>
-#include <io_modules/forwarder/global_id_counter.h>
-#include <io_modules/forwarder/udp_tunnel.h>
-#include <io_modules/forwarder/udp_tunnel_listener.h>
namespace transport {
@@ -89,11 +89,14 @@ void Forwarder::initConnectors() {
Connector::Id Forwarder::registerLocalConnector(
asio::io_service &io_service,
Connector::PacketReceivedCallback &&receive_callback,
+ Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback) {
utils::SpinLock::Acquire locked(connector_lock_);
auto id = GlobalCounter<Connector::Id>::getInstance().getNext();
auto connector = std::make_shared<LocalConnector>(
- io_service, receive_callback, nullptr, nullptr, reconnect_callback);
+ io_service, std::move(receive_callback), std::move(sent_callback),
+ std::move(close_callback), std::move(reconnect_callback));
connector->setConnectorId(id);
local_connectors_.emplace(id, std::move(connector));
return id;
@@ -105,6 +108,7 @@ Forwarder &Forwarder::deleteConnector(Connector::Id id) {
if (it != local_connectors_.end()) {
it->second->close();
local_connectors_.erase(it);
+ } else {
}
return *this;
@@ -120,9 +124,9 @@ Connector::Ptr Forwarder::getConnector(Connector::Id id) {
return nullptr;
}
-void Forwarder::onPacketFromListener(Connector *connector,
- utils::MemBuf &packet_buffer,
- const std::error_code &ec) {
+void Forwarder::onPacketFromListener(
+ Connector *connector, const std::vector<utils::MemBuf::Ptr> &packets,
+ const std::error_code &ec) {
// Create connector
connector->setReceiveCallback(
std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1,
@@ -135,32 +139,21 @@ void Forwarder::onPacketFromListener(Connector *connector,
remote_connectors_.emplace(connector->getConnectorId(),
connector->shared_from_this());
}
+
// TODO Check if control packet or not. For the moment it is not.
- onPacketReceived(connector, packet_buffer, ec);
+ onPacketReceived(connector, packets, ec);
}
void Forwarder::onPacketReceived(Connector *connector,
- utils::MemBuf &packet_buffer,
+ const std::vector<utils::MemBuf::Ptr> &packets,
const std::error_code &ec) {
- // Figure out the type of packet we received
- bool is_interest = Packet::isInterest(packet_buffer.data());
-
- Packet *packet = nullptr;
- if (is_interest) {
- packet = static_cast<Interest *>(&packet_buffer);
- } else {
- packet = static_cast<ContentObject *>(&packet_buffer);
+ if (ec) {
+ LOG(ERROR) << "Error receiving packet: " << ec.message();
+ return;
}
for (auto &c : local_connectors_) {
- auto role = c.second->getRole();
- auto is_producer = role == Connector::Role::PRODUCER;
- if ((is_producer && is_interest) || (!is_producer && !is_interest)) {
- c.second->send(*packet);
- } else {
- LOG(ERROR) << "Error sending packet to local connector. is_interest = "
- << is_interest << " - is_producer = " << is_producer;
- }
+ c.second->receive(packets);
}
// PCS Lookup + FIB lookup. Skip for now
@@ -168,19 +161,24 @@ void Forwarder::onPacketReceived(Connector *connector,
// Forward packet to local connectors
}
-void Forwarder::send(Packet &packet) {
+void Forwarder::send(Packet &packet, Connector::Id connector_id) {
// TODo Here a nice PIT/CS / FIB would be required:)
// For now let's just forward the packet on the remote connector we get
- if (remote_connectors_.begin() == remote_connectors_.end()) {
- return;
+ for (auto &c : remote_connectors_) {
+ auto remote_endpoint = c.second->getRemoteEndpoint();
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending packet to: " << remote_endpoint.getAddress() << ":"
+ << remote_endpoint.getPort();
+ c.second->send(packet);
}
- auto remote_endpoint =
- remote_connectors_.begin()->second->getRemoteEndpoint();
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Sending packet to: " << remote_endpoint.getAddress() << ":"
- << remote_endpoint.getPort();
- remote_connectors_.begin()->second->send(packet);
+ for (auto &c : local_connectors_) {
+ if (c.first != connector_id) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending packet to local connector " << c.first << std::endl;
+ c.second->receive({packet.shared_from_this()});
+ }
+ }
}
void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {}
@@ -292,4 +290,4 @@ void Forwarder::parseForwarderConfiguration(
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h
index 5b564bb5e..1022bf81b 100644
--- a/libtransport/src/io_modules/forwarder/forwarder.h
+++ b/libtransport/src/io_modules/forwarder/forwarder.h
@@ -15,13 +15,13 @@
#pragma once
+#include <core/udp_listener.h>
#include <hicn/transport/core/io_module.h>
#include <hicn/transport/core/prefix.h>
#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/singleton.h>
#include <hicn/transport/utils/spinlock.h>
#include <io_modules/forwarder/configuration.h>
-#include <io_modules/forwarder/udp_tunnel_listener.h>
#include <atomic>
#include <libconfig.h++>
@@ -31,9 +31,8 @@ namespace transport {
namespace core {
-class Forwarder : public utils::Singleton<Forwarder> {
+class Forwarder {
static constexpr char forwarder_config_section[] = "forwarder";
- friend class utils::Singleton<Forwarder>;
public:
Forwarder();
@@ -47,20 +46,24 @@ class Forwarder : public utils::Singleton<Forwarder> {
Connector::Id registerLocalConnector(
asio::io_service &io_service,
Connector::PacketReceivedCallback &&receive_callback,
+ Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback);
Forwarder &deleteConnector(Connector::Id id);
Connector::Ptr getConnector(Connector::Id id);
- void send(Packet &packet);
+ void send(Packet &packet, Connector::Id id);
void stop();
private:
- void onPacketFromListener(Connector *connector, utils::MemBuf &packet_buffer,
+ void onPacketFromListener(Connector *connector,
+ const std::vector<utils::MemBuf::Ptr> &packets,
const std::error_code &ec);
- void onPacketReceived(Connector *connector, utils::MemBuf &packet_buffer,
+ void onPacketReceived(Connector *connector,
+ const std::vector<utils::MemBuf::Ptr> &packets,
const std::error_code &ec);
void onPacketSent(Connector *connector, const std::error_code &ec);
void onConnectorClosed(Connector *connector);
@@ -86,5 +89,20 @@ class Forwarder : public utils::Singleton<Forwarder> {
Configuration config_;
};
+class ForwarderGlobal : public ::utils::Singleton<ForwarderGlobal> {
+ friend class utils::Singleton<ForwarderGlobal>;
+
+ public:
+ ~ForwarderGlobal() {}
+ std::shared_ptr<Forwarder> &getReference() { return forwarder_; }
+
+ private:
+ ForwarderGlobal() : forwarder_(std::make_shared<Forwarder>()) {}
+
+ private:
+ std::shared_ptr<Forwarder> forwarder_;
+};
+
} // namespace core
-} // namespace transport \ No newline at end of file
+
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc
index 4f95b9ca0..ca9466f01 100644
--- a/libtransport/src/io_modules/forwarder/forwarder_module.cc
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -25,24 +25,21 @@ ForwarderModule::ForwarderModule()
: IoModule(),
name_(""),
connector_id_(Connector::invalid_connector),
- forwarder_(Forwarder::getInstance()) {}
+ forwarder_ptr_(ForwarderGlobal::getInstance().getReference()),
+ forwarder_(*forwarder_ptr_) {}
-ForwarderModule::~ForwarderModule() {
- forwarder_.deleteConnector(connector_id_);
-}
+ForwarderModule::~ForwarderModule() {}
bool ForwarderModule::isConnected() { return true; }
void ForwarderModule::send(Packet &packet) {
IoModule::send(packet);
- forwarder_.send(packet);
+ forwarder_.send(packet, connector_id_);
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Sending from " << connector_id_ << " to " << 1 - connector_id_;
-
- // local_faces_.at(1 - local_id_).onPacket(packet);
}
-void ForwarderModule::send(const uint8_t *packet, std::size_t len) {
+void ForwarderModule::send(const utils::MemBuf::Ptr &buffer) {
// not supported
throw errors::NotImplementedException();
}
@@ -58,11 +55,14 @@ void ForwarderModule::closeConnection() {
}
void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback,
asio::io_service &io_service,
const std::string &app_name) {
connector_id_ = forwarder_.registerLocalConnector(
- io_service, std::move(receive_callback), std::move(reconnect_callback));
+ io_service, std::move(receive_callback), std::move(sent_callback),
+ std::move(close_callback), std::move(reconnect_callback));
name_ = app_name;
}
@@ -78,7 +78,9 @@ void ForwarderModule::connect(bool is_consumer) {
std::uint32_t ForwarderModule::getMtu() { return interface_mtu; }
-bool ForwarderModule::isControlMessage(const uint8_t *message) { return false; }
+bool ForwarderModule::isControlMessage(utils::MemBuf &packet_buffer) {
+ return false;
+}
extern "C" IoModule *create_module(void) { return new ForwarderModule(); }
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h
index 58bfb7996..a48701161 100644
--- a/libtransport/src/io_modules/forwarder/forwarder_module.h
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.h
@@ -38,11 +38,13 @@ class ForwarderModule : public IoModule {
void connect(bool is_consumer) override;
void send(Packet &packet) override;
- void send(const uint8_t *packet, std::size_t len) override;
+ void send(const utils::MemBuf::Ptr &buffer) override;
bool isConnected() override;
void init(Connector::PacketReceivedCallback &&receive_callback,
+ Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback,
asio::io_service &io_service,
const std::string &app_name = "Libtransport") override;
@@ -51,15 +53,19 @@ class ForwarderModule : public IoModule {
std::uint32_t getMtu() override;
- bool isControlMessage(const uint8_t *message) override;
+ bool isControlMessage(utils::MemBuf &packet_buffer) override;
void processControlMessageReply(utils::MemBuf &packet_buffer) override;
void closeConnection() override;
private:
+ static void initForwarder();
+
+ private:
std::string name_;
Connector::Id connector_id_;
+ std::shared_ptr<Forwarder> forwarder_ptr_;
Forwarder &forwarder_;
};
diff --git a/libtransport/src/io_modules/forwarder/global_id_counter.h b/libtransport/src/io_modules/forwarder/global_id_counter.h
deleted file mode 100644
index 0a67b76d5..000000000
--- a/libtransport/src/io_modules/forwarder/global_id_counter.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/utils/singleton.h>
-
-#include <atomic>
-#include <mutex>
-
-namespace transport {
-
-namespace core {
-
-template <typename T = uint64_t>
-class GlobalCounter : public utils::Singleton<GlobalCounter<T>> {
- public:
- friend class utils::Singleton<GlobalCounter>;
- T getNext() { return counter_++; }
-
- private:
- GlobalCounter() : counter_(0) {}
- std::atomic<T> counter_;
-};
-
-} // namespace core
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.cc b/libtransport/src/io_modules/forwarder/udp_tunnel.cc
deleted file mode 100644
index bf6a69b92..000000000
--- a/libtransport/src/io_modules/forwarder/udp_tunnel.cc
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- */
-
-#include <glog/logging.h>
-#include <hicn/transport/utils/branch_prediction.h>
-#include <io_modules/forwarder/errors.h>
-#include <io_modules/forwarder/udp_tunnel.h>
-
-#include <iostream>
-#include <thread>
-#include <vector>
-
-namespace transport {
-namespace core {
-
-UdpTunnelConnector::~UdpTunnelConnector() {}
-
-void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port,
- const std::string &bind_address,
- uint16_t bind_port) {
- if (state_ == State::CLOSED) {
- state_ = State::CONNECTING;
- endpoint_iterator_ = resolver_.resolve({hostname, std::to_string(port)});
- remote_endpoint_send_ = *endpoint_iterator_;
- socket_->open(remote_endpoint_send_.protocol());
-
- if (!bind_address.empty() && bind_port != 0) {
- using namespace asio::ip;
- socket_->bind(
- udp::endpoint(address::from_string(bind_address), bind_port));
- }
-
- state_ = State::CONNECTED;
-
- remote_endpoint_ = Endpoint(remote_endpoint_send_);
- local_endpoint_ = Endpoint(socket_->local_endpoint());
-
- doRecvPacket();
-
-#ifdef LINUX
- send_timer_.expires_from_now(std::chrono::microseconds(50));
- send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
- std::placeholders::_1));
-#endif
- }
-}
-
-void UdpTunnelConnector::send(Packet &packet) {
- strand_->post([this, pkt{packet.shared_from_this()}]() {
- bool write_in_progress = !output_buffer_.empty();
- output_buffer_.push_back(std::move(pkt));
- if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
- if (!write_in_progress) {
- doSendPacket();
- }
- } else {
- data_available_ = true;
- }
- });
-}
-
-void UdpTunnelConnector::send(const uint8_t *packet, std::size_t len) {}
-
-void UdpTunnelConnector::close() {
- DLOG_IF(INFO, VLOG_IS_ON(2)) << "UDPTunnelConnector::close";
- state_ = State::CLOSED;
- bool is_socket_owned = socket_.use_count() == 1;
- if (is_socket_owned) {
- io_service_.dispatch([this]() {
- this->socket_->close();
- // on_close_callback_(shared_from_this());
- });
- }
-}
-
-void UdpTunnelConnector::doSendPacket() {
-#ifdef LINUX
- send_timer_.expires_from_now(std::chrono::microseconds(50));
- send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
- std::placeholders::_1));
-#else
- auto packet = output_buffer_.front().get();
- auto array = std::vector<asio::const_buffer>();
-
- const ::utils::MemBuf *current = packet;
- do {
- array.push_back(asio::const_buffer(current->data(), current->length()));
- current = current->next();
- } while (current != packet);
-
- socket_->async_send_to(
- std::move(array), remote_endpoint_send_,
- strand_->wrap([this](std::error_code ec, std::size_t length) {
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- sent_callback_(this, make_error_code(forwarder_error::success));
- } else if (ec.value() ==
- static_cast<int>(std::errc::operation_canceled)) {
- // The connection has been closed by the application.
- return;
- } else {
- sendFailed();
- sent_callback_(this, ec);
- }
-
- output_buffer_.pop_front();
- if (!output_buffer_.empty()) {
- doSendPacket();
- }
- }));
-#endif
-}
-
-#ifdef LINUX
-void UdpTunnelConnector::writeHandler(std::error_code ec) {
- if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) {
- return;
- }
-
- auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst));
-
- if (len) {
- int m = 0;
- for (auto &p : output_buffer_) {
- auto packet = p.get();
- ::utils::MemBuf *current = packet;
- int b = 0;
- do {
- // array.push_back(asio::const_buffer(current->data(),
- // current->length()));
- tx_iovecs_[m][b].iov_base = current->writableData();
- tx_iovecs_[m][b].iov_len = current->length();
- current = current->next();
- b++;
- } while (current != packet);
-
- tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m];
- tx_msgs_[m].msg_hdr.msg_iovlen = b;
- tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data();
- tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size();
- m++;
-
- if (--len == 0) {
- break;
- }
- }
-
- int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT);
- if (retval > 0) {
- while (retval--) {
- output_buffer_.pop_front();
- }
- } else if (retval != EWOULDBLOCK && retval != EAGAIN) {
- LOG(ERROR) << "Error sending messages! " << strerror(errno)
- << " << retval";
- return;
- }
- }
-
- if (!output_buffer_.empty()) {
- send_timer_.expires_from_now(std::chrono::microseconds(50));
- send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this,
- std::placeholders::_1));
- }
-}
-
-void UdpTunnelConnector::readHandler(std::error_code ec) {
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
-
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
- if (current_position_ == 0) {
- for (int i = 0; i < max_burst; i++) {
- auto read_buffer = getRawBuffer();
- rx_iovecs_[i][0].iov_base = read_buffer.first;
- rx_iovecs_[i][0].iov_len = read_buffer.second;
- rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i];
- rx_msgs_[i].msg_hdr.msg_iovlen = 1;
- }
- }
-
- int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_,
- max_burst - current_position_, MSG_DONTWAIT, nullptr);
- if (res < 0) {
- LOG(ERROR) << "Error receiving messages! " << strerror(errno) << " "
- << res;
- return;
- }
-
- for (int i = 0; i < res; i++) {
- auto packet = getPacketFromBuffer(
- reinterpret_cast<uint8_t *>(
- rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
- rx_msgs_[current_position_].msg_len);
- receiveSuccess(*packet);
- receive_callback_(this, *packet,
- make_error_code(forwarder_error::success));
- ++current_position_;
- }
-
- doRecvPacket();
- } else {
- LOG(ERROR)
- << "Error in UDP: Receiving packets from a not connected socket.";
- }
- } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
- LOG(ERROR) << "The connection has been closed by the application.";
- return;
- } else {
- if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
- // receive_callback_(this, *read_msg_, ec);
- LOG(ERROR) << "Error in UDP connector: " << ec.value() << " "
- << ec.message();
- } else {
- LOG(ERROR) << "Error in connector while not connected. " << ec.value()
- << " " << ec.message();
- }
- }
-}
-#endif
-
-void UdpTunnelConnector::doRecvPacket() {
-#ifdef LINUX
- if (state_ == State::CONNECTED) {
-#if ((ASIO_VERSION / 100 % 1000) < 11)
- socket_->async_receive(asio::null_buffers(),
-#else
- socket_->async_wait(asio::ip::tcp::socket::wait_read,
-#endif
- std::bind(&UdpTunnelConnector::readHandler, this,
- std::placeholders::_1));
- }
-#else
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
- read_msg_ = getRawBuffer();
- socket_->async_receive_from(
- asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_,
- [this](std::error_code ec, std::size_t length) {
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "UdpTunnelConnector received packet length=" << length;
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
- auto packet = getPacketFromBuffer(read_msg_.first, length);
- receiveSuccess(*packet);
- receive_callback_(this, *packet,
- make_error_code(forwarder_error::success));
- doRecvPacket();
- } else {
- LOG(ERROR) << "Error in UDP: Receiving packets from a not "
- "connected socket.";
- }
- } else if (ec.value() ==
- static_cast<int>(std::errc::operation_canceled)) {
- LOG(ERROR) << "The connection has been closed by the application.";
- return;
- } else {
- if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
- LOG(ERROR) << "Error in UDP connector: " << ec.value()
- << ec.message();
- } else {
- LOG(ERROR) << "Error while not connected";
- }
- }
- });
-#endif
-}
-
-void UdpTunnelConnector::doConnect() {
- asio::async_connect(
- *socket_, endpoint_iterator_,
- [this](std::error_code ec, asio::ip::udp::resolver::iterator) {
- if (!ec) {
- state_ = State::CONNECTED;
- doRecvPacket();
-
- if (data_available_) {
- data_available_ = false;
- doSendPacket();
- }
- } else {
- LOG(ERROR) << "UDP Connection failed!!!";
- timer_.expires_from_now(std::chrono::milliseconds(500));
- timer_.async_wait(std::bind(&UdpTunnelConnector::doConnect, this));
- }
- });
-}
-
-} // namespace core
-
-} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.h b/libtransport/src/io_modules/forwarder/udp_tunnel.h
deleted file mode 100644
index 4f044f93f..000000000
--- a/libtransport/src/io_modules/forwarder/udp_tunnel.h
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- */
-
-#pragma once
-
-#include <hicn/transport/core/asio_wrapper.h>
-#include <hicn/transport/core/connector.h>
-#include <hicn/transport/portability/platform.h>
-#include <io_modules/forwarder/errors.h>
-
-#include <iostream>
-#include <memory>
-
-namespace transport {
-namespace core {
-
-class UdpTunnelListener;
-
-class UdpTunnelConnector : public Connector {
- friend class UdpTunnelListener;
-
- public:
- template <typename ReceiveCallback, typename SentCallback, typename OnClose,
- typename OnReconnect>
- UdpTunnelConnector(asio::io_service &io_service,
- ReceiveCallback &&receive_callback,
- SentCallback &&packet_sent, OnClose &&on_close_callback,
- OnReconnect &&on_reconnect)
- : Connector(receive_callback, packet_sent, on_close_callback,
- on_reconnect),
- io_service_(io_service),
- strand_(std::make_shared<asio::io_service::strand>(io_service_)),
- socket_(std::make_shared<asio::ip::udp::socket>(io_service_)),
- resolver_(io_service_),
- timer_(io_service_),
-#ifdef LINUX
- send_timer_(io_service_),
- tx_iovecs_{0},
- tx_msgs_{0},
- rx_iovecs_{0},
- rx_msgs_{0},
- current_position_(0),
-#else
- read_msg_(nullptr, 0),
-#endif
- data_available_(false) {
- }
-
- template <typename ReceiveCallback, typename SentCallback, typename OnClose,
- typename OnReconnect, typename EndpointType>
- UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket,
- std::shared_ptr<asio::io_service::strand> &strand,
- ReceiveCallback &&receive_callback,
- SentCallback &&packet_sent, OnClose &&on_close_callback,
- OnReconnect &&on_reconnect, EndpointType &&remote_endpoint)
- : Connector(receive_callback, packet_sent, on_close_callback,
- on_reconnect),
-#if ((ASIO_VERSION / 100 % 1000) < 12)
- io_service_(socket->get_io_service()),
-#else
- io_service_((asio::io_context &)(socket->get_executor().context())),
-#endif
- strand_(strand),
- socket_(socket),
- resolver_(io_service_),
- remote_endpoint_send_(std::forward<EndpointType &&>(remote_endpoint)),
- timer_(io_service_),
-#ifdef LINUX
- send_timer_(io_service_),
- tx_iovecs_{0},
- tx_msgs_{0},
- rx_iovecs_{0},
- rx_msgs_{0},
- current_position_(0),
-#else
- read_msg_(nullptr, 0),
-#endif
- data_available_(false) {
- if (socket_->is_open()) {
- state_ = State::CONNECTED;
- remote_endpoint_ = Endpoint(remote_endpoint_send_);
- local_endpoint_ = socket_->local_endpoint();
- }
- }
-
- ~UdpTunnelConnector() override;
-
- void send(Packet &packet) override;
-
- void send(const uint8_t *packet, std::size_t len) override;
-
- void close() override;
-
- void connect(const std::string &hostname, std::uint16_t port,
- const std::string &bind_address = "",
- std::uint16_t bind_port = 0);
-
- auto shared_from_this() { return utils::shared_from(this); }
-
- private:
- void doConnect();
- void doRecvPacket();
-
- void doRecvPacket(utils::MemBuf &buffer) {
- receive_callback_(this, buffer, make_error_code(forwarder_error::success));
- }
-
-#ifdef LINUX
- void readHandler(std::error_code ec);
- void writeHandler(std::error_code ec);
-#endif
-
- void setConnected() { state_ = State::CONNECTED; }
-
- void doSendPacket();
- void doClose();
-
- private:
- asio::io_service &io_service_;
- std::shared_ptr<asio::io_service::strand> strand_;
- std::shared_ptr<asio::ip::udp::socket> socket_;
- asio::ip::udp::resolver resolver_;
- asio::ip::udp::resolver::iterator endpoint_iterator_;
- asio::ip::udp::endpoint remote_endpoint_send_;
- asio::ip::udp::endpoint remote_endpoint_recv_;
-
- asio::steady_timer timer_;
-
-#ifdef LINUX
- asio::steady_timer send_timer_;
- struct iovec tx_iovecs_[max_burst][8];
- struct mmsghdr tx_msgs_[max_burst];
- struct iovec rx_iovecs_[max_burst][8];
- struct mmsghdr rx_msgs_[max_burst];
- std::uint8_t current_position_;
-#else
- std::pair<uint8_t *, std::size_t> read_msg_;
-#endif
-
- bool data_available_;
-};
-
-} // namespace core
-
-} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc
deleted file mode 100644
index d047cc568..000000000
--- a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- */
-
-#include <glog/logging.h>
-#include <hicn/transport/utils/hash.h>
-#include <io_modules/forwarder/udp_tunnel.h>
-#include <io_modules/forwarder/udp_tunnel_listener.h>
-
-#ifndef LINUX
-namespace std {
-size_t hash<asio::ip::udp::endpoint>::operator()(
- const asio::ip::udp::endpoint &endpoint) const {
- auto hash_ip = endpoint.address().is_v4()
- ? endpoint.address().to_v4().to_ulong()
- : utils::hash::fnv32_buf(
- endpoint.address().to_v6().to_bytes().data(), 16);
- uint16_t port = endpoint.port();
- return utils::hash::fnv32_buf(&port, 2, hash_ip);
-}
-} // namespace std
-#endif
-
-namespace transport {
-namespace core {
-
-UdpTunnelListener::~UdpTunnelListener() {}
-
-void UdpTunnelListener::close() {
- strand_->post([this]() {
- if (socket_->is_open()) {
- socket_->close();
- }
- });
-}
-
-#ifdef LINUX
-void UdpTunnelListener::readHandler(std::error_code ec) {
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
-
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- if (current_position_ == 0) {
- for (int i = 0; i < Connector::max_burst; i++) {
- auto read_buffer = Connector::getRawBuffer();
- iovecs_[i][0].iov_base = read_buffer.first;
- iovecs_[i][0].iov_len = read_buffer.second;
- msgs_[i].msg_hdr.msg_iov = iovecs_[i];
- msgs_[i].msg_hdr.msg_iovlen = 1;
- msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i];
- msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]);
- }
- }
-
- int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_,
- Connector::max_burst - current_position_, MSG_DONTWAIT,
- nullptr);
- if (res < 0) {
- LOG(ERROR) << "Error in recvmmsg.";
- return;
- }
-
- for (int i = 0; i < res; i++) {
- auto packet = Connector::getPacketFromBuffer(
- reinterpret_cast<uint8_t *>(
- msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
- msgs_[current_position_].msg_len);
- auto connector_id =
- utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name,
- msgs_[current_position_].msg_hdr.msg_namelen);
-
- auto connector = connectors_.find(connector_id);
- if (connector == connectors_.end()) {
- // Create new connector corresponding to new client
-
- /*
- * Get the remote endpoint for this particular message
- */
- using namespace asio::ip;
- if (local_endpoint_.address().is_v4()) {
- auto addr = reinterpret_cast<struct sockaddr_in *>(
- &remote_endpoints_[current_position_]);
- address_v4::bytes_type address_bytes;
- std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr),
- address_bytes.size(), address_bytes.begin());
- address_v4 address(address_bytes);
- remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin_port));
- } else {
- auto addr = reinterpret_cast<struct sockaddr_in6 *>(
- &remote_endpoints_[current_position_]);
- address_v6::bytes_type address_bytes;
- std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr),
- address_bytes.size(), address_bytes.begin());
- address_v6 address(address_bytes);
- remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin6_port));
- }
-
- /**
- * Create new connector sharing the same socket of this listener.
- */
- auto ret = connectors_.emplace(
- connector_id,
- std::make_shared<UdpTunnelConnector>(
- socket_, strand_, receive_callback_,
- [](Connector *, const std::error_code &) {}, [](Connector *) {},
- [](Connector *) {}, std::move(remote_endpoint_)));
- connector = ret.first;
- connector->second->setConnectorId(connector_id);
- }
-
- /**
- * Use connector callback to process incoming message.
- */
- UdpTunnelConnector *c =
- dynamic_cast<UdpTunnelConnector *>(connector->second.get());
- c->doRecvPacket(*packet);
-
- ++current_position_;
- }
-
- doRecvPacket();
- } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
- LOG(ERROR) << "The connection has been closed by the application.";
- return;
- } else {
- LOG(ERROR) << ec.value() << " " << ec.message();
- }
-}
-#endif
-
-void UdpTunnelListener::doRecvPacket() {
-#ifdef LINUX
-#if ((ASIO_VERSION / 100 % 1000) < 11)
- socket_->async_receive(
- asio::null_buffers(),
-#else
- socket_->async_wait(
- asio::ip::tcp::socket::wait_read,
-#endif
- std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1));
-#else
- read_msg_ = Connector::getRawBuffer();
- socket_->async_receive_from(
- asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_,
- [this](std::error_code ec, std::size_t length) {
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- auto packet = Connector::getPacketFromBuffer(read_msg_.first, length);
- auto connector_id =
- std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_);
- auto connector = connectors_.find(connector_id);
- if (connector == connectors_.end()) {
- // Create new connector corresponding to new client
- auto ret = connectors_.emplace(
- connector_id, std::make_shared<UdpTunnelConnector>(
- socket_, strand_, receive_callback_,
- [](Connector *, const std::error_code &) {},
- [](Connector *) {}, [](Connector *) {},
- std::move(remote_endpoint_)));
- connector = ret.first;
- connector->second->setConnectorId(connector_id);
- }
-
- UdpTunnelConnector *c =
- dynamic_cast<UdpTunnelConnector *>(connector->second.get());
- c->doRecvPacket(*packet);
- doRecvPacket();
- } else if (ec.value() ==
- static_cast<int>(std::errc::operation_canceled)) {
- LOG(ERROR) << "The connection has been closed by the application.";
- return;
- } else {
- LOG(ERROR) << ec.value() << " " << ec.message();
- }
- });
-#endif
-}
-} // namespace core
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h
deleted file mode 100644
index 5d197dcb0..000000000
--- a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- */
-
-#pragma once
-
-#include <hicn/transport/core/asio_wrapper.h>
-#include <hicn/transport/core/connector.h>
-#include <hicn/transport/portability/platform.h>
-
-#include <unordered_map>
-
-namespace std {
-template <>
-struct hash<asio::ip::udp::endpoint> {
- size_t operator()(const asio::ip::udp::endpoint &endpoint) const;
-};
-} // namespace std
-
-namespace transport {
-namespace core {
-
-class UdpTunnelListener
- : public std::enable_shared_from_this<UdpTunnelListener> {
- using PacketReceivedCallback = Connector::PacketReceivedCallback;
- using EndpointId = std::pair<uint32_t, uint16_t>;
-
- static constexpr uint16_t default_port = 5004;
-
- public:
- using Ptr = std::shared_ptr<UdpTunnelListener>;
-
- template <typename ReceiveCallback>
- UdpTunnelListener(asio::io_service &io_service,
- ReceiveCallback &&receive_callback,
- asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint(
- asio::ip::udp::v4(), default_port))
- : io_service_(io_service),
- strand_(std::make_shared<asio::io_service::strand>(io_service_)),
- socket_(std::make_shared<asio::ip::udp::socket>(io_service_,
- endpoint.protocol())),
- local_endpoint_(endpoint),
- receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
-#ifndef LINUX
- read_msg_(nullptr, 0)
-#else
- iovecs_{0},
- msgs_{0},
- current_position_(0)
-#endif
- {
- if (endpoint.protocol() == asio::ip::udp::v6()) {
- std::error_code ec;
- socket_->set_option(asio::ip::v6_only(false), ec);
- // Call succeeds only on dual stack systems.
- }
- socket_->bind(local_endpoint_);
- io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this));
- }
-
- ~UdpTunnelListener();
-
- void close();
-
- int deleteConnector(Connector *connector) {
- return connectors_.erase(connector->getConnectorId());
- }
-
- template <typename ReceiveCallback>
- void setReceiveCallback(ReceiveCallback &&callback) {
- receive_callback_ = std::forward<ReceiveCallback &&>(callback);
- }
-
- Connector *findConnector(Connector::Id connId) {
- auto it = connectors_.find(connId);
- if (it != connectors_.end()) {
- return it->second.get();
- }
-
- return nullptr;
- }
-
- private:
- void doRecvPacket();
-
- void readHandler(std::error_code ec);
-
- asio::io_service &io_service_;
- std::shared_ptr<asio::io_service::strand> strand_;
- std::shared_ptr<asio::ip::udp::socket> socket_;
- asio::ip::udp::endpoint local_endpoint_;
- asio::ip::udp::endpoint remote_endpoint_;
- std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_;
-
- PacketReceivedCallback receive_callback_;
-
-#ifdef LINUX
- struct iovec iovecs_[Connector::max_burst][8];
- struct mmsghdr msgs_[Connector::max_burst];
- struct sockaddr_storage remote_endpoints_[Connector::max_burst];
- std::uint8_t current_position_;
-#else
- std::pair<uint8_t *, std::size_t> read_msg_;
-#endif
-};
-
-} // namespace core
-
-} // namespace transport