aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/io_modules/forwarder/forwarder.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/io_modules/forwarder/forwarder.cc')
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.cc68
1 files changed, 33 insertions, 35 deletions
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc
index 0546cb8b3..d5f0b589e 100644
--- a/libtransport/src/io_modules/forwarder/forwarder.cc
+++ b/libtransport/src/io_modules/forwarder/forwarder.cc
@@ -14,12 +14,12 @@
*/
#include <core/global_configuration.h>
+#include <core/global_id_counter.h>
#include <core/local_connector.h>
+#include <core/udp_connector.h>
+#include <core/udp_listener.h>
#include <glog/logging.h>
#include <io_modules/forwarder/forwarder.h>
-#include <io_modules/forwarder/global_id_counter.h>
-#include <io_modules/forwarder/udp_tunnel.h>
-#include <io_modules/forwarder/udp_tunnel_listener.h>
namespace transport {
@@ -89,11 +89,14 @@ void Forwarder::initConnectors() {
Connector::Id Forwarder::registerLocalConnector(
asio::io_service &io_service,
Connector::PacketReceivedCallback &&receive_callback,
+ Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback) {
utils::SpinLock::Acquire locked(connector_lock_);
auto id = GlobalCounter<Connector::Id>::getInstance().getNext();
auto connector = std::make_shared<LocalConnector>(
- io_service, receive_callback, nullptr, nullptr, reconnect_callback);
+ io_service, std::move(receive_callback), std::move(sent_callback),
+ std::move(close_callback), std::move(reconnect_callback));
connector->setConnectorId(id);
local_connectors_.emplace(id, std::move(connector));
return id;
@@ -105,6 +108,7 @@ Forwarder &Forwarder::deleteConnector(Connector::Id id) {
if (it != local_connectors_.end()) {
it->second->close();
local_connectors_.erase(it);
+ } else {
}
return *this;
@@ -120,9 +124,9 @@ Connector::Ptr Forwarder::getConnector(Connector::Id id) {
return nullptr;
}
-void Forwarder::onPacketFromListener(Connector *connector,
- utils::MemBuf &packet_buffer,
- const std::error_code &ec) {
+void Forwarder::onPacketFromListener(
+ Connector *connector, const std::vector<utils::MemBuf::Ptr> &packets,
+ const std::error_code &ec) {
// Create connector
connector->setReceiveCallback(
std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1,
@@ -135,32 +139,21 @@ void Forwarder::onPacketFromListener(Connector *connector,
remote_connectors_.emplace(connector->getConnectorId(),
connector->shared_from_this());
}
+
// TODO Check if control packet or not. For the moment it is not.
- onPacketReceived(connector, packet_buffer, ec);
+ onPacketReceived(connector, packets, ec);
}
void Forwarder::onPacketReceived(Connector *connector,
- utils::MemBuf &packet_buffer,
+ const std::vector<utils::MemBuf::Ptr> &packets,
const std::error_code &ec) {
- // Figure out the type of packet we received
- bool is_interest = Packet::isInterest(packet_buffer.data());
-
- Packet *packet = nullptr;
- if (is_interest) {
- packet = static_cast<Interest *>(&packet_buffer);
- } else {
- packet = static_cast<ContentObject *>(&packet_buffer);
+ if (ec) {
+ LOG(ERROR) << "Error receiving packet: " << ec.message();
+ return;
}
for (auto &c : local_connectors_) {
- auto role = c.second->getRole();
- auto is_producer = role == Connector::Role::PRODUCER;
- if ((is_producer && is_interest) || (!is_producer && !is_interest)) {
- c.second->send(*packet);
- } else {
- LOG(ERROR) << "Error sending packet to local connector. is_interest = "
- << is_interest << " - is_producer = " << is_producer;
- }
+ c.second->receive(packets);
}
// PCS Lookup + FIB lookup. Skip for now
@@ -168,19 +161,24 @@ void Forwarder::onPacketReceived(Connector *connector,
// Forward packet to local connectors
}
-void Forwarder::send(Packet &packet) {
+void Forwarder::send(Packet &packet, Connector::Id connector_id) {
// TODo Here a nice PIT/CS / FIB would be required:)
// For now let's just forward the packet on the remote connector we get
- if (remote_connectors_.begin() == remote_connectors_.end()) {
- return;
+ for (auto &c : remote_connectors_) {
+ auto remote_endpoint = c.second->getRemoteEndpoint();
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending packet to: " << remote_endpoint.getAddress() << ":"
+ << remote_endpoint.getPort();
+ c.second->send(packet);
}
- auto remote_endpoint =
- remote_connectors_.begin()->second->getRemoteEndpoint();
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Sending packet to: " << remote_endpoint.getAddress() << ":"
- << remote_endpoint.getPort();
- remote_connectors_.begin()->second->send(packet);
+ for (auto &c : local_connectors_) {
+ if (c.first != connector_id) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending packet to local connector " << c.first << std::endl;
+ c.second->receive({packet.shared_from_this()});
+ }
+ }
}
void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {}
@@ -292,4 +290,4 @@ void Forwarder::parseForwarderConfiguration(
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport