diff options
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r-- | libtransport/src/core/portal.h | 704 |
1 files changed, 379 insertions, 325 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index 364a36577..c4ba096b9 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,55 +15,53 @@ #pragma once -#include <core/forwarder_interface.h> +#include <core/global_workers.h> #include <core/pending_interest.h> -#include <core/udp_socket_connector.h> +#include <glog/logging.h> #include <hicn/transport/config.h> +#include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/interest.h> +#include <hicn/transport/core/io_module.h> #include <hicn/transport/core/name.h> #include <hicn/transport/core/prefix.h> #include <hicn/transport/errors/errors.h> +#include <hicn/transport/interfaces/global_conf_interface.h> #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/fixed_block_allocator.h> -#include <hicn/transport/utils/log.h> -#ifdef __vpp__ -#include <core/memif_connector.h> -#endif - -#include <asio.hpp> -#include <asio/steady_timer.hpp> #include <future> #include <memory> #include <queue> #include <unordered_map> +namespace libconfig { +class Setting; +} + namespace transport { namespace core { namespace portal_details { -static constexpr uint32_t pool_size = 2048; +static constexpr uint32_t pit_size = 1024; class HandlerMemory { #ifdef __vpp__ - static constexpr std::size_t memory_size = 1024 * 1024; - public: HandlerMemory() {} HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return utils::FixedBlockAllocator<128, 4096>::getInstance() - ->allocateBlock(); + void *allocate(std::size_t /* size */) { + return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock(); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { - utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock( + void deallocate(void *pointer) { + utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock( pointer); } #else @@ -73,13 +71,9 @@ class HandlerMemory { HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return ::operator new(size); - } + void *allocate(std::size_t size) { return ::operator new(size); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { - ::operator delete(pointer); - } + void deallocate(void *pointer) { ::operator delete(pointer); } #endif }; @@ -96,21 +90,19 @@ class HandlerAllocator { HandlerAllocator(const HandlerAllocator<U> &other) noexcept : memory_(other.memory_) {} - TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator &other) const - noexcept { + bool operator==(const HandlerAllocator &other) const noexcept { return &memory_ == &other.memory_; } - TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator &other) const - noexcept { + bool operator!=(const HandlerAllocator &other) const noexcept { return &memory_ != &other.memory_; } - TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const { + T *allocate(std::size_t n) const { return static_cast<T *>(memory_.allocate(sizeof(T) * n)); } - TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const { + void deallocate(T *p, std::size_t /*n*/) const { return memory_.deallocate(p); } @@ -139,7 +131,7 @@ class CustomAllocatorHandler { } template <typename... Args> - void operator()(Args &&... args) { + void operator()(Args &&...args) { handler_(std::forward<Args>(args)...); } @@ -155,86 +147,16 @@ inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( return CustomAllocatorHandler<Handler>(m, h); } -class Pool { - public: - Pool(asio::io_service &io_service) : io_service_(io_service) { - increasePendingInterestPool(); - increaseInterestPool(); - increaseContentObjectPool(); - } - - TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { - // Create pool of pending interests to reuse - for (uint32_t i = 0; i < pool_size; i++) { - pending_interests_pool_.add(new PendingInterest( - Interest::Ptr(nullptr), - std::make_unique<asio::steady_timer>(io_service_))); - } - } - - TRANSPORT_ALWAYS_INLINE void increaseInterestPool() { - // Create pool of interests to reuse - for (uint32_t i = 0; i < pool_size; i++) { - interest_pool_.add(new Interest()); - } - } - - TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() { - // Create pool of content object to reuse - for (uint32_t i = 0; i < pool_size; i++) { - content_object_pool_.add(new ContentObject()); - } - } - - PendingInterest::Ptr getPendingInterest() { - auto res = pending_interests_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increasePendingInterestPool(); - res = pending_interests_pool_.get(); - } - - return std::move(res.second); - } - - TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() { - auto res = content_object_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increaseContentObjectPool(); - res = content_object_pool_.get(); - } - - return std::move(res.second); - } - - TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { - auto res = interest_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increaseInterestPool(); - res = interest_pool_.get(); - } - - return std::move(res.second); - } - - private: - utils::ObjectPool<PendingInterest> pending_interests_pool_; - utils::ObjectPool<ContentObject> content_object_pool_; - utils::ObjectPool<Interest> interest_pool_; - asio::io_service &io_service_; -}; - } // namespace portal_details -using PendingInterestHashTable = - std::unordered_map<uint32_t, PendingInterest::Ptr>; +class PortalConfiguration; -using interface::BindConfig; +using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest>; /** * Portal is a opaque class which is used for sending/receiving interest/data - * packets over multiple kind of connector. The connector itself is defined by - * the template ForwarderInt, which is resolved at compile time. It is then not - * possible to decide at runtime what the connector will be. + * packets over multiple kind of io_modules. The io_module itself is an external + * module loaded at runtime. * * The tasks performed by portal are the following: * - Sending/Receiving Interest packets @@ -250,72 +172,118 @@ using interface::BindConfig; * The portal class is not thread safe, appropriate locking is required by the * users of this class. */ -template <typename ForwarderInt> -class Portal { - static_assert( - std::is_base_of<ForwarderInterface<ForwarderInt, - typename ForwarderInt::ConnectorType>, - ForwarderInt>::value, - "ForwarderInt must inherit from ForwarderInterface!"); + +class Portal : public ::utils::NonCopyable, + public std::enable_shared_from_this<Portal> { + private: + Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {} + + Portal(::utils::EventThread &worker) + : io_module_(nullptr), + worker_(worker), + app_name_("libtransport_application"), + transport_callback_(nullptr), + is_consumer_(false) {} public: - using ConsumerCallback = interface::Portal::ConsumerCallback; - using ProducerCallback = interface::Portal::ProducerCallback; + using TransportCallback = interface::Portal::TransportCallback; + friend class PortalConfiguration; - Portal() : Portal(internal_io_service_) {} + static std::shared_ptr<Portal> createShared() { + return std::shared_ptr<Portal>(new Portal()); + } - Portal(asio::io_service &io_service) - : io_service_(io_service), - packet_pool_(io_service), - app_name_("libtransport_application"), - consumer_callback_(nullptr), - producer_callback_(nullptr), - connector_(std::bind(&Portal::processIncomingMessages, this, - std::placeholders::_1), - std::bind(&Portal::setLocalRoutes, this), io_service_, - app_name_), - forwarder_interface_(connector_) {} + static std::shared_ptr<Portal> createShared(::utils::EventThread &worker) { + return std::shared_ptr<Portal>(new Portal(worker)); + } + + bool isConnected() const { return io_module_.get() != nullptr; } /** - * Set the consumer callback. + * Set the transport callback. Must be called from the same worker thread. * - * @param consumer_callback - The pointer to the ConsumerCallback object. + * @param consumer_callback - The pointer to the TransportCallback object. */ - void setConsumerCallback(ConsumerCallback *consumer_callback) { - consumer_callback_ = consumer_callback; + void registerTransportCallback(TransportCallback *transport_callback) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + transport_callback_ = transport_callback; } /** - * Set the producer callback. - * - * @param producer_callback - The pointer to the ProducerCallback object. + * Unset the consumer callback. Must be called from the same worker thread. */ - void setProducerCallback(ProducerCallback *producer_callback) { - producer_callback_ = producer_callback; + void unregisterTransportCallback() { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + transport_callback_ = nullptr; } /** - * Specify the output interface to use. This method will be useful in a future - * scenario where the library will be able to forward packets without + * Specify the output interface to use. This method will be useful in a + * future scenario where the library will be able to forward packets without * connecting to a local forwarder. Now it is not used. * * @param output_interface - The output interface to use for * forwarding/receiving packets. */ - TRANSPORT_ALWAYS_INLINE void setOutputInterface( - const std::string &output_interface) { - forwarder_interface_.setOutputInterface(output_interface); + void setOutputInterface(const std::string &output_interface) { + if (io_module_) { + io_module_->setOutputInterface(output_interface); + } } /** * Connect the transport to the local hicn forwarder. * - * @param is_consumer - Boolean specifying if the application on top of portal - * is a consumer or a producer. + * @param is_consumer - Boolean specifying if the application on top of + * portal is a consumer or a producer. */ - TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { - pending_interest_hash_table_.reserve(portal_details::pool_size); - forwarder_interface_.connect(is_consumer); + void connect(bool is_consumer = true) { + if (isConnected()) { + return; + } + + worker_.addAndWaitForExecution([this, is_consumer]() { + if (!io_module_) { + pending_interest_hash_table_.reserve(portal_details::pit_size); + io_module_.reset(IoModule::load(io_module_path_.c_str())); + + CHECK(io_module_); + + std::weak_ptr<Portal> self(shared_from_this()); + + io_module_->init( + [self](Connector *c, const std::vector<utils::MemBuf::Ptr> &buffers, + const std::error_code &ec) { + if (auto ptr = self.lock()) { + ptr->processIncomingMessages(c, buffers, ec); + } + }, + [self](Connector *c, const std::error_code &ec) { + if (!ec) { + return; + } + auto ptr = self.lock(); + if (ptr && ptr->transport_callback_) { + ptr->transport_callback_->onError(ec); + } + }, + [self]([[maybe_unused]] Connector *c) { /* Nothing to do here */ }, + [self](Connector *c, const std::error_code &ec) { + auto ptr = self.lock(); + if (ptr) { + if (ec && ptr->transport_callback_) { + ptr->transport_callback_->onError(ec); + return; + } + ptr->setLocalRoutes(); + } + }, + worker_.getIoService(), app_name_); + + io_module_->connect(is_consumer); + is_consumer_ = is_consumer; + } + }); } /** @@ -328,9 +296,10 @@ class Portal { * * @param name - The interest name. */ - TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { - auto it = - pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); + bool interestIsPending(const Name &name) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + + auto it = pending_interest_hash_table_.find(getHash(name)); if (it != pending_interest_hash_table_.end()) { return true; } @@ -339,49 +308,117 @@ class Portal { } /** + * @brief Add interest to PIT + * + */ + void addInterestToPIT( + const Interest::Ptr &interest, uint32_t lifetime, + OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, + OnInterestTimeoutCallback &&on_interest_timeout_callback = + UNSET_CALLBACK) { + uint32_t initial_hash = interest->getName().getHash32(false); + auto hash = initial_hash + interest->getName().getSuffix(); + uint32_t seq = interest->getName().getSuffix(); + const uint32_t *suffix = interest->firstSuffix() != nullptr + ? interest->firstSuffix() + 1 + : nullptr; + auto n_suffixes = + interest->numberOfSuffixes() > 0 ? interest->numberOfSuffixes() - 1 : 0; + uint32_t counter = 0; + // Set timers + do { + auto pend_int = pending_interest_hash_table_.try_emplace( + hash, worker_.getIoService(), interest); + PendingInterest &pending_interest = pend_int.first->second; + if (!pend_int.second) { + // element was already in map + pend_int.first->second.cancelTimer(); + pending_interest.setInterest(interest); + } + + pending_interest.setOnContentObjectCallback( + std::move(on_content_object_callback)); + pending_interest.setOnTimeoutCallback( + std::move(on_interest_timeout_callback)); + + if (is_consumer_) { + auto self = weak_from_this(); + pending_interest.startCountdown( + lifetime, portal_details::makeCustomAllocatorHandler( + async_callback_memory_, + [self, hash, seq](const std::error_code &ec) { + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + return; + } + + if (auto ptr = self.lock()) { + ptr->timerHandler(hash, seq); + } + })); + } + + if (suffix) { + hash = initial_hash + *suffix; + seq = *suffix; + suffix++; + } + } while (counter++ < n_suffixes); + } + + void matchContentObjectInPIT(ContentObject &content_object) { + uint32_t hash = getHash(content_object.getName()); + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; + + PendingInterest &pend_interest = it->second; + pend_interest.cancelTimer(); + auto _int = pend_interest.getInterest(); + auto callback = pend_interest.getOnDataCallback(); + pending_interest_hash_table_.erase(it); + + if (is_consumer_) { + // Send object is for the app + if (callback != UNSET_CALLBACK) { + callback(*_int, content_object); + } else if (transport_callback_) { + transport_callback_->onContentObject(*_int, content_object); + } + } else { + // Send content object to the network + io_module_->send(content_object); + } + } else if (is_consumer_) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "No interest pending for received content object."; + } + } + + /** * Send an interest through to the local forwarder. * * @param interest - The pointer to the interest. The ownership of the * interest is transferred by the caller to portal. * - * @param on_content_object_callback - If the caller wishes to use a different - * callback to be called for this interest, it can set this parameter. - * Otherwise ConsumerCallback::onContentObject will be used. + * @param on_content_object_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onContentObject will be used. * * @param on_interest_timeout_callback - If the caller wishes to use a * different callback to be called for this interest, it can set this * parameter. Otherwise ConsumerCallback::onTimeout will be used. */ - TRANSPORT_ALWAYS_INLINE void sendInterest( - Interest::Ptr &&interest, + void sendInterest( + Interest::Ptr &interest, uint32_t lifetime, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { - uint32_t hash = - interest->getName().getHash32() + interest->getName().getSuffix(); - // Send it - forwarder_interface_.send(*interest); - - auto pending_interest = packet_pool_.getPendingInterest(); - pending_interest->setInterest(std::move(interest)); - pending_interest->setOnContentObjectCallback( - std::move(on_content_object_callback)); - pending_interest->setOnTimeoutCallback( - std::move(on_interest_timeout_callback)); - pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler( - async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, - this, std::placeholders::_1, hash))); + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - it->second->cancelTimer(); - - // Get reference to interest packet in order to have it destroyed. - auto _int = it->second->getInterest(); - it->second = std::move(pending_interest); - } else { - pending_interest_hash_table_[hash] = std::move(pending_interest); - } + addInterestToPIT(interest, lifetime, std::move(on_content_object_callback), + std::move(on_interest_timeout_callback)); + interest->serializeSuffixes(); + io_module_->send(*interest); } /** @@ -390,178 +427,196 @@ class Portal { * @param ec - Error code which says whether the timer expired or has been * canceled upon data packet reception. * - * @param hash - The index of the interest in the pending interest hash table. + * @param hash - The index of the interest in the pending interest hash + * table. */ - TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, - uint32_t hash) { - bool is_stopped = io_service_.stopped(); - if (TRANSPORT_EXPECT_FALSE(is_stopped)) { - return; - } + void timerHandler(uint32_t hash, uint32_t seq) { + PendingInterestHashTable::iterator it = + pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + PendingInterest &pend_interest = it->second; + auto _int = pend_interest.getInterest(); + auto callback = pend_interest.getOnTimeoutCallback(); + pending_interest_hash_table_.erase(it); + Name &name = const_cast<Name &>(_int->getName()); + name.setSuffix(seq); - if (TRANSPORT_EXPECT_TRUE(!ec)) { - PendingInterestHashTable::iterator it = - pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - PendingInterest::Ptr ptr = std::move(it->second); - pending_interest_hash_table_.erase(it); - auto _int = ptr->getInterest(); - - if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) { - ptr->on_interest_timeout_callback_(std::move(_int)); - } else if (consumer_callback_) { - consumer_callback_->onTimeout(std::move(_int)); - } + if (callback != UNSET_CALLBACK) { + callback(_int, name); + } else if (transport_callback_) { + transport_callback_->onTimeout(_int, name); } } } /** - * Register a producer name to the local forwarder and optionally set the - * content store size in a per-face manner. + * Send a data packet to the local forwarder. * - * @param config - The configuration for the local forwarder binding. + * @param content_object - The data packet. */ - TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { - forwarder_interface_.setContentStoreSize(config.csReserved()); - served_namespaces_.push_back(config.prefix()); - - setLocalRoutes(); + void sendContentObject(ContentObject &content_object) { + DCHECK(io_module_); + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + matchContentObjectInPIT(content_object); } /** - * Start the event loop. This function blocks here and calls the callback set - * by the application upon interest/data received or timeout. + * Disconnect the transport from the local forwarder. */ - TRANSPORT_ALWAYS_INLINE void runEventsLoop() { - if (io_service_.stopped()) { - io_service_.reset(); // ensure that run()/poll() will do some work + void killConnection() { + if (TRANSPORT_EXPECT_TRUE(io_module_ != nullptr)) { + io_module_->closeConnection(); } - - io_service_.run(); } /** - * Run one event and return. + * Clear the pending interest hash table. */ - TRANSPORT_ALWAYS_INLINE void runOneEvent() { - if (io_service_.stopped()) { - io_service_.reset(); // ensure that run()/poll() will do some work - } - - io_service_.run_one(); + void clear() { + worker_.tryRunHandlerNow([self{shared_from_this()}]() { self->doClear(); }); } /** - * Send a data packet to the local forwarder. As opposite to sendInterest, the - * ownership of the content object is not transferred to the portal. - * - * @param content_object - The data packet. + * Get a reference to the io_service object. */ - TRANSPORT_ALWAYS_INLINE void sendContentObject( - ContentObject &content_object) { - forwarder_interface_.send(content_object); - } + utils::EventThread &getThread() { return worker_; } /** - * Stop the event loop, canceling all the pending events in the event queue. - * - * Beware that stopping the event loop DOES NOT disconnect the transport from - * the local forwarder, the connector underneath will stay connected. + * Register a route to the local forwarder. */ - TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { - if (!io_service_.stopped()) { - io_service_.dispatch([this]() { - clear(); - io_service_.stop(); - }); - } + void registerRoute(const Prefix &prefix) { + std::weak_ptr<Portal> self = shared_from_this(); + worker_.tryRunHandlerNow([self, prefix]() { + if (auto ptr = self.lock()) { + auto ret = ptr->served_namespaces_.insert(prefix); + if (ret.second && ptr->io_module_ && ptr->io_module_->isConnected()) { + ptr->io_module_->registerRoute(prefix); + } + } + }); } /** - * Disconnect the transport from the local forwarder. + * Send a MAP-Me update to traverse NATs. */ - TRANSPORT_ALWAYS_INLINE void killConnection() { - forwarder_interface_.closeConnection(); + TRANSPORT_ALWAYS_INLINE void sendMapme() { + if (io_module_->isConnected()) { + io_module_->sendMapme(); + } } /** - * Clear the pending interest hash table. + * set forwarding strategy */ - TRANSPORT_ALWAYS_INLINE void clear() { - if (!io_service_.stopped()) { - io_service_.dispatch(std::bind(&Portal::doClear, this)); - } else { - doClear(); + TRANSPORT_ALWAYS_INLINE void setForwardingStrategy(Prefix &prefix, + std::string &strategy) { + if (io_module_->isConnected()) { + io_module_->setForwardingStrategy(prefix, strategy); } } /** - * Get a reference to the io_service object. + * Check if the transport is connected to a forwarder or not */ - TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { - return io_service_; + bool isConnectedToFwd() { + std::string mod = io_module_path_.substr(0, io_module_path_.find(".")); + if (mod == "forwarder_module") return false; + return true; } + auto &getServedNamespaces() { return served_namespaces_; } + + private: /** - * Register a route to the local forwarder. + * Compute name hash */ - TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { - served_namespaces_.push_back(prefix); - if (connector_.isConnected()) { - forwarder_interface_.registerRoute(prefix); - } + uint32_t getHash(const Name &name) { + return name.getHash32(false) + name.getSuffix(); } - private: /** * Clear the pending interest hash table. */ - TRANSPORT_ALWAYS_INLINE void doClear() { + void doClear() { for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); - - // Get interest packet from pending interest and do nothing with it. It - // will get destroyed as it goes out of scope. - auto _int = pend_interest.second->getInterest(); + pend_interest.second.cancelTimer(); } pending_interest_hash_table_.clear(); } + void dumpPIT() { + std::vector<Name> sorted_elements; + for (const auto &[key, value] : pending_interest_hash_table_) { + sorted_elements.push_back(value.getInterestReference()->getName()); + } + + std::sort(sorted_elements.begin(), sorted_elements.end(), + [](const Name &a, const Name &b) { + return a.getSuffix() < b.getSuffix(); + }); + + for (auto &elt : sorted_elements) { + LOG(INFO) << elt; + } + } + /** - * Callback called by the underlying connector upon reception of a packet from - * the local forwarder. + * Callback called by the underlying connector upon reception of a packet + * from the local forwarder. * * @param packet_buffer - The bytes of the packet. */ - TRANSPORT_ALWAYS_INLINE void processIncomingMessages( - Packet::MemBufPtr &&packet_buffer) { - bool is_stopped = io_service_.stopped(); - if (TRANSPORT_EXPECT_FALSE(is_stopped)) { + void processIncomingMessages(Connector *c, + const std::vector<utils::MemBuf::Ptr> &buffers, + const std::error_code &ec) { + if (!transport_callback_) { return; } - if (TRANSPORT_EXPECT_FALSE( - ForwarderInt::isControlMessage(packet_buffer->data()))) { - processControlMessage(std::move(packet_buffer)); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + // Error receiving from underlying infra. + if (transport_callback_) { + transport_callback_->onError(ec); + } + return; } - Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data()); + for (auto &buffer_ptr : buffers) { + auto &buffer = *buffer_ptr; - if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (!Packet::isInterest(packet_buffer->data())) { - auto content_object = packet_pool_.getContentObject(); - content_object->replace(std::move(packet_buffer)); - processContentObject(std::move(content_object)); - } else { - auto interest = packet_pool_.getInterest(); - interest->replace(std::move(packet_buffer)); - processInterest(std::move(interest)); + if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer))) { + processControlMessage(buffer); + return; + } + + // The buffer is a base class for an interest or a content object + Packet &packet_buffer = static_cast<Packet &>(buffer); + + switch (packet_buffer.getType()) { + case HICN_PACKET_TYPE_INTEREST: + if (!is_consumer_) { + processInterest(static_cast<Interest &>(packet_buffer)); + } else { + LOG(ERROR) << "Received an Interest packet with name " + << packet_buffer.getName() + << " in a consumer transport. Ignoring it."; + } + break; + case HICN_PACKET_TYPE_DATA: + if (is_consumer_) { + processContentObject(static_cast<ContentObject &>(packet_buffer)); + } else { + LOG(ERROR) << "Received a Data packet with name " + << packet_buffer.getName() + << " in a producer transport. Ignoring it."; + } + break; + default: + LOG(ERROR) << "Received not supported packet. Ignoring it."; + break; } - } else { - TRANSPORT_LOGE("Received not supported packet. Ignoring it."); } } @@ -570,18 +625,25 @@ class Portal { * It register the prefixes in the served_namespaces_ list to the local * forwarder. */ - TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { + void setLocalRoutes() { + DCHECK(io_module_); + DCHECK(io_module_->isConnected()); + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + for (auto &prefix : served_namespaces_) { - if (connector_.isConnected()) { - forwarder_interface_.registerRoute(prefix); - } + io_module_->registerRoute(prefix); } } - TRANSPORT_ALWAYS_INLINE void processInterest(Interest::Ptr &&interest) { + void processInterest(Interest &interest) { // Interest for a producer - if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) { - producer_callback_->onInterest(std::move(interest)); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName(); + + // Save interest in PIT + interest.deserializeSuffixes(); + addInterestToPIT(interest.shared_from_this(), interest.getLifetime()); + if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) { + transport_callback_->onInterest(interest); } } @@ -593,55 +655,47 @@ class Portal { * * @param content_object - The data packet */ - TRANSPORT_ALWAYS_INLINE void processContentObject( - ContentObject::Ptr &&content_object) { - uint32_t hash = content_object->getName().getHash32() + - content_object->getName().getSuffix(); - - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - PendingInterest::Ptr interest_ptr = std::move(it->second); - pending_interest_hash_table_.erase(it); - interest_ptr->cancelTimer(); - auto _int = interest_ptr->getInterest(); - - if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { - interest_ptr->on_content_object_callback_(std::move(_int), - std::move(content_object)); - } else if (consumer_callback_) { - consumer_callback_->onContentObject(std::move(_int), - std::move(content_object)); - } - } + void processContentObject(ContentObject &content_object) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "processContentObject " << content_object.getName(); + matchContentObjectInPIT(content_object); } /** - * Process a control message. Control messages are different depending on the - * connector, then the forwarder_interface will do the job of understanding - * them. + * Process a control message. Control messages are different depending on + * the connector, then the forwarder_interface will do the job of + * understanding them. */ - TRANSPORT_ALWAYS_INLINE void processControlMessage( - Packet::MemBufPtr &&packet_buffer) { - forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); + void processControlMessage(utils::MemBuf &packet_buffer) { + io_module_->processControlMessageReply(packet_buffer); } private: portal_details::HandlerMemory async_callback_memory_; + std::unique_ptr<IoModule> io_module_; - asio::io_service &io_service_; - asio::io_service internal_io_service_; - portal_details::Pool packet_pool_; + ::utils::EventThread &worker_; std::string app_name_; PendingInterestHashTable pending_interest_hash_table_; - std::list<Prefix> served_namespaces_; + std::set<Prefix> served_namespaces_; - ConsumerCallback *consumer_callback_; - ProducerCallback *producer_callback_; + TransportCallback *transport_callback_; - typename ForwarderInt::ConnectorType connector_; - ForwarderInt forwarder_interface_; + bool is_consumer_; + + private: + static std::string defaultIoModule(); + static void parseIoModuleConfiguration(const libconfig::Setting &io_config, + std::error_code &ec); + static void getModuleConfiguration( + interface::global_config::ConfigurationObject &conf, std::error_code &ec); + static void setModuleConfiguration( + const interface::global_config::ConfigurationObject &conf, + std::error_code &ec); + static interface::global_config::IoModuleConfiguration conf_; + static std::string io_module_path_; }; } // namespace core |