diff options
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r-- | libtransport/src/core/portal.h | 285 |
1 files changed, 158 insertions, 127 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index b63eab3af..59254cf7b 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -15,24 +15,20 @@ #pragma once -#include <core/forwarder_interface.h> #include <core/pending_interest.h> -#include <core/udp_socket_connector.h> #include <hicn/transport/config.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/interest.h> +#include <hicn/transport/core/io_module.h> #include <hicn/transport/core/name.h> #include <hicn/transport/core/prefix.h> #include <hicn/transport/errors/errors.h> +#include <hicn/transport/interfaces/global_conf_interface.h> #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> #include <hicn/transport/utils/fixed_block_allocator.h> #include <hicn/transport/utils/log.h> -#ifdef __vpp__ -#include <core/memif_connector.h> -#endif - #include <asio.hpp> #include <asio/steady_timer.hpp> #include <future> @@ -40,17 +36,19 @@ #include <queue> #include <unordered_map> +namespace libconfig { +class Setting; +} + namespace transport { namespace core { namespace portal_details { -static constexpr uint32_t pool_size = 2048; +static constexpr uint32_t pit_size = 1024; class HandlerMemory { #ifdef __vpp__ - static constexpr std::size_t memory_size = 1024 * 1024; - public: HandlerMemory() {} @@ -58,12 +56,11 @@ class HandlerMemory { HandlerMemory &operator=(const HandlerMemory &) = delete; TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return utils::FixedBlockAllocator<128, 4096>::getInstance() - ->allocateBlock(); + return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock(); } TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { - utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock( + utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock( pointer); } #else @@ -159,33 +156,16 @@ class Pool { public: Pool(asio::io_service &io_service) : io_service_(io_service) { increasePendingInterestPool(); - increaseInterestPool(); - increaseContentObjectPool(); } TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { // Create pool of pending interests to reuse - for (uint32_t i = 0; i < pool_size; i++) { + for (uint32_t i = 0; i < pit_size; i++) { pending_interests_pool_.add(new PendingInterest( Interest::Ptr(nullptr), std::make_unique<asio::steady_timer>(io_service_))); } } - - TRANSPORT_ALWAYS_INLINE void increaseInterestPool() { - // Create pool of interests to reuse - for (uint32_t i = 0; i < pool_size; i++) { - interest_pool_.add(new Interest()); - } - } - - TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() { - // Create pool of content object to reuse - for (uint32_t i = 0; i < pool_size; i++) { - content_object_pool_.add(new ContentObject()); - } - } - PendingInterest::Ptr getPendingInterest() { auto res = pending_interests_pool_.get(); while (TRANSPORT_EXPECT_FALSE(!res.first)) { @@ -196,35 +176,15 @@ class Pool { return std::move(res.second); } - TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() { - auto res = content_object_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increaseContentObjectPool(); - res = content_object_pool_.get(); - } - - return std::move(res.second); - } - - TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { - auto res = interest_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increaseInterestPool(); - res = interest_pool_.get(); - } - - return std::move(res.second); - } - private: utils::ObjectPool<PendingInterest> pending_interests_pool_; - utils::ObjectPool<ContentObject> content_object_pool_; - utils::ObjectPool<Interest> interest_pool_; asio::io_service &io_service_; }; } // namespace portal_details +class PortalConfiguration; + using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest::Ptr>; @@ -250,32 +210,32 @@ using interface::BindConfig; * The portal class is not thread safe, appropriate locking is required by the * users of this class. */ -template <typename ForwarderInt> -class Portal { - static_assert( - std::is_base_of<ForwarderInterface<ForwarderInt, - typename ForwarderInt::ConnectorType>, - ForwarderInt>::value, - "ForwarderInt must inherit from ForwarderInterface!"); +class Portal { public: using ConsumerCallback = interface::Portal::ConsumerCallback; using ProducerCallback = interface::Portal::ProducerCallback; + friend class PortalConfiguration; + Portal() : Portal(internal_io_service_) {} Portal(asio::io_service &io_service) - : io_service_(io_service), + : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }), + io_service_(io_service), packet_pool_(io_service), app_name_("libtransport_application"), consumer_callback_(nullptr), producer_callback_(nullptr), - connector_(std::bind(&Portal::processIncomingMessages, this, - std::placeholders::_1), - std::bind(&Portal::setLocalRoutes, this), io_service_, - app_name_), - forwarder_interface_(connector_) {} - + is_consumer_(false) { + /** + * This workaroung allows to initialize memory for packet buffers *before* + * any static variables that may be initialized in the io_modules. In this + * way static variables in modules will be destroyed before the packet + * memory. + */ + PacketManager<>::getInstance(); + } /** * Set the consumer callback. * @@ -304,7 +264,7 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void setOutputInterface( const std::string &output_interface) { - forwarder_interface_.setOutputInterface(output_interface); + io_module_->setOutputInterface(output_interface); } /** @@ -314,8 +274,19 @@ class Portal { * is a consumer or a producer. */ TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { - pending_interest_hash_table_.reserve(portal_details::pool_size); - forwarder_interface_.connect(is_consumer); + if (!io_module_) { + pending_interest_hash_table_.reserve(portal_details::pit_size); + io_module_.reset(IoModule::load(io_module_path_.c_str())); + assert(io_module_); + + io_module_->init(std::bind(&Portal::processIncomingMessages, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3), + std::bind(&Portal::setLocalRoutes, this), io_service_, + app_name_); + io_module_->connect(is_consumer); + is_consumer_ = is_consumer; + } } /** @@ -324,13 +295,19 @@ class Portal { ~Portal() { killConnection(); } /** + * Compute name hash + */ + TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) { + return name.getHash32() + name.getSuffix(); + } + + /** * Check if there is already a pending interest for a given name. * * @param name - The interest name. */ TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { - auto it = - pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); + auto it = pending_interest_hash_table_.find(getHash(name)); if (it != pending_interest_hash_table_.end()) { return true; } @@ -357,31 +334,46 @@ class Portal { OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { - uint32_t hash = - interest->getName().getHash32() + interest->getName().getSuffix(); // Send it - forwarder_interface_.send(*interest); - - auto pending_interest = packet_pool_.getPendingInterest(); - pending_interest->setInterest(std::move(interest)); - pending_interest->setOnContentObjectCallback( - std::move(on_content_object_callback)); - pending_interest->setOnTimeoutCallback( - std::move(on_interest_timeout_callback)); - pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler( - async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, - this, std::placeholders::_1, hash))); + interest->encodeSuffixes(); + io_module_->send(*interest); + + uint32_t initial_hash = interest->getName().getHash32(); + auto hash = initial_hash + interest->getName().getSuffix(); + uint32_t *suffix = interest->firstSuffix(); + auto n_suffixes = interest->numberOfSuffixes(); + uint32_t counter = 0; + // Set timers + do { + auto pending_interest = packet_pool_.getPendingInterest(); + pending_interest->setInterest(std::move(interest)); + pending_interest->setOnContentObjectCallback( + std::move(on_content_object_callback)); + pending_interest->setOnTimeoutCallback( + std::move(on_interest_timeout_callback)); + + pending_interest->startCountdown( + portal_details::makeCustomAllocatorHandler( + async_callback_memory_, std::bind(&Portal::timerHandler, this, + std::placeholders::_1, hash))); + + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + it->second->cancelTimer(); - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - it->second->cancelTimer(); + // Get reference to interest packet in order to have it destroyed. + auto _int = it->second->getInterest(); + it->second = std::move(pending_interest); + } else { + pending_interest_hash_table_[hash] = std::move(pending_interest); + } - // Get reference to interest packet in order to have it destroyed. - auto _int = it->second->getInterest(); - it->second = std::move(pending_interest); - } else { - pending_interest_hash_table_[hash] = std::move(pending_interest); - } + if (suffix) { + hash = initial_hash + *suffix; + suffix++; + } + + } while (counter++ < n_suffixes); } /** @@ -423,9 +415,9 @@ class Portal { * @param config - The configuration for the local forwarder binding. */ TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { - forwarder_interface_.setContentStoreSize(config.csReserved()); + assert(io_module_); + io_module_->setContentStoreSize(config.csReserved()); served_namespaces_.push_back(config.prefix()); - setLocalRoutes(); } @@ -460,7 +452,7 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void sendContentObject( ContentObject &content_object) { - forwarder_interface_.send(content_object); + io_module_->send(content_object); } /** @@ -482,7 +474,7 @@ class Portal { * Disconnect the transport from the local forwarder. */ TRANSPORT_ALWAYS_INLINE void killConnection() { - forwarder_interface_.closeConnection(); + io_module_->closeConnection(); } /** @@ -497,6 +489,17 @@ class Portal { } /** + * Remove one pending interest. + */ + TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) { + if (!io_service_.stopped()) { + io_service_.dispatch(std::bind(&Portal::doClearOne, this, name)); + } else { + doClearOne(name); + } + } + + /** * Get a reference to the io_service object. */ TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { @@ -508,8 +511,8 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { served_namespaces_.push_back(prefix); - if (connector_.isConnected()) { - forwarder_interface_.registerRoute(prefix); + if (io_module_->isConnected()) { + io_module_->registerRoute(prefix); } } @@ -530,36 +533,49 @@ class Portal { } /** + * Remove one pending interest. + */ + TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) { + auto it = pending_interest_hash_table_.find(getHash(name)); + + if (it != pending_interest_hash_table_.end()) { + it->second->cancelTimer(); + + // Get interest packet from pending interest and do nothing with it. It + // will get destroyed as it goes out of scope. + auto _int = it->second->getInterest(); + + pending_interest_hash_table_.erase(it); + } + } + + /** * Callback called by the underlying connector upon reception of a packet from * the local forwarder. * * @param packet_buffer - The bytes of the packet. */ TRANSPORT_ALWAYS_INLINE void processIncomingMessages( - Packet::MemBufPtr &&packet_buffer) { + Connector *c, utils::MemBuf &buffer, const std::error_code &ec) { bool is_stopped = io_service_.stopped(); if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } - if (TRANSPORT_EXPECT_FALSE( - ForwarderInt::isControlMessage(packet_buffer->data()))) { - processControlMessage(std::move(packet_buffer)); + if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) { + processControlMessage(buffer); return; } - Packet::Format format = Packet::getFormatFromBuffer( - packet_buffer->data(), packet_buffer->length()); + // The buffer is a base class for an interest or a content object + Packet &packet_buffer = static_cast<Packet &>(buffer); + auto format = packet_buffer.getFormat(); if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (!Packet::isInterest(packet_buffer->data())) { - auto content_object = packet_pool_.getContentObject(); - content_object->replace(std::move(packet_buffer)); - processContentObject(std::move(content_object)); + if (is_consumer_) { + processContentObject(static_cast<ContentObject &>(packet_buffer)); } else { - auto interest = packet_pool_.getInterest(); - interest->replace(std::move(packet_buffer)); - processInterest(std::move(interest)); + processInterest(static_cast<Interest &>(packet_buffer)); } } else { TRANSPORT_LOGE("Received not supported packet. Ignoring it."); @@ -573,16 +589,16 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { for (auto &prefix : served_namespaces_) { - if (connector_.isConnected()) { - forwarder_interface_.registerRoute(prefix); + if (io_module_->isConnected()) { + io_module_->registerRoute(prefix); } } } - TRANSPORT_ALWAYS_INLINE void processInterest(Interest::Ptr &&interest) { + TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) { // Interest for a producer if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) { - producer_callback_->onInterest(std::move(interest)); + producer_callback_->onInterest(interest); } } @@ -595,24 +611,27 @@ class Portal { * @param content_object - The data packet */ TRANSPORT_ALWAYS_INLINE void processContentObject( - ContentObject::Ptr &&content_object) { - uint32_t hash = content_object->getName().getHash32() + - content_object->getName().getSuffix(); + ContentObject &content_object) { + TRANSPORT_LOGD("processContentObject %s", + content_object.getName().toString().c_str()); + uint32_t hash = getHash(content_object.getName()); auto it = pending_interest_hash_table_.find(hash); if (it != pending_interest_hash_table_.end()) { + TRANSPORT_LOGD("Found pending interest."); + PendingInterest::Ptr interest_ptr = std::move(it->second); pending_interest_hash_table_.erase(it); interest_ptr->cancelTimer(); auto _int = interest_ptr->getInterest(); if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { - interest_ptr->on_content_object_callback_(std::move(_int), - std::move(content_object)); + interest_ptr->on_content_object_callback_(*_int, content_object); } else if (consumer_callback_) { - consumer_callback_->onContentObject(std::move(_int), - std::move(content_object)); + consumer_callback_->onContentObject(*_int, content_object); } + } else { + TRANSPORT_LOGD("No interest pending for received content object."); } } @@ -622,12 +641,13 @@ class Portal { * them. */ TRANSPORT_ALWAYS_INLINE void processControlMessage( - Packet::MemBufPtr &&packet_buffer) { - forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); + utils::MemBuf &packet_buffer) { + io_module_->processControlMessageReply(packet_buffer); } private: portal_details::HandlerMemory async_callback_memory_; + std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_; asio::io_service &io_service_; asio::io_service internal_io_service_; @@ -641,8 +661,19 @@ class Portal { ConsumerCallback *consumer_callback_; ProducerCallback *producer_callback_; - typename ForwarderInt::ConnectorType connector_; - ForwarderInt forwarder_interface_; + bool is_consumer_; + + private: + static std::string defaultIoModule(); + static void parseIoModuleConfiguration(const libconfig::Setting &io_config, + std::error_code &ec); + static void getModuleConfiguration( + interface::global_config::ConfigurationObject &conf, std::error_code &ec); + static void setModuleConfiguration( + const interface::global_config::ConfigurationObject &conf, + std::error_code &ec); + static interface::global_config::IoModuleConfiguration conf_; + static std::string io_module_path_; }; } // namespace core |