summaryrefslogtreecommitdiffstats
path: root/libtransport/src/io_modules/forwarder
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/io_modules/forwarder')
-rw-r--r--libtransport/src/io_modules/forwarder/CMakeLists.txt1
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.cc39
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.h1
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.cc5
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.h1
-rw-r--r--libtransport/src/io_modules/forwarder/global_id_counter.h39
6 files changed, 14 insertions, 72 deletions
diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt
index 3922316d3..2235d842e 100644
--- a/libtransport/src/io_modules/forwarder/CMakeLists.txt
+++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt
@@ -17,7 +17,6 @@ list(APPEND MODULE_HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/errors.h
${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h
${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h
- ${CMAKE_CURRENT_SOURCE_DIR}/global_counter.h
)
list(APPEND MODULE_SOURCE_FILES
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc
index 3ae5bf397..bfe4dd5de 100644
--- a/libtransport/src/io_modules/forwarder/forwarder.cc
+++ b/libtransport/src/io_modules/forwarder/forwarder.cc
@@ -14,12 +14,12 @@
*/
#include <core/global_configuration.h>
+#include <core/global_id_counter.h>
#include <core/local_connector.h>
#include <core/udp_connector.h>
#include <core/udp_listener.h>
#include <glog/logging.h>
#include <io_modules/forwarder/forwarder.h>
-#include <io_modules/forwarder/global_id_counter.h>
namespace transport {
@@ -90,11 +90,13 @@ Connector::Id Forwarder::registerLocalConnector(
asio::io_service &io_service,
Connector::PacketReceivedCallback &&receive_callback,
Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback) {
utils::SpinLock::Acquire locked(connector_lock_);
auto id = GlobalCounter<Connector::Id>::getInstance().getNext();
auto connector = std::make_shared<LocalConnector>(
- io_service, receive_callback, sent_callback, nullptr, reconnect_callback);
+ io_service, std::move(receive_callback), std::move(sent_callback),
+ std::move(close_callback), std::move(reconnect_callback));
connector->setConnectorId(id);
local_connectors_.emplace(id, std::move(connector));
return id;
@@ -150,34 +152,13 @@ void Forwarder::onPacketReceived(Connector *connector,
return;
}
- for (auto &packet_buffer_ptr : packets) {
- auto &packet_buffer = *packet_buffer_ptr;
-
- // Figure out the type of packet we received
- bool is_interest = Packet::isInterest(packet_buffer.data());
-
- Packet *packet = nullptr;
- if (is_interest) {
- packet = static_cast<Interest *>(&packet_buffer);
- } else {
- packet = static_cast<ContentObject *>(&packet_buffer);
- }
-
- for (auto &c : local_connectors_) {
- auto role = c.second->getRole();
- auto is_producer = role == Connector::Role::PRODUCER;
- if ((is_producer && is_interest) || (!is_producer && !is_interest)) {
- c.second->send(*packet);
- } else {
- LOG(ERROR) << "Error sending packet to local connector. is_interest = "
- << is_interest << " - is_producer = " << is_producer;
- }
- }
+ for (auto &c : local_connectors_) {
+ c.second->receive(packets);
+ }
- // PCS Lookup + FIB lookup. Skip for now
+ // PCS Lookup + FIB lookup. Skip for now
- // Forward packet to local connectors
- }
+ // Forward packet to local connectors
}
void Forwarder::send(Packet &packet) {
@@ -304,4 +285,4 @@ void Forwarder::parseForwarderConfiguration(
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h
index 38b4260b3..9ad989fcd 100644
--- a/libtransport/src/io_modules/forwarder/forwarder.h
+++ b/libtransport/src/io_modules/forwarder/forwarder.h
@@ -47,6 +47,7 @@ class Forwarder {
asio::io_service &io_service,
Connector::PacketReceivedCallback &&receive_callback,
Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback);
Forwarder &deleteConnector(Connector::Id id);
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc
index 0ced84ab4..77d2b5e6a 100644
--- a/libtransport/src/io_modules/forwarder/forwarder_module.cc
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc
@@ -37,8 +37,6 @@ void ForwarderModule::send(Packet &packet) {
forwarder_.send(packet);
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Sending from " << connector_id_ << " to " << 1 - connector_id_;
-
- // local_faces_.at(1 - local_id_).onPacket(packet);
}
void ForwarderModule::send(const utils::MemBuf::Ptr &buffer) {
@@ -58,12 +56,13 @@ void ForwarderModule::closeConnection() {
void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback,
Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback,
asio::io_service &io_service,
const std::string &app_name) {
connector_id_ = forwarder_.registerLocalConnector(
io_service, std::move(receive_callback), std::move(sent_callback),
- std::move(reconnect_callback));
+ std::move(close_callback), std::move(reconnect_callback));
name_ = app_name;
}
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h
index 52a12b67e..a48701161 100644
--- a/libtransport/src/io_modules/forwarder/forwarder_module.h
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.h
@@ -44,6 +44,7 @@ class ForwarderModule : public IoModule {
void init(Connector::PacketReceivedCallback &&receive_callback,
Connector::PacketSentCallback &&sent_callback,
+ Connector::OnCloseCallback &&close_callback,
Connector::OnReconnectCallback &&reconnect_callback,
asio::io_service &io_service,
const std::string &app_name = "Libtransport") override;
diff --git a/libtransport/src/io_modules/forwarder/global_id_counter.h b/libtransport/src/io_modules/forwarder/global_id_counter.h
deleted file mode 100644
index 0a67b76d5..000000000
--- a/libtransport/src/io_modules/forwarder/global_id_counter.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/utils/singleton.h>
-
-#include <atomic>
-#include <mutex>
-
-namespace transport {
-
-namespace core {
-
-template <typename T = uint64_t>
-class GlobalCounter : public utils::Singleton<GlobalCounter<T>> {
- public:
- friend class utils::Singleton<GlobalCounter>;
- T getNext() { return counter_++; }
-
- private:
- GlobalCounter() : counter_(0) {}
- std::atomic<T> counter_;
-};
-
-} // namespace core
-} // namespace transport \ No newline at end of file