diff options
author | Luca Muscariello <lumuscar@cisco.com> | 2022-06-09 21:34:09 +0200 |
---|---|---|
committer | Luca Muscariello <muscariello@ieee.org> | 2022-06-30 10:47:50 +0200 |
commit | 6b94663b2455e212009a544ae23bb6a8c55407f8 (patch) | |
tree | 0af780ce5eeb1009fd24b8af8af08e8368eda3bd /libtransport/src/core/portal.h | |
parent | a1ac96f497719b897793ac14b287cb8d840651c1 (diff) |
refactor(lib, hicn-light, vpp, hiperf): HICN-723
- move infra data structure into the shared lib
- new packet cache using double hashing and lookup on prefix suffix
- testing updates
- authenticated requests using interest manifests
Co-authored-by: Mauro Sardara <msardara@cisco.com>
Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Co-authored-by: Michele Papalini <micpapal@cisco.com>
Co-authored-by: Olivier Roques <oroques+fdio@cisco.com>
Co-authored-by: Enrico Loparco <eloparco@cisco.com>
Change-Id: Iaddebfe6aa5279ea8553433b0f519578f6b9ccd9
Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r-- | libtransport/src/core/portal.h | 180 |
1 files changed, 98 insertions, 82 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index aae4c573e..6f3a48e83 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -32,6 +32,10 @@ #include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/fixed_block_allocator.h> +extern "C" { +#include <hicn/header.h> +} + #include <future> #include <memory> #include <queue> @@ -179,19 +183,11 @@ class Portal : public ::utils::NonCopyable, Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {} Portal(::utils::EventThread &worker) - : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }), + : io_module_(nullptr), worker_(worker), app_name_("libtransport_application"), transport_callback_(nullptr), - is_consumer_(false) { - /** - * This workaroung allows to initialize memory for packet buffers *before* - * any static variables that may be initialized in the io_modules. In this - * way static variables in modules will be destroyed before the packet - * memory. - */ - PacketManager<>::getInstance(); - } + is_consumer_(false) {} public: using TransportCallback = interface::Portal::TransportCallback; @@ -275,6 +271,7 @@ class Portal : public ::utils::NonCopyable, ptr->transport_callback_->onError(ec); } }, + [self]([[maybe_unused]] Connector *c) { /* Nothing to do here */ }, [self](Connector *c, const std::error_code &ec) { auto ptr = self.lock(); if (ptr) { @@ -315,76 +312,113 @@ class Portal : public ::utils::NonCopyable, } /** - * Send an interest through to the local forwarder. - * - * @param interest - The pointer to the interest. The ownership of the - * interest is transferred by the caller to portal. - * - * @param on_content_object_callback - If the caller wishes to use a - * different callback to be called for this interest, it can set this - * parameter. Otherwise ConsumerCallback::onContentObject will be used. + * @brief Add interest to PIT * - * @param on_interest_timeout_callback - If the caller wishes to use a - * different callback to be called for this interest, it can set this - * parameter. Otherwise ConsumerCallback::onTimeout will be used. */ - void sendInterest( - Interest::Ptr &&interest, + void addInterestToPIT( + const Interest::Ptr &interest, uint32_t lifetime, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { - DCHECK(std::this_thread::get_id() == worker_.getThreadId()); - - // Send it - interest->encodeSuffixes(); - io_module_->send(*interest); - uint32_t initial_hash = interest->getName().getHash32(false); auto hash = initial_hash + interest->getName().getSuffix(); uint32_t seq = interest->getName().getSuffix(); - uint32_t *suffix = interest->firstSuffix(); + const uint32_t *suffix = interest->firstSuffix(); auto n_suffixes = interest->numberOfSuffixes(); uint32_t counter = 0; // Set timers do { + auto pend_int = pending_interest_hash_table_.try_emplace( + hash, worker_.getIoService(), interest); + PendingInterest &pending_interest = pend_int.first->second; + if (!pend_int.second) { + // element was already in map + pend_int.first->second.cancelTimer(); + pending_interest.setInterest(interest); + } + + pending_interest.setOnContentObjectCallback( + std::move(on_content_object_callback)); + pending_interest.setOnTimeoutCallback( + std::move(on_interest_timeout_callback)); + + if (is_consumer_) { + auto self = weak_from_this(); + pending_interest.startCountdown( + lifetime, portal_details::makeCustomAllocatorHandler( + async_callback_memory_, + [self, hash, seq](const std::error_code &ec) { + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + return; + } + + if (auto ptr = self.lock()) { + ptr->timerHandler(hash, seq); + } + })); + } + if (suffix) { hash = initial_hash + *suffix; seq = *suffix; suffix++; } + } while (counter++ < n_suffixes); + } - auto it = pending_interest_hash_table_.find(hash); - PendingInterest *pending_interest = nullptr; - if (it != pending_interest_hash_table_.end()) { - it->second.cancelTimer(); - pending_interest = &it->second; - pending_interest->setInterest(interest); - } else { - auto pend_int = pending_interest_hash_table_.try_emplace( - hash, worker_.getIoService(), interest); - pending_interest = &pend_int.first->second; - } + void matchContentObjectInPIT(ContentObject &content_object) { + uint32_t hash = getHash(content_object.getName()); + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; - pending_interest->setOnContentObjectCallback( - std::move(on_content_object_callback)); - pending_interest->setOnTimeoutCallback( - std::move(on_interest_timeout_callback)); + PendingInterest &pend_interest = it->second; + pend_interest.cancelTimer(); + auto _int = pend_interest.getInterest(); + auto callback = pend_interest.getOnDataCallback(); + pending_interest_hash_table_.erase(it); - auto self = weak_from_this(); - pending_interest->startCountdown( - portal_details::makeCustomAllocatorHandler( - async_callback_memory_, - [self, hash, seq](const std::error_code &ec) { - if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { - return; - } + if (is_consumer_) { + // Send object is for the app + if (callback != UNSET_CALLBACK) { + callback(*_int, content_object); + } else if (transport_callback_) { + transport_callback_->onContentObject(*_int, content_object); + } + } else { + // Send content object to the network + io_module_->send(content_object); + } + } else if (is_consumer_) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "No interest pending for received content object."; + } + } - if (auto ptr = self.lock()) { - ptr->timerHandler(hash, seq); - } - })); + /** + * Send an interest through to the local forwarder. + * + * @param interest - The pointer to the interest. The ownership of the + * interest is transferred by the caller to portal. + * + * @param on_content_object_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onContentObject will be used. + * + * @param on_interest_timeout_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onTimeout will be used. + */ + void sendInterest( + Interest::Ptr &interest, uint32_t lifetime, + OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, + OnInterestTimeoutCallback &&on_interest_timeout_callback = + UNSET_CALLBACK) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); - } while (counter++ < n_suffixes); + io_module_->send(*interest); + addInterestToPIT(interest, lifetime, std::move(on_content_object_callback), + std::move(on_interest_timeout_callback)); } /** @@ -423,8 +457,7 @@ class Portal : public ::utils::NonCopyable, void sendContentObject(ContentObject &content_object) { DCHECK(io_module_); DCHECK(std::this_thread::get_id() == worker_.getThreadId()); - - io_module_->send(content_object); + matchContentObjectInPIT(content_object); } /** @@ -582,6 +615,9 @@ class Portal : public ::utils::NonCopyable, void processInterest(Interest &interest) { // Interest for a producer DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName(); + + // Save interest in PIT + addInterestToPIT(interest.shared_from_this(), interest.getLifetime()); if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) { transport_callback_->onInterest(interest); } @@ -598,27 +634,7 @@ class Portal : public ::utils::NonCopyable, void processContentObject(ContentObject &content_object) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "processContentObject " << content_object.getName(); - uint32_t hash = getHash(content_object.getName()); - - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; - - PendingInterest &pend_interest = it->second; - pend_interest.cancelTimer(); - auto _int = pend_interest.getInterest(); - auto callback = pend_interest.getOnDataCallback(); - pending_interest_hash_table_.erase(it); - - if (callback != UNSET_CALLBACK) { - callback(*_int, content_object); - } else if (transport_callback_) { - transport_callback_->onContentObject(*_int, content_object); - } - } else { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "No interest pending for received content object."; - } + matchContentObjectInPIT(content_object); } /** @@ -632,7 +648,7 @@ class Portal : public ::utils::NonCopyable, private: portal_details::HandlerMemory async_callback_memory_; - std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_; + std::unique_ptr<IoModule> io_module_; ::utils::EventThread &worker_; |