diff options
Diffstat (limited to 'libtransport/src/io_modules/forwarder/forwarder.cc')
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder.cc | 64 |
1 files changed, 38 insertions, 26 deletions
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc index 0546cb8b3..3ae5bf397 100644 --- a/libtransport/src/io_modules/forwarder/forwarder.cc +++ b/libtransport/src/io_modules/forwarder/forwarder.cc @@ -15,11 +15,11 @@ #include <core/global_configuration.h> #include <core/local_connector.h> +#include <core/udp_connector.h> +#include <core/udp_listener.h> #include <glog/logging.h> #include <io_modules/forwarder/forwarder.h> #include <io_modules/forwarder/global_id_counter.h> -#include <io_modules/forwarder/udp_tunnel.h> -#include <io_modules/forwarder/udp_tunnel_listener.h> namespace transport { @@ -89,11 +89,12 @@ void Forwarder::initConnectors() { Connector::Id Forwarder::registerLocalConnector( asio::io_service &io_service, Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback) { utils::SpinLock::Acquire locked(connector_lock_); auto id = GlobalCounter<Connector::Id>::getInstance().getNext(); auto connector = std::make_shared<LocalConnector>( - io_service, receive_callback, nullptr, nullptr, reconnect_callback); + io_service, receive_callback, sent_callback, nullptr, reconnect_callback); connector->setConnectorId(id); local_connectors_.emplace(id, std::move(connector)); return id; @@ -105,6 +106,7 @@ Forwarder &Forwarder::deleteConnector(Connector::Id id) { if (it != local_connectors_.end()) { it->second->close(); local_connectors_.erase(it); + } else { } return *this; @@ -120,9 +122,9 @@ Connector::Ptr Forwarder::getConnector(Connector::Id id) { return nullptr; } -void Forwarder::onPacketFromListener(Connector *connector, - utils::MemBuf &packet_buffer, - const std::error_code &ec) { +void Forwarder::onPacketFromListener( + Connector *connector, const std::vector<utils::MemBuf::Ptr> &packets, + const std::error_code &ec) { // Create connector connector->setReceiveCallback( std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1, @@ -135,37 +137,47 @@ void Forwarder::onPacketFromListener(Connector *connector, remote_connectors_.emplace(connector->getConnectorId(), connector->shared_from_this()); } + // TODO Check if control packet or not. For the moment it is not. - onPacketReceived(connector, packet_buffer, ec); + onPacketReceived(connector, packets, ec); } void Forwarder::onPacketReceived(Connector *connector, - utils::MemBuf &packet_buffer, + const std::vector<utils::MemBuf::Ptr> &packets, const std::error_code &ec) { - // Figure out the type of packet we received - bool is_interest = Packet::isInterest(packet_buffer.data()); - - Packet *packet = nullptr; - if (is_interest) { - packet = static_cast<Interest *>(&packet_buffer); - } else { - packet = static_cast<ContentObject *>(&packet_buffer); + if (ec) { + LOG(ERROR) << "Error receiving packet: " << ec.message(); + return; } - for (auto &c : local_connectors_) { - auto role = c.second->getRole(); - auto is_producer = role == Connector::Role::PRODUCER; - if ((is_producer && is_interest) || (!is_producer && !is_interest)) { - c.second->send(*packet); + for (auto &packet_buffer_ptr : packets) { + auto &packet_buffer = *packet_buffer_ptr; + + // Figure out the type of packet we received + bool is_interest = Packet::isInterest(packet_buffer.data()); + + Packet *packet = nullptr; + if (is_interest) { + packet = static_cast<Interest *>(&packet_buffer); } else { - LOG(ERROR) << "Error sending packet to local connector. is_interest = " - << is_interest << " - is_producer = " << is_producer; + packet = static_cast<ContentObject *>(&packet_buffer); + } + + for (auto &c : local_connectors_) { + auto role = c.second->getRole(); + auto is_producer = role == Connector::Role::PRODUCER; + if ((is_producer && is_interest) || (!is_producer && !is_interest)) { + c.second->send(*packet); + } else { + LOG(ERROR) << "Error sending packet to local connector. is_interest = " + << is_interest << " - is_producer = " << is_producer; + } } - } - // PCS Lookup + FIB lookup. Skip for now + // PCS Lookup + FIB lookup. Skip for now - // Forward packet to local connectors + // Forward packet to local connectors + } } void Forwarder::send(Packet &packet) { |