aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/core/portal.h
diff options
context:
space:
mode:
authorLuca Muscariello <lumuscar@cisco.com>2022-03-30 22:29:28 +0200
committerMauro Sardara <msardara@cisco.com>2022-03-31 19:51:47 +0200
commitc46e5df56b67bb8ea7a068d39324c640084ead2b (patch)
treeeddeb17785938e09bc42eec98ee09b8a28846de6 /libtransport/src/core/portal.h
parent18fa668f25d3cc5463417ce7df6637e31578e898 (diff)
feat: boostrap hicn 22.02
The current patch provides several new features, improvements, bug fixes and also complete rewrite of entire components. - lib The hicn packet parser has been improved with a new packet format fully based on UDP. The TCP header is still temporarily supported but the UDP header will replace completely the new hicn packet format. Improvements have been made to make sure every packet parsing operation is made via this library. The current new header can be used as header between the payload and the UDP header or as trailer in the UDP surplus area to be tested when UDP options will start to be used. - hicn-light The portable packet forwarder has been completely rewritten from scratch with the twofold objective to improve performance and code size but also to drop dependencies such as libparc which is now removed by the current implementation. - hicn control the control library is the agent that is used to program the packet forwarders via their binary API. This component has benefited from significant improvements in terms of interaction model which is now event driven and more robust to failures. - VPP plugin has been updated to support VPP 22.02 - transport Major improvement have been made to the RTC protocol, to the support of IO modules and to the security sub system. Signed manifests are the default data authenticity and integrity framework. Confidentiality can be enabled by sharing the encryption key to the prod/cons layer. The library has been tested with group key based applications such as broadcast/multicast and real-time on-line meetings with trusted server keys or MLS. - testing Unit testing has been introduced using GoogleTest. One third of the code base is covered by unit testing with priority on critical features. Functional testing has also been introduce using Docker, linux bridging and Robot Framework to define test with Less Code techniques to facilitate the extension of the coverage. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Angelo Mantellini <manangel@cisco.com> Co-authored-by: Jacques Samain <jsamain@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: I75d0ef70f86d921e3ef503c99271216ff583c215 Signed-off-by: Luca Muscariello <muscariello@ieee.org> Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r--libtransport/src/core/portal.h550
1 files changed, 258 insertions, 292 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index f6a9ce85b..aae4c573e 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,6 +15,7 @@
#pragma once
+#include <core/global_workers.h>
#include <core/pending_interest.h>
#include <glog/logging.h>
#include <hicn/transport/config.h>
@@ -28,6 +29,7 @@
#include <hicn/transport/interfaces/global_conf_interface.h>
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/fixed_block_allocator.h>
#include <future>
@@ -54,11 +56,11 @@ class HandlerMemory {
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
+ void *allocate(std::size_t size) {
return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock();
}
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
+ void deallocate(void *pointer) {
utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock(
pointer);
}
@@ -69,13 +71,9 @@ class HandlerMemory {
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
- return ::operator new(size);
- }
+ void *allocate(std::size_t size) { return ::operator new(size); }
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
- ::operator delete(pointer);
- }
+ void deallocate(void *pointer) { ::operator delete(pointer); }
#endif
};
@@ -92,21 +90,19 @@ class HandlerAllocator {
HandlerAllocator(const HandlerAllocator<U> &other) noexcept
: memory_(other.memory_) {}
- TRANSPORT_ALWAYS_INLINE bool operator==(
- const HandlerAllocator &other) const noexcept {
+ bool operator==(const HandlerAllocator &other) const noexcept {
return &memory_ == &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE bool operator!=(
- const HandlerAllocator &other) const noexcept {
+ bool operator!=(const HandlerAllocator &other) const noexcept {
return &memory_ != &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const {
+ T *allocate(std::size_t n) const {
return static_cast<T *>(memory_.allocate(sizeof(T) * n));
}
- TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const {
+ void deallocate(T *p, std::size_t /*n*/) const {
return memory_.deallocate(p);
}
@@ -135,7 +131,7 @@ class CustomAllocatorHandler {
}
template <typename... Args>
- void operator()(Args &&... args) {
+ void operator()(Args &&...args) {
handler_(std::forward<Args>(args)...);
}
@@ -151,49 +147,16 @@ inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler(
return CustomAllocatorHandler<Handler>(m, h);
}
-class Pool {
- public:
- Pool(asio::io_service &io_service) : io_service_(io_service) {
- increasePendingInterestPool();
- }
-
- TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() {
- // Create pool of pending interests to reuse
- for (uint32_t i = 0; i < pit_size; i++) {
- pending_interests_pool_.add(new PendingInterest(
- Interest::Ptr(nullptr),
- std::make_unique<asio::steady_timer>(io_service_)));
- }
- }
- PendingInterest::Ptr getPendingInterest() {
- auto res = pending_interests_pool_.get();
- while (TRANSPORT_EXPECT_FALSE(!res.first)) {
- increasePendingInterestPool();
- res = pending_interests_pool_.get();
- }
-
- return std::move(res.second);
- }
-
- private:
- utils::ObjectPool<PendingInterest> pending_interests_pool_;
- asio::io_service &io_service_;
-};
-
} // namespace portal_details
class PortalConfiguration;
-using PendingInterestHashTable =
- std::unordered_map<uint32_t, PendingInterest::Ptr>;
-
-using interface::BindConfig;
+using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest>;
/**
* Portal is a opaque class which is used for sending/receiving interest/data
- * packets over multiple kind of connector. The connector itself is defined by
- * the template ForwarderInt, which is resolved at compile time. It is then not
- * possible to decide at runtime what the connector will be.
+ * packets over multiple kind of io_modules. The io_module itself is an external
+ * module loaded at runtime.
*
* The tasks performed by portal are the following:
* - Sending/Receiving Interest packets
@@ -210,22 +173,16 @@ using interface::BindConfig;
* users of this class.
*/
-class Portal {
- public:
- using ConsumerCallback = interface::Portal::ConsumerCallback;
- using ProducerCallback = interface::Portal::ProducerCallback;
-
- friend class PortalConfiguration;
-
- Portal() : Portal(internal_io_service_) {}
+class Portal : public ::utils::NonCopyable,
+ public std::enable_shared_from_this<Portal> {
+ private:
+ Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {}
- Portal(asio::io_service &io_service)
+ Portal(::utils::EventThread &worker)
: io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }),
- io_service_(io_service),
- packet_pool_(io_service),
+ worker_(worker),
app_name_("libtransport_application"),
- consumer_callback_(nullptr),
- producer_callback_(nullptr),
+ transport_callback_(nullptr),
is_consumer_(false) {
/**
* This workaroung allows to initialize memory for packet buffers *before*
@@ -235,58 +192,105 @@ class Portal {
*/
PacketManager<>::getInstance();
}
+
+ public:
+ using TransportCallback = interface::Portal::TransportCallback;
+ friend class PortalConfiguration;
+
+ static std::shared_ptr<Portal> createShared() {
+ return std::shared_ptr<Portal>(new Portal());
+ }
+
+ static std::shared_ptr<Portal> createShared(::utils::EventThread &worker) {
+ return std::shared_ptr<Portal>(new Portal(worker));
+ }
+
+ bool isConnected() const { return io_module_.get() != nullptr; }
+
/**
- * Set the consumer callback.
+ * Set the transport callback. Must be called from the same worker thread.
*
- * @param consumer_callback - The pointer to the ConsumerCallback object.
+ * @param consumer_callback - The pointer to the TransportCallback object.
*/
- void setConsumerCallback(ConsumerCallback *consumer_callback) {
- consumer_callback_ = consumer_callback;
+ void registerTransportCallback(TransportCallback *transport_callback) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+ transport_callback_ = transport_callback;
}
/**
- * Set the producer callback.
- *
- * @param producer_callback - The pointer to the ProducerCallback object.
+ * Unset the consumer callback. Must be called from the same worker thread.
*/
- void setProducerCallback(ProducerCallback *producer_callback) {
- producer_callback_ = producer_callback;
+ void unregisterTransportCallback() {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+ transport_callback_ = nullptr;
}
/**
- * Specify the output interface to use. This method will be useful in a future
- * scenario where the library will be able to forward packets without
+ * Specify the output interface to use. This method will be useful in a
+ * future scenario where the library will be able to forward packets without
* connecting to a local forwarder. Now it is not used.
*
* @param output_interface - The output interface to use for
* forwarding/receiving packets.
*/
- TRANSPORT_ALWAYS_INLINE void setOutputInterface(
- const std::string &output_interface) {
- io_module_->setOutputInterface(output_interface);
+ void setOutputInterface(const std::string &output_interface) {
+ if (io_module_) {
+ io_module_->setOutputInterface(output_interface);
+ }
}
/**
* Connect the transport to the local hicn forwarder.
*
- * @param is_consumer - Boolean specifying if the application on top of portal
- * is a consumer or a producer.
+ * @param is_consumer - Boolean specifying if the application on top of
+ * portal is a consumer or a producer.
*/
- TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
- if (!io_module_) {
- pending_interest_hash_table_.reserve(portal_details::pit_size);
- io_module_.reset(IoModule::load(io_module_path_.c_str()));
-
- CHECK(io_module_);
-
- io_module_->init(std::bind(&Portal::processIncomingMessages, this,
- std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3),
- std::bind(&Portal::setLocalRoutes, this), io_service_,
- app_name_);
- io_module_->connect(is_consumer);
- is_consumer_ = is_consumer;
+ void connect(bool is_consumer = true) {
+ if (isConnected()) {
+ return;
}
+
+ worker_.addAndWaitForExecution([this, is_consumer]() {
+ if (!io_module_) {
+ pending_interest_hash_table_.reserve(portal_details::pit_size);
+ io_module_.reset(IoModule::load(io_module_path_.c_str()));
+
+ CHECK(io_module_);
+
+ std::weak_ptr<Portal> self(shared_from_this());
+
+ io_module_->init(
+ [self](Connector *c, const std::vector<utils::MemBuf::Ptr> &buffers,
+ const std::error_code &ec) {
+ if (auto ptr = self.lock()) {
+ ptr->processIncomingMessages(c, buffers, ec);
+ }
+ },
+ [self](Connector *c, const std::error_code &ec) {
+ if (!ec) {
+ return;
+ }
+ auto ptr = self.lock();
+ if (ptr && ptr->transport_callback_) {
+ ptr->transport_callback_->onError(ec);
+ }
+ },
+ [self](Connector *c, const std::error_code &ec) {
+ auto ptr = self.lock();
+ if (ptr) {
+ if (ec && ptr->transport_callback_) {
+ ptr->transport_callback_->onError(ec);
+ return;
+ }
+ ptr->setLocalRoutes();
+ }
+ },
+ worker_.getIoService(), app_name_);
+
+ io_module_->connect(is_consumer);
+ is_consumer_ = is_consumer;
+ }
+ });
}
/**
@@ -295,18 +299,13 @@ class Portal {
~Portal() { killConnection(); }
/**
- * Compute name hash
- */
- TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) {
- return name.getHash32(false) + name.getSuffix();
- }
-
- /**
* Check if there is already a pending interest for a given name.
*
* @param name - The interest name.
*/
- TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
+ bool interestIsPending(const Name &name) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
auto it = pending_interest_hash_table_.find(getHash(name));
if (it != pending_interest_hash_table_.end()) {
return true;
@@ -321,19 +320,21 @@ class Portal {
* @param interest - The pointer to the interest. The ownership of the
* interest is transferred by the caller to portal.
*
- * @param on_content_object_callback - If the caller wishes to use a different
- * callback to be called for this interest, it can set this parameter.
- * Otherwise ConsumerCallback::onContentObject will be used.
+ * @param on_content_object_callback - If the caller wishes to use a
+ * different callback to be called for this interest, it can set this
+ * parameter. Otherwise ConsumerCallback::onContentObject will be used.
*
* @param on_interest_timeout_callback - If the caller wishes to use a
* different callback to be called for this interest, it can set this
* parameter. Otherwise ConsumerCallback::onTimeout will be used.
*/
- TRANSPORT_ALWAYS_INLINE void sendInterest(
+ void sendInterest(
Interest::Ptr &&interest,
OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
OnInterestTimeoutCallback &&on_interest_timeout_callback =
UNSET_CALLBACK) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
// Send it
interest->encodeSuffixes();
io_module_->send(*interest);
@@ -346,35 +347,42 @@ class Portal {
uint32_t counter = 0;
// Set timers
do {
- auto pending_interest = packet_pool_.getPendingInterest();
- pending_interest->setInterest(interest);
+ if (suffix) {
+ hash = initial_hash + *suffix;
+ seq = *suffix;
+ suffix++;
+ }
+
+ auto it = pending_interest_hash_table_.find(hash);
+ PendingInterest *pending_interest = nullptr;
+ if (it != pending_interest_hash_table_.end()) {
+ it->second.cancelTimer();
+ pending_interest = &it->second;
+ pending_interest->setInterest(interest);
+ } else {
+ auto pend_int = pending_interest_hash_table_.try_emplace(
+ hash, worker_.getIoService(), interest);
+ pending_interest = &pend_int.first->second;
+ }
+
pending_interest->setOnContentObjectCallback(
std::move(on_content_object_callback));
pending_interest->setOnTimeoutCallback(
std::move(on_interest_timeout_callback));
+ auto self = weak_from_this();
pending_interest->startCountdown(
portal_details::makeCustomAllocatorHandler(
async_callback_memory_,
- std::bind(&Portal::timerHandler, this, std::placeholders::_1,
- hash, seq)));
+ [self, hash, seq](const std::error_code &ec) {
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ return;
+ }
- auto it = pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
-
- // Get reference to interest packet in order to have it destroyed.
- auto _int = it->second->getInterest();
- it->second = std::move(pending_interest);
- } else {
- pending_interest_hash_table_[hash] = std::move(pending_interest);
- }
-
- if (suffix) {
- hash = initial_hash + *suffix;
- seq = *suffix;
- suffix++;
- }
+ if (auto ptr = self.lock()) {
+ ptr->timerHandler(hash, seq);
+ }
+ }));
} while (counter++ < n_suffixes);
}
@@ -385,214 +393,174 @@ class Portal {
* @param ec - Error code which says whether the timer expired or has been
* canceled upon data packet reception.
*
- * @param hash - The index of the interest in the pending interest hash table.
+ * @param hash - The index of the interest in the pending interest hash
+ * table.
*/
- TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec,
- uint32_t hash, uint32_t seq) {
- bool is_stopped = io_service_.stopped();
- if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
- return;
- }
+ void timerHandler(uint32_t hash, uint32_t seq) {
+ PendingInterestHashTable::iterator it =
+ pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ PendingInterest &pend_interest = it->second;
+ auto _int = pend_interest.getInterest();
+ auto callback = pend_interest.getOnTimeoutCallback();
+ pending_interest_hash_table_.erase(it);
+ Name &name = const_cast<Name &>(_int->getName());
+ name.setSuffix(seq);
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- PendingInterestHashTable::iterator it =
- pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- PendingInterest::Ptr ptr = std::move(it->second);
- pending_interest_hash_table_.erase(it);
- auto _int = ptr->getInterest();
- Name &name = const_cast<Name &>(_int->getName());
- name.setSuffix(seq);
-
- if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) {
- ptr->on_interest_timeout_callback_(_int, name);
- } else if (consumer_callback_) {
- consumer_callback_->onTimeout(_int, name);
- }
+ if (callback != UNSET_CALLBACK) {
+ callback(_int, name);
+ } else if (transport_callback_) {
+ transport_callback_->onTimeout(_int, name);
}
}
}
/**
- * Register a producer name to the local forwarder and optionally set the
- * content store size in a per-face manner.
- *
- * @param config - The configuration for the local forwarder binding.
- */
- TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) {
- assert(io_module_);
- io_module_->setContentStoreSize(config.csReserved());
- served_namespaces_.push_back(config.prefix());
- setLocalRoutes();
- }
-
- /**
- * Start the event loop. This function blocks here and calls the callback set
- * by the application upon interest/data received or timeout.
- */
- TRANSPORT_ALWAYS_INLINE void runEventsLoop() {
- if (io_service_.stopped()) {
- io_service_.reset(); // ensure that run()/poll() will do some work
- }
-
- io_service_.run();
- }
-
- /**
- * Run one event and return.
- */
- TRANSPORT_ALWAYS_INLINE void runOneEvent() {
- if (io_service_.stopped()) {
- io_service_.reset(); // ensure that run()/poll() will do some work
- }
-
- io_service_.run_one();
- }
-
- /**
- * Send a data packet to the local forwarder. As opposite to sendInterest, the
- * ownership of the content object is not transferred to the portal.
+ * Send a data packet to the local forwarder.
*
* @param content_object - The data packet.
*/
- TRANSPORT_ALWAYS_INLINE void sendContentObject(
- ContentObject &content_object) {
+ void sendContentObject(ContentObject &content_object) {
+ DCHECK(io_module_);
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
io_module_->send(content_object);
}
/**
- * Stop the event loop, canceling all the pending events in the event queue.
- *
- * Beware that stopping the event loop DOES NOT disconnect the transport from
- * the local forwarder, the connector underneath will stay connected.
+ * Disconnect the transport from the local forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
- if (!io_service_.stopped()) {
- io_service_.dispatch([this]() {
- clear();
- io_service_.stop();
- });
+ void killConnection() {
+ if (TRANSPORT_EXPECT_TRUE(io_module_ != nullptr)) {
+ io_module_->closeConnection();
}
}
/**
- * Disconnect the transport from the local forwarder.
+ * Clear the pending interest hash table.
*/
- TRANSPORT_ALWAYS_INLINE void killConnection() {
- io_module_->closeConnection();
+ void clear() {
+ worker_.tryRunHandlerNow([self{shared_from_this()}]() { self->doClear(); });
}
/**
- * Clear the pending interest hash table.
+ * Get a reference to the io_service object.
*/
- TRANSPORT_ALWAYS_INLINE void clear() {
- if (!io_service_.stopped()) {
- io_service_.dispatch(std::bind(&Portal::doClear, this));
- } else {
- doClear();
- }
- }
+ utils::EventThread &getThread() { return worker_; }
/**
- * Remove one pending interest.
+ * Register a route to the local forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) {
- if (!io_service_.stopped()) {
- io_service_.dispatch(std::bind(&Portal::doClearOne, this, name));
- } else {
- doClearOne(name);
- }
+ void registerRoute(const Prefix &prefix) {
+ std::weak_ptr<Portal> self = shared_from_this();
+ worker_.tryRunHandlerNow([self, prefix]() {
+ if (auto ptr = self.lock()) {
+ auto ret = ptr->served_namespaces_.insert(prefix);
+ if (ret.second && ptr->io_module_ && ptr->io_module_->isConnected()) {
+ ptr->io_module_->registerRoute(prefix);
+ }
+ }
+ });
}
/**
- * Get a reference to the io_service object.
+ * Send a MAP-Me update to traverse NATs.
*/
- TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
- return io_service_;
+ TRANSPORT_ALWAYS_INLINE void sendMapme() {
+ if (io_module_->isConnected()) {
+ io_module_->sendMapme();
+ }
}
/**
- * Register a route to the local forwarder.
+ * set forwarding strategy
*/
- TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) {
- served_namespaces_.push_back(prefix);
+ TRANSPORT_ALWAYS_INLINE void setForwardingStrategy(Prefix &prefix,
+ std::string &strategy) {
if (io_module_->isConnected()) {
- io_module_->registerRoute(prefix);
+ io_module_->setForwardingStrategy(prefix, strategy);
}
}
/**
* Check if the transport is connected to a forwarder or not
*/
- TRANSPORT_ALWAYS_INLINE bool isConnectedToFwd() {
+ bool isConnectedToFwd() {
std::string mod = io_module_path_.substr(0, io_module_path_.find("."));
if (mod == "forwarder_module") return false;
return true;
}
+ auto &getServedNamespaces() { return served_namespaces_; }
+
private:
/**
- * Clear the pending interest hash table.
+ * Compute name hash
*/
- TRANSPORT_ALWAYS_INLINE void doClear() {
- for (auto &pend_interest : pending_interest_hash_table_) {
- pend_interest.second->cancelTimer();
-
- // Get interest packet from pending interest and do nothing with it. It
- // will get destroyed as it goes out of scope.
- auto _int = pend_interest.second->getInterest();
- }
-
- pending_interest_hash_table_.clear();
+ uint32_t getHash(const Name &name) {
+ return name.getHash32(false) + name.getSuffix();
}
/**
- * Remove one pending interest.
+ * Clear the pending interest hash table.
*/
- TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) {
- auto it = pending_interest_hash_table_.find(getHash(name));
-
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
-
- // Get interest packet from pending interest and do nothing with it. It
- // will get destroyed as it goes out of scope.
- auto _int = it->second->getInterest();
-
- pending_interest_hash_table_.erase(it);
+ void doClear() {
+ for (auto &pend_interest : pending_interest_hash_table_) {
+ pend_interest.second.cancelTimer();
}
+
+ pending_interest_hash_table_.clear();
}
/**
- * Callback called by the underlying connector upon reception of a packet from
- * the local forwarder.
+ * Callback called by the underlying connector upon reception of a packet
+ * from the local forwarder.
*
* @param packet_buffer - The bytes of the packet.
*/
- TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
- Connector *c, utils::MemBuf &buffer, const std::error_code &ec) {
- bool is_stopped = io_service_.stopped();
- if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
+ void processIncomingMessages(Connector *c,
+ const std::vector<utils::MemBuf::Ptr> &buffers,
+ const std::error_code &ec) {
+ if (!transport_callback_) {
return;
}
- if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) {
- processControlMessage(buffer);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ // Error receiving from underlying infra.
+ if (transport_callback_) {
+ transport_callback_->onError(ec);
+ }
+
return;
}
- // The buffer is a base class for an interest or a content object
- Packet &packet_buffer = static_cast<Packet &>(buffer);
+ for (auto &buffer_ptr : buffers) {
+ auto &buffer = *buffer_ptr;
+
+ if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer))) {
+ processControlMessage(buffer);
+ return;
+ }
- auto format = packet_buffer.getFormat();
- if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
- if (is_consumer_) {
- processContentObject(static_cast<ContentObject &>(packet_buffer));
+ auto format = Packet::getFormatFromBuffer(buffer.data(), buffer.length());
+ if (TRANSPORT_EXPECT_TRUE(_is_cmpr(format) || _is_tcp(format))) {
+ // The buffer is a base class for an interest or a content object
+ Packet &packet_buffer = static_cast<Packet &>(buffer);
+ if (is_consumer_ && !packet_buffer.isInterest()) {
+ processContentObject(static_cast<ContentObject &>(packet_buffer));
+ } else if (!is_consumer_ && packet_buffer.isInterest()) {
+ processInterest(static_cast<Interest &>(packet_buffer));
+ } else {
+ auto packet_type =
+ packet_buffer.isInterest() ? "Interest" : "ContentObject";
+ auto socket_type = is_consumer_ ? "consumer " : "producer ";
+ LOG(ERROR) << "Received a " << packet_type << " packet with name "
+ << packet_buffer.getName() << " in a " << socket_type
+ << " transport. Ignoring it.";
+ }
} else {
- processInterest(static_cast<Interest &>(packet_buffer));
+ LOG(ERROR) << "Received not supported packet. Ignoring it.";
}
- } else {
- LOG(ERROR) << "Received not supported packet. Ignoring it.";
}
}
@@ -601,19 +569,21 @@ class Portal {
* It register the prefixes in the served_namespaces_ list to the local
* forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void setLocalRoutes() {
+ void setLocalRoutes() {
+ DCHECK(io_module_);
+ DCHECK(io_module_->isConnected());
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
for (auto &prefix : served_namespaces_) {
- if (io_module_->isConnected()) {
- io_module_->registerRoute(prefix);
- }
+ io_module_->registerRoute(prefix);
}
}
- TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) {
+ void processInterest(Interest &interest) {
// Interest for a producer
DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName();
- if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) {
- producer_callback_->onInterest(interest);
+ if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) {
+ transport_callback_->onInterest(interest);
}
}
@@ -625,8 +595,7 @@ class Portal {
*
* @param content_object - The data packet
*/
- TRANSPORT_ALWAYS_INLINE void processContentObject(
- ContentObject &content_object) {
+ void processContentObject(ContentObject &content_object) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "processContentObject " << content_object.getName();
uint32_t hash = getHash(content_object.getName());
@@ -635,15 +604,16 @@ class Portal {
if (it != pending_interest_hash_table_.end()) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest.";
- PendingInterest::Ptr interest_ptr = std::move(it->second);
+ PendingInterest &pend_interest = it->second;
+ pend_interest.cancelTimer();
+ auto _int = pend_interest.getInterest();
+ auto callback = pend_interest.getOnDataCallback();
pending_interest_hash_table_.erase(it);
- interest_ptr->cancelTimer();
- auto _int = interest_ptr->getInterest();
- if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
- interest_ptr->on_content_object_callback_(*_int, content_object);
- } else if (consumer_callback_) {
- consumer_callback_->onContentObject(*_int, content_object);
+ if (callback != UNSET_CALLBACK) {
+ callback(*_int, content_object);
+ } else if (transport_callback_) {
+ transport_callback_->onContentObject(*_int, content_object);
}
} else {
DLOG_IF(INFO, VLOG_IS_ON(3))
@@ -652,12 +622,11 @@ class Portal {
}
/**
- * Process a control message. Control messages are different depending on the
- * connector, then the forwarder_interface will do the job of understanding
- * them.
+ * Process a control message. Control messages are different depending on
+ * the connector, then the forwarder_interface will do the job of
+ * understanding them.
*/
- TRANSPORT_ALWAYS_INLINE void processControlMessage(
- utils::MemBuf &packet_buffer) {
+ void processControlMessage(utils::MemBuf &packet_buffer) {
io_module_->processControlMessageReply(packet_buffer);
}
@@ -665,17 +634,14 @@ class Portal {
portal_details::HandlerMemory async_callback_memory_;
std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
- asio::io_service &io_service_;
- asio::io_service internal_io_service_;
- portal_details::Pool packet_pool_;
+ ::utils::EventThread &worker_;
std::string app_name_;
PendingInterestHashTable pending_interest_hash_table_;
- std::list<Prefix> served_namespaces_;
+ std::set<Prefix> served_namespaces_;
- ConsumerCallback *consumer_callback_;
- ProducerCallback *producer_callback_;
+ TransportCallback *transport_callback_;
bool is_consumer_;