aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/core/portal.h
diff options
context:
space:
mode:
authorLuca Muscariello <lumuscar@cisco.com>2022-06-09 21:34:09 +0200
committerLuca Muscariello <muscariello@ieee.org>2022-06-30 10:47:50 +0200
commit6b94663b2455e212009a544ae23bb6a8c55407f8 (patch)
tree0af780ce5eeb1009fd24b8af8af08e8368eda3bd /libtransport/src/core/portal.h
parenta1ac96f497719b897793ac14b287cb8d840651c1 (diff)
refactor(lib, hicn-light, vpp, hiperf): HICN-723
- move infra data structure into the shared lib - new packet cache using double hashing and lookup on prefix suffix - testing updates - authenticated requests using interest manifests Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Change-Id: Iaddebfe6aa5279ea8553433b0f519578f6b9ccd9 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r--libtransport/src/core/portal.h180
1 files changed, 98 insertions, 82 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index aae4c573e..6f3a48e83 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -32,6 +32,10 @@
#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/fixed_block_allocator.h>
+extern "C" {
+#include <hicn/header.h>
+}
+
#include <future>
#include <memory>
#include <queue>
@@ -179,19 +183,11 @@ class Portal : public ::utils::NonCopyable,
Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {}
Portal(::utils::EventThread &worker)
- : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }),
+ : io_module_(nullptr),
worker_(worker),
app_name_("libtransport_application"),
transport_callback_(nullptr),
- is_consumer_(false) {
- /**
- * This workaroung allows to initialize memory for packet buffers *before*
- * any static variables that may be initialized in the io_modules. In this
- * way static variables in modules will be destroyed before the packet
- * memory.
- */
- PacketManager<>::getInstance();
- }
+ is_consumer_(false) {}
public:
using TransportCallback = interface::Portal::TransportCallback;
@@ -275,6 +271,7 @@ class Portal : public ::utils::NonCopyable,
ptr->transport_callback_->onError(ec);
}
},
+ [self]([[maybe_unused]] Connector *c) { /* Nothing to do here */ },
[self](Connector *c, const std::error_code &ec) {
auto ptr = self.lock();
if (ptr) {
@@ -315,76 +312,113 @@ class Portal : public ::utils::NonCopyable,
}
/**
- * Send an interest through to the local forwarder.
- *
- * @param interest - The pointer to the interest. The ownership of the
- * interest is transferred by the caller to portal.
- *
- * @param on_content_object_callback - If the caller wishes to use a
- * different callback to be called for this interest, it can set this
- * parameter. Otherwise ConsumerCallback::onContentObject will be used.
+ * @brief Add interest to PIT
*
- * @param on_interest_timeout_callback - If the caller wishes to use a
- * different callback to be called for this interest, it can set this
- * parameter. Otherwise ConsumerCallback::onTimeout will be used.
*/
- void sendInterest(
- Interest::Ptr &&interest,
+ void addInterestToPIT(
+ const Interest::Ptr &interest, uint32_t lifetime,
OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
OnInterestTimeoutCallback &&on_interest_timeout_callback =
UNSET_CALLBACK) {
- DCHECK(std::this_thread::get_id() == worker_.getThreadId());
-
- // Send it
- interest->encodeSuffixes();
- io_module_->send(*interest);
-
uint32_t initial_hash = interest->getName().getHash32(false);
auto hash = initial_hash + interest->getName().getSuffix();
uint32_t seq = interest->getName().getSuffix();
- uint32_t *suffix = interest->firstSuffix();
+ const uint32_t *suffix = interest->firstSuffix();
auto n_suffixes = interest->numberOfSuffixes();
uint32_t counter = 0;
// Set timers
do {
+ auto pend_int = pending_interest_hash_table_.try_emplace(
+ hash, worker_.getIoService(), interest);
+ PendingInterest &pending_interest = pend_int.first->second;
+ if (!pend_int.second) {
+ // element was already in map
+ pend_int.first->second.cancelTimer();
+ pending_interest.setInterest(interest);
+ }
+
+ pending_interest.setOnContentObjectCallback(
+ std::move(on_content_object_callback));
+ pending_interest.setOnTimeoutCallback(
+ std::move(on_interest_timeout_callback));
+
+ if (is_consumer_) {
+ auto self = weak_from_this();
+ pending_interest.startCountdown(
+ lifetime, portal_details::makeCustomAllocatorHandler(
+ async_callback_memory_,
+ [self, hash, seq](const std::error_code &ec) {
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ return;
+ }
+
+ if (auto ptr = self.lock()) {
+ ptr->timerHandler(hash, seq);
+ }
+ }));
+ }
+
if (suffix) {
hash = initial_hash + *suffix;
seq = *suffix;
suffix++;
}
+ } while (counter++ < n_suffixes);
+ }
- auto it = pending_interest_hash_table_.find(hash);
- PendingInterest *pending_interest = nullptr;
- if (it != pending_interest_hash_table_.end()) {
- it->second.cancelTimer();
- pending_interest = &it->second;
- pending_interest->setInterest(interest);
- } else {
- auto pend_int = pending_interest_hash_table_.try_emplace(
- hash, worker_.getIoService(), interest);
- pending_interest = &pend_int.first->second;
- }
+ void matchContentObjectInPIT(ContentObject &content_object) {
+ uint32_t hash = getHash(content_object.getName());
+ auto it = pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest.";
- pending_interest->setOnContentObjectCallback(
- std::move(on_content_object_callback));
- pending_interest->setOnTimeoutCallback(
- std::move(on_interest_timeout_callback));
+ PendingInterest &pend_interest = it->second;
+ pend_interest.cancelTimer();
+ auto _int = pend_interest.getInterest();
+ auto callback = pend_interest.getOnDataCallback();
+ pending_interest_hash_table_.erase(it);
- auto self = weak_from_this();
- pending_interest->startCountdown(
- portal_details::makeCustomAllocatorHandler(
- async_callback_memory_,
- [self, hash, seq](const std::error_code &ec) {
- if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
- return;
- }
+ if (is_consumer_) {
+ // Send object is for the app
+ if (callback != UNSET_CALLBACK) {
+ callback(*_int, content_object);
+ } else if (transport_callback_) {
+ transport_callback_->onContentObject(*_int, content_object);
+ }
+ } else {
+ // Send content object to the network
+ io_module_->send(content_object);
+ }
+ } else if (is_consumer_) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "No interest pending for received content object.";
+ }
+ }
- if (auto ptr = self.lock()) {
- ptr->timerHandler(hash, seq);
- }
- }));
+ /**
+ * Send an interest through to the local forwarder.
+ *
+ * @param interest - The pointer to the interest. The ownership of the
+ * interest is transferred by the caller to portal.
+ *
+ * @param on_content_object_callback - If the caller wishes to use a
+ * different callback to be called for this interest, it can set this
+ * parameter. Otherwise ConsumerCallback::onContentObject will be used.
+ *
+ * @param on_interest_timeout_callback - If the caller wishes to use a
+ * different callback to be called for this interest, it can set this
+ * parameter. Otherwise ConsumerCallback::onTimeout will be used.
+ */
+ void sendInterest(
+ Interest::Ptr &interest, uint32_t lifetime,
+ OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
+ OnInterestTimeoutCallback &&on_interest_timeout_callback =
+ UNSET_CALLBACK) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
- } while (counter++ < n_suffixes);
+ io_module_->send(*interest);
+ addInterestToPIT(interest, lifetime, std::move(on_content_object_callback),
+ std::move(on_interest_timeout_callback));
}
/**
@@ -423,8 +457,7 @@ class Portal : public ::utils::NonCopyable,
void sendContentObject(ContentObject &content_object) {
DCHECK(io_module_);
DCHECK(std::this_thread::get_id() == worker_.getThreadId());
-
- io_module_->send(content_object);
+ matchContentObjectInPIT(content_object);
}
/**
@@ -582,6 +615,9 @@ class Portal : public ::utils::NonCopyable,
void processInterest(Interest &interest) {
// Interest for a producer
DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName();
+
+ // Save interest in PIT
+ addInterestToPIT(interest.shared_from_this(), interest.getLifetime());
if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) {
transport_callback_->onInterest(interest);
}
@@ -598,27 +634,7 @@ class Portal : public ::utils::NonCopyable,
void processContentObject(ContentObject &content_object) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "processContentObject " << content_object.getName();
- uint32_t hash = getHash(content_object.getName());
-
- auto it = pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest.";
-
- PendingInterest &pend_interest = it->second;
- pend_interest.cancelTimer();
- auto _int = pend_interest.getInterest();
- auto callback = pend_interest.getOnDataCallback();
- pending_interest_hash_table_.erase(it);
-
- if (callback != UNSET_CALLBACK) {
- callback(*_int, content_object);
- } else if (transport_callback_) {
- transport_callback_->onContentObject(*_int, content_object);
- }
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "No interest pending for received content object.";
- }
+ matchContentObjectInPIT(content_object);
}
/**
@@ -632,7 +648,7 @@ class Portal : public ::utils::NonCopyable,
private:
portal_details::HandlerMemory async_callback_memory_;
- std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
+ std::unique_ptr<IoModule> io_module_;
::utils::EventThread &worker_;