diff options
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r-- | libtransport/src/core/portal.h | 646 |
1 files changed, 326 insertions, 320 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index f6a9ce85b..c4ba096b9 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,6 +15,7 @@ #pragma once +#include <core/global_workers.h> #include <core/pending_interest.h> #include <glog/logging.h> #include <hicn/transport/config.h> @@ -28,6 +29,7 @@ #include <hicn/transport/interfaces/global_conf_interface.h> #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/fixed_block_allocator.h> #include <future> @@ -54,11 +56,11 @@ class HandlerMemory { HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { + void *allocate(std::size_t /* size */) { return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock(); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { + void deallocate(void *pointer) { utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock( pointer); } @@ -69,13 +71,9 @@ class HandlerMemory { HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return ::operator new(size); - } + void *allocate(std::size_t size) { return ::operator new(size); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { - ::operator delete(pointer); - } + void deallocate(void *pointer) { ::operator delete(pointer); } #endif }; @@ -92,21 +90,19 @@ class HandlerAllocator { HandlerAllocator(const HandlerAllocator<U> &other) noexcept : memory_(other.memory_) {} - TRANSPORT_ALWAYS_INLINE bool operator==( - const HandlerAllocator &other) const noexcept { + bool operator==(const HandlerAllocator &other) const noexcept { return &memory_ == &other.memory_; } - TRANSPORT_ALWAYS_INLINE bool operator!=( - const HandlerAllocator &other) const noexcept { + bool operator!=(const HandlerAllocator &other) const noexcept { return &memory_ != &other.memory_; } - TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const { + T *allocate(std::size_t n) const { return static_cast<T *>(memory_.allocate(sizeof(T) * n)); } - TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const { + void deallocate(T *p, std::size_t /*n*/) const { return memory_.deallocate(p); } @@ -135,7 +131,7 @@ class CustomAllocatorHandler { } template <typename... Args> - void operator()(Args &&... args) { + void operator()(Args &&...args) { handler_(std::forward<Args>(args)...); } @@ -151,49 +147,16 @@ inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( return CustomAllocatorHandler<Handler>(m, h); } -class Pool { - public: - Pool(asio::io_service &io_service) : io_service_(io_service) { - increasePendingInterestPool(); - } - - TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { - // Create pool of pending interests to reuse - for (uint32_t i = 0; i < pit_size; i++) { - pending_interests_pool_.add(new PendingInterest( - Interest::Ptr(nullptr), - std::make_unique<asio::steady_timer>(io_service_))); - } - } - PendingInterest::Ptr getPendingInterest() { - auto res = pending_interests_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increasePendingInterestPool(); - res = pending_interests_pool_.get(); - } - - return std::move(res.second); - } - - private: - utils::ObjectPool<PendingInterest> pending_interests_pool_; - asio::io_service &io_service_; -}; - } // namespace portal_details class PortalConfiguration; -using PendingInterestHashTable = - std::unordered_map<uint32_t, PendingInterest::Ptr>; - -using interface::BindConfig; +using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest>; /** * Portal is a opaque class which is used for sending/receiving interest/data - * packets over multiple kind of connector. The connector itself is defined by - * the template ForwarderInt, which is resolved at compile time. It is then not - * possible to decide at runtime what the connector will be. + * packets over multiple kind of io_modules. The io_module itself is an external + * module loaded at runtime. * * The tasks performed by portal are the following: * - Sending/Receiving Interest packets @@ -210,83 +173,117 @@ using interface::BindConfig; * users of this class. */ -class Portal { - public: - using ConsumerCallback = interface::Portal::ConsumerCallback; - using ProducerCallback = interface::Portal::ProducerCallback; +class Portal : public ::utils::NonCopyable, + public std::enable_shared_from_this<Portal> { + private: + Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {} + Portal(::utils::EventThread &worker) + : io_module_(nullptr), + worker_(worker), + app_name_("libtransport_application"), + transport_callback_(nullptr), + is_consumer_(false) {} + + public: + using TransportCallback = interface::Portal::TransportCallback; friend class PortalConfiguration; - Portal() : Portal(internal_io_service_) {} + static std::shared_ptr<Portal> createShared() { + return std::shared_ptr<Portal>(new Portal()); + } - Portal(asio::io_service &io_service) - : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }), - io_service_(io_service), - packet_pool_(io_service), - app_name_("libtransport_application"), - consumer_callback_(nullptr), - producer_callback_(nullptr), - is_consumer_(false) { - /** - * This workaroung allows to initialize memory for packet buffers *before* - * any static variables that may be initialized in the io_modules. In this - * way static variables in modules will be destroyed before the packet - * memory. - */ - PacketManager<>::getInstance(); + static std::shared_ptr<Portal> createShared(::utils::EventThread &worker) { + return std::shared_ptr<Portal>(new Portal(worker)); } + + bool isConnected() const { return io_module_.get() != nullptr; } + /** - * Set the consumer callback. + * Set the transport callback. Must be called from the same worker thread. * - * @param consumer_callback - The pointer to the ConsumerCallback object. + * @param consumer_callback - The pointer to the TransportCallback object. */ - void setConsumerCallback(ConsumerCallback *consumer_callback) { - consumer_callback_ = consumer_callback; + void registerTransportCallback(TransportCallback *transport_callback) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + transport_callback_ = transport_callback; } /** - * Set the producer callback. - * - * @param producer_callback - The pointer to the ProducerCallback object. + * Unset the consumer callback. Must be called from the same worker thread. */ - void setProducerCallback(ProducerCallback *producer_callback) { - producer_callback_ = producer_callback; + void unregisterTransportCallback() { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + transport_callback_ = nullptr; } /** - * Specify the output interface to use. This method will be useful in a future - * scenario where the library will be able to forward packets without + * Specify the output interface to use. This method will be useful in a + * future scenario where the library will be able to forward packets without * connecting to a local forwarder. Now it is not used. * * @param output_interface - The output interface to use for * forwarding/receiving packets. */ - TRANSPORT_ALWAYS_INLINE void setOutputInterface( - const std::string &output_interface) { - io_module_->setOutputInterface(output_interface); + void setOutputInterface(const std::string &output_interface) { + if (io_module_) { + io_module_->setOutputInterface(output_interface); + } } /** * Connect the transport to the local hicn forwarder. * - * @param is_consumer - Boolean specifying if the application on top of portal - * is a consumer or a producer. + * @param is_consumer - Boolean specifying if the application on top of + * portal is a consumer or a producer. */ - TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { - if (!io_module_) { - pending_interest_hash_table_.reserve(portal_details::pit_size); - io_module_.reset(IoModule::load(io_module_path_.c_str())); - - CHECK(io_module_); - - io_module_->init(std::bind(&Portal::processIncomingMessages, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3), - std::bind(&Portal::setLocalRoutes, this), io_service_, - app_name_); - io_module_->connect(is_consumer); - is_consumer_ = is_consumer; + void connect(bool is_consumer = true) { + if (isConnected()) { + return; } + + worker_.addAndWaitForExecution([this, is_consumer]() { + if (!io_module_) { + pending_interest_hash_table_.reserve(portal_details::pit_size); + io_module_.reset(IoModule::load(io_module_path_.c_str())); + + CHECK(io_module_); + + std::weak_ptr<Portal> self(shared_from_this()); + + io_module_->init( + [self](Connector *c, const std::vector<utils::MemBuf::Ptr> &buffers, + const std::error_code &ec) { + if (auto ptr = self.lock()) { + ptr->processIncomingMessages(c, buffers, ec); + } + }, + [self](Connector *c, const std::error_code &ec) { + if (!ec) { + return; + } + auto ptr = self.lock(); + if (ptr && ptr->transport_callback_) { + ptr->transport_callback_->onError(ec); + } + }, + [self]([[maybe_unused]] Connector *c) { /* Nothing to do here */ }, + [self](Connector *c, const std::error_code &ec) { + auto ptr = self.lock(); + if (ptr) { + if (ec && ptr->transport_callback_) { + ptr->transport_callback_->onError(ec); + return; + } + ptr->setLocalRoutes(); + } + }, + worker_.getIoService(), app_name_); + + io_module_->connect(is_consumer); + is_consumer_ = is_consumer; + } + }); } /** @@ -295,18 +292,13 @@ class Portal { ~Portal() { killConnection(); } /** - * Compute name hash - */ - TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) { - return name.getHash32(false) + name.getSuffix(); - } - - /** * Check if there is already a pending interest for a given name. * * @param name - The interest name. */ - TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { + bool interestIsPending(const Name &name) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + auto it = pending_interest_hash_table_.find(getHash(name)); if (it != pending_interest_hash_table_.end()) { return true; @@ -316,58 +308,53 @@ class Portal { } /** - * Send an interest through to the local forwarder. + * @brief Add interest to PIT * - * @param interest - The pointer to the interest. The ownership of the - * interest is transferred by the caller to portal. - * - * @param on_content_object_callback - If the caller wishes to use a different - * callback to be called for this interest, it can set this parameter. - * Otherwise ConsumerCallback::onContentObject will be used. - * - * @param on_interest_timeout_callback - If the caller wishes to use a - * different callback to be called for this interest, it can set this - * parameter. Otherwise ConsumerCallback::onTimeout will be used. */ - TRANSPORT_ALWAYS_INLINE void sendInterest( - Interest::Ptr &&interest, + void addInterestToPIT( + const Interest::Ptr &interest, uint32_t lifetime, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { - // Send it - interest->encodeSuffixes(); - io_module_->send(*interest); - uint32_t initial_hash = interest->getName().getHash32(false); auto hash = initial_hash + interest->getName().getSuffix(); uint32_t seq = interest->getName().getSuffix(); - uint32_t *suffix = interest->firstSuffix(); - auto n_suffixes = interest->numberOfSuffixes(); + const uint32_t *suffix = interest->firstSuffix() != nullptr + ? interest->firstSuffix() + 1 + : nullptr; + auto n_suffixes = + interest->numberOfSuffixes() > 0 ? interest->numberOfSuffixes() - 1 : 0; uint32_t counter = 0; // Set timers do { - auto pending_interest = packet_pool_.getPendingInterest(); - pending_interest->setInterest(interest); - pending_interest->setOnContentObjectCallback( + auto pend_int = pending_interest_hash_table_.try_emplace( + hash, worker_.getIoService(), interest); + PendingInterest &pending_interest = pend_int.first->second; + if (!pend_int.second) { + // element was already in map + pend_int.first->second.cancelTimer(); + pending_interest.setInterest(interest); + } + + pending_interest.setOnContentObjectCallback( std::move(on_content_object_callback)); - pending_interest->setOnTimeoutCallback( + pending_interest.setOnTimeoutCallback( std::move(on_interest_timeout_callback)); - pending_interest->startCountdown( - portal_details::makeCustomAllocatorHandler( - async_callback_memory_, - std::bind(&Portal::timerHandler, this, std::placeholders::_1, - hash, seq))); - - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - it->second->cancelTimer(); - - // Get reference to interest packet in order to have it destroyed. - auto _int = it->second->getInterest(); - it->second = std::move(pending_interest); - } else { - pending_interest_hash_table_[hash] = std::move(pending_interest); + if (is_consumer_) { + auto self = weak_from_this(); + pending_interest.startCountdown( + lifetime, portal_details::makeCustomAllocatorHandler( + async_callback_memory_, + [self, hash, seq](const std::error_code &ec) { + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + return; + } + + if (auto ptr = self.lock()) { + ptr->timerHandler(hash, seq); + } + })); } if (suffix) { @@ -375,224 +362,261 @@ class Portal { seq = *suffix; suffix++; } - } while (counter++ < n_suffixes); } - /** - * Handler fot the timer set when the interest is sent. - * - * @param ec - Error code which says whether the timer expired or has been - * canceled upon data packet reception. - * - * @param hash - The index of the interest in the pending interest hash table. - */ - TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, - uint32_t hash, uint32_t seq) { - bool is_stopped = io_service_.stopped(); - if (TRANSPORT_EXPECT_FALSE(is_stopped)) { - return; - } + void matchContentObjectInPIT(ContentObject &content_object) { + uint32_t hash = getHash(content_object.getName()); + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; + + PendingInterest &pend_interest = it->second; + pend_interest.cancelTimer(); + auto _int = pend_interest.getInterest(); + auto callback = pend_interest.getOnDataCallback(); + pending_interest_hash_table_.erase(it); - if (TRANSPORT_EXPECT_TRUE(!ec)) { - PendingInterestHashTable::iterator it = - pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - PendingInterest::Ptr ptr = std::move(it->second); - pending_interest_hash_table_.erase(it); - auto _int = ptr->getInterest(); - Name &name = const_cast<Name &>(_int->getName()); - name.setSuffix(seq); - - if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) { - ptr->on_interest_timeout_callback_(_int, name); - } else if (consumer_callback_) { - consumer_callback_->onTimeout(_int, name); + if (is_consumer_) { + // Send object is for the app + if (callback != UNSET_CALLBACK) { + callback(*_int, content_object); + } else if (transport_callback_) { + transport_callback_->onContentObject(*_int, content_object); } + } else { + // Send content object to the network + io_module_->send(content_object); } + } else if (is_consumer_) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "No interest pending for received content object."; } } /** - * Register a producer name to the local forwarder and optionally set the - * content store size in a per-face manner. + * Send an interest through to the local forwarder. * - * @param config - The configuration for the local forwarder binding. - */ - TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { - assert(io_module_); - io_module_->setContentStoreSize(config.csReserved()); - served_namespaces_.push_back(config.prefix()); - setLocalRoutes(); - } - - /** - * Start the event loop. This function blocks here and calls the callback set - * by the application upon interest/data received or timeout. + * @param interest - The pointer to the interest. The ownership of the + * interest is transferred by the caller to portal. + * + * @param on_content_object_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onContentObject will be used. + * + * @param on_interest_timeout_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onTimeout will be used. */ - TRANSPORT_ALWAYS_INLINE void runEventsLoop() { - if (io_service_.stopped()) { - io_service_.reset(); // ensure that run()/poll() will do some work - } + void sendInterest( + Interest::Ptr &interest, uint32_t lifetime, + OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, + OnInterestTimeoutCallback &&on_interest_timeout_callback = + UNSET_CALLBACK) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); - io_service_.run(); + addInterestToPIT(interest, lifetime, std::move(on_content_object_callback), + std::move(on_interest_timeout_callback)); + interest->serializeSuffixes(); + io_module_->send(*interest); } /** - * Run one event and return. + * Handler fot the timer set when the interest is sent. + * + * @param ec - Error code which says whether the timer expired or has been + * canceled upon data packet reception. + * + * @param hash - The index of the interest in the pending interest hash + * table. */ - TRANSPORT_ALWAYS_INLINE void runOneEvent() { - if (io_service_.stopped()) { - io_service_.reset(); // ensure that run()/poll() will do some work - } + void timerHandler(uint32_t hash, uint32_t seq) { + PendingInterestHashTable::iterator it = + pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + PendingInterest &pend_interest = it->second; + auto _int = pend_interest.getInterest(); + auto callback = pend_interest.getOnTimeoutCallback(); + pending_interest_hash_table_.erase(it); + Name &name = const_cast<Name &>(_int->getName()); + name.setSuffix(seq); - io_service_.run_one(); + if (callback != UNSET_CALLBACK) { + callback(_int, name); + } else if (transport_callback_) { + transport_callback_->onTimeout(_int, name); + } + } } /** - * Send a data packet to the local forwarder. As opposite to sendInterest, the - * ownership of the content object is not transferred to the portal. + * Send a data packet to the local forwarder. * * @param content_object - The data packet. */ - TRANSPORT_ALWAYS_INLINE void sendContentObject( - ContentObject &content_object) { - io_module_->send(content_object); + void sendContentObject(ContentObject &content_object) { + DCHECK(io_module_); + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + matchContentObjectInPIT(content_object); } /** - * Stop the event loop, canceling all the pending events in the event queue. - * - * Beware that stopping the event loop DOES NOT disconnect the transport from - * the local forwarder, the connector underneath will stay connected. + * Disconnect the transport from the local forwarder. */ - TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { - if (!io_service_.stopped()) { - io_service_.dispatch([this]() { - clear(); - io_service_.stop(); - }); + void killConnection() { + if (TRANSPORT_EXPECT_TRUE(io_module_ != nullptr)) { + io_module_->closeConnection(); } } /** - * Disconnect the transport from the local forwarder. + * Clear the pending interest hash table. */ - TRANSPORT_ALWAYS_INLINE void killConnection() { - io_module_->closeConnection(); + void clear() { + worker_.tryRunHandlerNow([self{shared_from_this()}]() { self->doClear(); }); } /** - * Clear the pending interest hash table. + * Get a reference to the io_service object. */ - TRANSPORT_ALWAYS_INLINE void clear() { - if (!io_service_.stopped()) { - io_service_.dispatch(std::bind(&Portal::doClear, this)); - } else { - doClear(); - } - } + utils::EventThread &getThread() { return worker_; } /** - * Remove one pending interest. + * Register a route to the local forwarder. */ - TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) { - if (!io_service_.stopped()) { - io_service_.dispatch(std::bind(&Portal::doClearOne, this, name)); - } else { - doClearOne(name); - } + void registerRoute(const Prefix &prefix) { + std::weak_ptr<Portal> self = shared_from_this(); + worker_.tryRunHandlerNow([self, prefix]() { + if (auto ptr = self.lock()) { + auto ret = ptr->served_namespaces_.insert(prefix); + if (ret.second && ptr->io_module_ && ptr->io_module_->isConnected()) { + ptr->io_module_->registerRoute(prefix); + } + } + }); } /** - * Get a reference to the io_service object. + * Send a MAP-Me update to traverse NATs. */ - TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { - return io_service_; + TRANSPORT_ALWAYS_INLINE void sendMapme() { + if (io_module_->isConnected()) { + io_module_->sendMapme(); + } } /** - * Register a route to the local forwarder. + * set forwarding strategy */ - TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { - served_namespaces_.push_back(prefix); + TRANSPORT_ALWAYS_INLINE void setForwardingStrategy(Prefix &prefix, + std::string &strategy) { if (io_module_->isConnected()) { - io_module_->registerRoute(prefix); + io_module_->setForwardingStrategy(prefix, strategy); } } /** * Check if the transport is connected to a forwarder or not */ - TRANSPORT_ALWAYS_INLINE bool isConnectedToFwd() { + bool isConnectedToFwd() { std::string mod = io_module_path_.substr(0, io_module_path_.find(".")); if (mod == "forwarder_module") return false; return true; } + auto &getServedNamespaces() { return served_namespaces_; } + private: /** + * Compute name hash + */ + uint32_t getHash(const Name &name) { + return name.getHash32(false) + name.getSuffix(); + } + + /** * Clear the pending interest hash table. */ - TRANSPORT_ALWAYS_INLINE void doClear() { + void doClear() { for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); - - // Get interest packet from pending interest and do nothing with it. It - // will get destroyed as it goes out of scope. - auto _int = pend_interest.second->getInterest(); + pend_interest.second.cancelTimer(); } pending_interest_hash_table_.clear(); } - /** - * Remove one pending interest. - */ - TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) { - auto it = pending_interest_hash_table_.find(getHash(name)); - - if (it != pending_interest_hash_table_.end()) { - it->second->cancelTimer(); + void dumpPIT() { + std::vector<Name> sorted_elements; + for (const auto &[key, value] : pending_interest_hash_table_) { + sorted_elements.push_back(value.getInterestReference()->getName()); + } - // Get interest packet from pending interest and do nothing with it. It - // will get destroyed as it goes out of scope. - auto _int = it->second->getInterest(); + std::sort(sorted_elements.begin(), sorted_elements.end(), + [](const Name &a, const Name &b) { + return a.getSuffix() < b.getSuffix(); + }); - pending_interest_hash_table_.erase(it); + for (auto &elt : sorted_elements) { + LOG(INFO) << elt; } } /** - * Callback called by the underlying connector upon reception of a packet from - * the local forwarder. + * Callback called by the underlying connector upon reception of a packet + * from the local forwarder. * * @param packet_buffer - The bytes of the packet. */ - TRANSPORT_ALWAYS_INLINE void processIncomingMessages( - Connector *c, utils::MemBuf &buffer, const std::error_code &ec) { - bool is_stopped = io_service_.stopped(); - if (TRANSPORT_EXPECT_FALSE(is_stopped)) { + void processIncomingMessages(Connector *c, + const std::vector<utils::MemBuf::Ptr> &buffers, + const std::error_code &ec) { + if (!transport_callback_) { return; } - if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) { - processControlMessage(buffer); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + // Error receiving from underlying infra. + if (transport_callback_) { + transport_callback_->onError(ec); + } + return; } - // The buffer is a base class for an interest or a content object - Packet &packet_buffer = static_cast<Packet &>(buffer); + for (auto &buffer_ptr : buffers) { + auto &buffer = *buffer_ptr; - auto format = packet_buffer.getFormat(); - if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (is_consumer_) { - processContentObject(static_cast<ContentObject &>(packet_buffer)); - } else { - processInterest(static_cast<Interest &>(packet_buffer)); + if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer))) { + processControlMessage(buffer); + return; + } + + // The buffer is a base class for an interest or a content object + Packet &packet_buffer = static_cast<Packet &>(buffer); + + switch (packet_buffer.getType()) { + case HICN_PACKET_TYPE_INTEREST: + if (!is_consumer_) { + processInterest(static_cast<Interest &>(packet_buffer)); + } else { + LOG(ERROR) << "Received an Interest packet with name " + << packet_buffer.getName() + << " in a consumer transport. Ignoring it."; + } + break; + case HICN_PACKET_TYPE_DATA: + if (is_consumer_) { + processContentObject(static_cast<ContentObject &>(packet_buffer)); + } else { + LOG(ERROR) << "Received a Data packet with name " + << packet_buffer.getName() + << " in a producer transport. Ignoring it."; + } + break; + default: + LOG(ERROR) << "Received not supported packet. Ignoring it."; + break; } - } else { - LOG(ERROR) << "Received not supported packet. Ignoring it."; } } @@ -601,19 +625,25 @@ class Portal { * It register the prefixes in the served_namespaces_ list to the local * forwarder. */ - TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { + void setLocalRoutes() { + DCHECK(io_module_); + DCHECK(io_module_->isConnected()); + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + for (auto &prefix : served_namespaces_) { - if (io_module_->isConnected()) { - io_module_->registerRoute(prefix); - } + io_module_->registerRoute(prefix); } } - TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) { + void processInterest(Interest &interest) { // Interest for a producer DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName(); - if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) { - producer_callback_->onInterest(interest); + + // Save interest in PIT + interest.deserializeSuffixes(); + addInterestToPIT(interest.shared_from_this(), interest.getLifetime()); + if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) { + transport_callback_->onInterest(interest); } } @@ -625,57 +655,33 @@ class Portal { * * @param content_object - The data packet */ - TRANSPORT_ALWAYS_INLINE void processContentObject( - ContentObject &content_object) { + void processContentObject(ContentObject &content_object) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "processContentObject " << content_object.getName(); - uint32_t hash = getHash(content_object.getName()); - - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; - - PendingInterest::Ptr interest_ptr = std::move(it->second); - pending_interest_hash_table_.erase(it); - interest_ptr->cancelTimer(); - auto _int = interest_ptr->getInterest(); - - if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { - interest_ptr->on_content_object_callback_(*_int, content_object); - } else if (consumer_callback_) { - consumer_callback_->onContentObject(*_int, content_object); - } - } else { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "No interest pending for received content object."; - } + matchContentObjectInPIT(content_object); } /** - * Process a control message. Control messages are different depending on the - * connector, then the forwarder_interface will do the job of understanding - * them. + * Process a control message. Control messages are different depending on + * the connector, then the forwarder_interface will do the job of + * understanding them. */ - TRANSPORT_ALWAYS_INLINE void processControlMessage( - utils::MemBuf &packet_buffer) { + void processControlMessage(utils::MemBuf &packet_buffer) { io_module_->processControlMessageReply(packet_buffer); } private: portal_details::HandlerMemory async_callback_memory_; - std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_; + std::unique_ptr<IoModule> io_module_; - asio::io_service &io_service_; - asio::io_service internal_io_service_; - portal_details::Pool packet_pool_; + ::utils::EventThread &worker_; std::string app_name_; PendingInterestHashTable pending_interest_hash_table_; - std::list<Prefix> served_namespaces_; + std::set<Prefix> served_namespaces_; - ConsumerCallback *consumer_callback_; - ProducerCallback *producer_callback_; + TransportCallback *transport_callback_; bool is_consumer_; |