diff options
author | Mauro Sardara <msardara@cisco.com> | 2019-02-20 14:32:42 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2019-03-05 11:42:39 +0100 |
commit | 9d0002e5cb97d939f2f74ab1e635b616d634e7db (patch) | |
tree | 730e4240a637264f859b3b7efeeb8fb83d476f3d /libtransport/src/hicn/transport/core/portal.h | |
parent | 6d7704c1b497341fd6dd3c27e3f64d0db062ccc2 (diff) |
[HICN-73] Performance improvements of interest/data transmission and reception at low level in the stack (portal.h)
Change-Id: I1525726f52040f1609e284bb9b995ea8794c5d5e
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/core/portal.h')
-rw-r--r-- | libtransport/src/hicn/transport/core/portal.h | 284 |
1 files changed, 220 insertions, 64 deletions
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 7efbc2009..4abcbe2cd 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -35,16 +35,182 @@ #include <asio/steady_timer.hpp> #include <future> #include <memory> +#include <queue> #include <unordered_map> #define UNSET_CALLBACK 0 namespace transport { - namespace core { -typedef std::unordered_map<Name, std::unique_ptr<PendingInterest>> - PendingInterestHashTable; +namespace portal_details { + +static constexpr uint32_t pool_size = 2048; + +class HandlerMemory { + static constexpr std::size_t memory_size = 1024 * 1024; + public: + HandlerMemory() : index_(0) { } + + HandlerMemory(const HandlerMemory&) = delete; + HandlerMemory& operator=(const HandlerMemory&) = delete; + + TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) { + return &storage_[index_++ % memory_size]; + } + + TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { } + + private: + // Storage space used for handler-based custom memory allocation. + typename std::aligned_storage<128>::type storage_[memory_size]; + uint32_t index_; +}; + +// The allocator to be associated with the handler objects. This allocator only +// needs to satisfy the C++11 minimal allocator requirements. +template <typename T> +class HandlerAllocator { + public: + using value_type = T; + + explicit HandlerAllocator(HandlerMemory& mem) + : memory_(mem) {} + + template <typename U> + HandlerAllocator(const HandlerAllocator<U>& other) noexcept + : memory_(other.memory_) { } + + TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator& other) const noexcept { + return &memory_ == &other.memory_; + } + + TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator& other) const noexcept { + return &memory_ != &other.memory_; + } + + TRANSPORT_ALWAYS_INLINE T* allocate(std::size_t n) const { + return static_cast<T*>(memory_.allocate(sizeof(T) * n)); + } + + TRANSPORT_ALWAYS_INLINE void deallocate(T* p, std::size_t /*n*/) const { + return memory_.deallocate(p); + } + + private: + template <typename> friend class HandlerAllocator; + + // The underlying memory. + HandlerMemory& memory_; +}; + +// Wrapper class template for handler objects to allow handler memory +// allocation to be customised. The allocator_type type and get_allocator() +// member function are used by the asynchronous operations to obtain the +// allocator. Calls to operator() are forwarded to the encapsulated handler. +template <typename Handler> +class CustomAllocatorHandler { + public: + using allocator_type = HandlerAllocator<Handler>; + + CustomAllocatorHandler(HandlerMemory& m, Handler h) + : memory_(m), + handler_(h) { } + + allocator_type get_allocator() const noexcept { + return allocator_type(memory_); + } + + template <typename ...Args> + void operator()(Args&&... args) { + handler_(std::forward<Args>(args)...); + } + + private: + HandlerMemory& memory_; + Handler handler_; +}; + +// Helper function to wrap a handler object to add custom allocation. +template <typename Handler> +inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( + HandlerMemory& m, Handler h) { + return CustomAllocatorHandler<Handler>(m, h); +} + +class Pool { + public: + Pool(asio::io_service &io_service) + : io_service_(io_service) { + increasePendingInterestPool(); + increaseInterestPool(); + increaseContentObjectPool(); + } + + TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { + // Create pool of pending interests to reuse + for (uint32_t i = 0; i < pool_size; i++) { + pending_interests_pool_.add(new PendingInterest( + Interest::Ptr(nullptr), + std::make_unique<asio::steady_timer>(io_service_))); + } + } + + TRANSPORT_ALWAYS_INLINE void increaseInterestPool() { + // Create pool of interests to reuse + for (uint32_t i = 0; i < pool_size; i++) { + interest_pool_.add(new Interest()); + } + } + + TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() { + // Create pool of content object to reuse + for (uint32_t i = 0; i < pool_size; i++) { + content_object_pool_.add(new ContentObject()); + } + } + + PendingInterest::Ptr getPendingInterest() { + auto res = pending_interests_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increasePendingInterestPool(); + res = pending_interests_pool_.get(); + } + + return std::move(res.second); + } + + TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() { + auto res = content_object_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increaseContentObjectPool(); + res = content_object_pool_.get(); + } + + return std::move(res.second); + } + + TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { + auto res = interest_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increaseInterestPool(); + res = interest_pool_.get(); + } + + return std::move(res.second); + } + + private: + utils::ObjectPool<PendingInterest> pending_interests_pool_; + utils::ObjectPool<ContentObject> content_object_pool_; + utils::ObjectPool<Interest> interest_pool_; + asio::io_service &io_service_; +}; + +} + +using PendingInterestHashTable = + std::unordered_map<uint32_t, PendingInterest::Ptr>; template <typename PrefixType> class BasicBindConfig { @@ -84,7 +250,6 @@ class Portal { typename ForwarderInt::ConnectorType>, ForwarderInt>::value, "ForwarderInt must inherit from ForwarderInterface!"); - public: class ConsumerCallback { public: @@ -108,7 +273,8 @@ class Portal { std::placeholders::_1), std::bind(&Portal::setLocalRoutes, this), io_service_, app_name_), - forwarder_interface_(connector_) {} + forwarder_interface_(connector_), + packet_pool_(io_service) { } void setConsumerCallback(ConsumerCallback *consumer_callback) { consumer_callback_ = consumer_callback; @@ -124,64 +290,59 @@ class Portal { } TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { + pending_interest_hash_table_.reserve(portal_details::pool_size); forwarder_interface_.connect(is_consumer); } ~Portal() { stopEventsLoop(true); } TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { - auto it = pending_interest_hash_table_.find(name); - if (it != pending_interest_hash_table_.end()) - if (!it->second->isReceived()) return true; + auto it = + pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); + if (it != pending_interest_hash_table_.end()) { + return true; + } return false; } - TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest) { - const Name name(interest->getName(), true); - - // Send it - forwarder_interface_.send(*interest); - - pending_interest_hash_table_[name] = std::make_unique<PendingInterest>( - std::move(interest), std::make_unique<asio::steady_timer>(io_service_)); - - pending_interest_hash_table_[name]->startCountdown( - std::bind(&Portal<ForwarderInt>::timerHandler, this, - std::placeholders::_1, name)); - } - TRANSPORT_ALWAYS_INLINE void sendInterest( Interest::Ptr &&interest, - const OnContentObjectCallback &&on_content_object_callback, - const OnInterestTimeoutCallback &&on_interest_timeout_callback) { - const Name name(interest->getName(), true); - + OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, + OnInterestTimeoutCallback &&on_interest_timeout_callback = + UNSET_CALLBACK) { + uint32_t hash = + interest->getName().getHash32() + interest->getName().getSuffix(); // Send it forwarder_interface_.send(*interest); - pending_interest_hash_table_[name] = std::make_unique<PendingInterest>( - std::move(interest), std::move(on_content_object_callback), - std::move(on_interest_timeout_callback), - std::make_unique<asio::steady_timer>(io_service_)); - - pending_interest_hash_table_[name]->startCountdown( + auto pending_interest = packet_pool_.getPendingInterest(); + pending_interest->setInterest(std::move(interest)); + pending_interest->setOnContentObjectCallback( + std::move(on_content_object_callback)); + pending_interest->setOnTimeoutCallback( + std::move(on_interest_timeout_callback)); + pending_interest->startCountdown( + portal_details::makeCustomAllocatorHandler( + async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, this, - std::placeholders::_1, name)); + std::placeholders::_1, hash))); + pending_interest_hash_table_.emplace( + std::make_pair(hash, std::move(pending_interest))); } TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, - const Name &name) { + uint32_t hash) { bool is_stopped = io_service_.stopped(); if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } if (TRANSPORT_EXPECT_TRUE(!ec)) { - std::unordered_map<Name, std::unique_ptr<PendingInterest>>::iterator it = - pending_interest_hash_table_.find(name); + PendingInterestHashTable::iterator it = + pending_interest_hash_table_.find(hash); if (it != pending_interest_hash_table_.end()) { - std::unique_ptr<PendingInterest> ptr = std::move(it->second); + PendingInterest::Ptr ptr = std::move(it->second); pending_interest_hash_table_.erase(it); if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) { @@ -273,10 +434,13 @@ class Portal { if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { if (!Packet::isInterest(packet_buffer->data())) { - processContentObject( - ContentObject::Ptr(new ContentObject(std::move(packet_buffer)))); + auto content_object = packet_pool_.getContentObject(); + content_object->replace(std::move(packet_buffer)); + processContentObject(std::move(content_object)); } else { - processInterest(Interest::Ptr(new Interest(std::move(packet_buffer)))); + auto interest = packet_pool_.getInterest(); + interest->replace(std::move(packet_buffer)); + processInterest(std::move(interest)); } } else { TRANSPORT_LOGE("Received not supported packet. Ignoring it."); @@ -298,30 +462,23 @@ class Portal { TRANSPORT_ALWAYS_INLINE void processContentObject( ContentObject::Ptr &&content_object) { - PendingInterestHashTable::iterator it = - pending_interest_hash_table_.find(content_object->getName()); + uint32_t hash = content_object->getName().getHash32() + + content_object->getName().getSuffix(); - if (TRANSPORT_EXPECT_TRUE(it != pending_interest_hash_table_.end())) { - std::unique_ptr<PendingInterest> interest_ptr = std::move(it->second); + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + PendingInterest::Ptr interest_ptr = std::move(it->second); + pending_interest_hash_table_.erase(it); interest_ptr->cancelTimer(); - if (TRANSPORT_EXPECT_TRUE(!interest_ptr->isReceived())) { - interest_ptr->setReceived(); - pending_interest_hash_table_.erase(content_object->getName()); - - if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { - interest_ptr->on_content_object_callback_( - std::move(interest_ptr->getInterest()), - std::move(content_object)); - } else if (consumer_callback_) { - consumer_callback_->onContentObject( - std::move(interest_ptr->getInterest()), - std::move(content_object)); - } - - } else { - TRANSPORT_LOGW( - "Content already received (interest already satisfied)."); + if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { + interest_ptr->on_content_object_callback_( + std::move(interest_ptr->getInterest()), + std::move(content_object)); + } else if (consumer_callback_) { + consumer_callback_->onContentObject( + std::move(interest_ptr->getInterest()), + std::move(content_object)); } } else { TRANSPORT_LOGW("No pending interests for current content (%s)", @@ -349,9 +506,8 @@ class Portal { ForwarderInt forwarder_interface_; std::list<Prefix> served_namespaces_; - - ip_address_t locator4_; - ip_address_t locator6_; + portal_details::HandlerMemory async_callback_memory_; + portal_details::Pool packet_pool_; }; } // end namespace core |