diff options
author | Mauro Sardara <msardara@cisco.com> | 2019-02-20 14:32:42 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2019-03-05 11:42:39 +0100 |
commit | 9d0002e5cb97d939f2f74ab1e635b616d634e7db (patch) | |
tree | 730e4240a637264f859b3b7efeeb8fb83d476f3d | |
parent | 6d7704c1b497341fd6dd3c27e3f64d0db062ccc2 (diff) |
[HICN-73] Performance improvements of interest/data transmission and reception at low level in the stack (portal.h)
Change-Id: I1525726f52040f1609e284bb9b995ea8794c5d5e
Signed-off-by: Mauro Sardara <msardara@cisco.com>
14 files changed, 335 insertions, 145 deletions
diff --git a/libtransport/src/hicn/transport/core/content_object.cc b/libtransport/src/hicn/transport/core/content_object.cc index c00535512..df16923e9 100644 --- a/libtransport/src/hicn/transport/core/content_object.cc +++ b/libtransport/src/hicn/transport/core/content_object.cc @@ -81,6 +81,15 @@ ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) { ContentObject::~ContentObject() {} +void ContentObject::replace(MemBufPtr &&buffer) { + Packet::replace(std::move(buffer)); + + if (hicn_data_get_name(format_, (hicn_header_t *)packet_start_, + name_.getStructReference()) < 0) { + throw errors::RuntimeException("Error getting name from content object."); + } +} + const Name &ContentObject::getName() const { if (!name_) { if (hicn_data_get_name(format_, (hicn_header_t *)packet_start_, @@ -94,7 +103,7 @@ const Name &ContentObject::getName() const { Name &ContentObject::getWritableName() { return const_cast<Name &>(getName()); } -ContentObject &ContentObject::setName(const Name &name) { +void ContentObject::setName(const Name &name) { if (hicn_data_set_name(format_, (hicn_header_t *)packet_start_, name.getStructReference()) < 0) { throw errors::RuntimeException("Error setting content object name."); @@ -104,8 +113,6 @@ ContentObject &ContentObject::setName(const Name &name) { name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); } - - return *this; } void ContentObject::setName(Name &&name) { diff --git a/libtransport/src/hicn/transport/core/content_object.h b/libtransport/src/hicn/transport/core/content_object.h index 8770e8cb4..fd531e8bc 100644 --- a/libtransport/src/hicn/transport/core/content_object.h +++ b/libtransport/src/hicn/transport/core/content_object.h @@ -46,13 +46,15 @@ class ContentObject : public Packet { ~ContentObject() override; + void replace(MemBufPtr &&buffer) override; + const Name &getName() const override; Name &getWritableName() override; - ContentObject &setName(const Name &name); + void setName(const Name &name) override; - void setName(Name &&name); + void setName(Name &&name) override; uint32_t getPathLabel() const; diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h index b3403cf5a..d470b6276 100644 --- a/libtransport/src/hicn/transport/core/forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/forwarder_interface.h @@ -99,9 +99,6 @@ class ForwarderInterface { packet.setLocator(inet6_address_); } - // TRANSPORT_LOGI("Sending packet %s at %lu", - // packet.getName().toString().c_str(), - // utils::SteadyClock::now().time_since_epoch().count()); packet.setChecksum(); connector_.send(packet.data()); } diff --git a/libtransport/src/hicn/transport/core/interest.cc b/libtransport/src/hicn/transport/core/interest.cc index 49d452d19..85f24bb25 100644 --- a/libtransport/src/hicn/transport/core/interest.cc +++ b/libtransport/src/hicn/transport/core/interest.cc @@ -68,6 +68,15 @@ Interest::Interest(Interest &&other_interest) Interest::~Interest() {} +void Interest::replace(MemBufPtr &&buffer) { + Packet::replace(std::move(buffer)); + + if (hicn_interest_get_name(format_, (hicn_header_t *)packet_start_, + name_.getStructReference()) < 0) { + throw errors::MalformedPacketException(); + } +} + const Name &Interest::getName() const { if (!name_) { if (hicn_interest_get_name(format_, (hicn_header_t *)packet_start_, @@ -81,7 +90,7 @@ const Name &Interest::getName() const { Name &Interest::getWritableName() { return const_cast<Name &>(getName()); } -Interest &Interest::setName(const Name &name) { +void Interest::setName(const Name &name) { if (hicn_interest_set_name(format_, (hicn_header_t *)packet_start_, name.getStructReference()) < 0) { throw errors::RuntimeException("Error setting interest name."); @@ -91,11 +100,9 @@ Interest &Interest::setName(const Name &name) { name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); } - - return *this; } -Interest &Interest::setName(Name &&name) { +void Interest::setName(Name &&name) { if (hicn_interest_set_name(format_, (hicn_header_t *)packet_start_, name.getStructReference()) < 0) { throw errors::RuntimeException("Error setting interest name."); @@ -105,8 +112,6 @@ Interest &Interest::setName(Name &&name) { name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); } - - return *this; } void Interest::setLocator(const ip_address_t &ip_address) { diff --git a/libtransport/src/hicn/transport/core/interest.h b/libtransport/src/hicn/transport/core/interest.h index 95b45ac9a..74979d8d0 100644 --- a/libtransport/src/hicn/transport/core/interest.h +++ b/libtransport/src/hicn/transport/core/interest.h @@ -45,13 +45,15 @@ class Interest ~Interest() override; + void replace(MemBufPtr &&buffer) override; + const Name &getName() const override; Name &getWritableName() override; - Interest &setName(const Name &name); + void setName(const Name &name) override; - Interest &setName(Name &&name); + void setName(Name &&name) override; void setLocator(const ip_address_t &ip_address) override; diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index a4756d136..863e1aa20 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -377,9 +377,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, } } - connector->io_service_.post( - std::bind(&MemifConnector::processInputBuffer, connector)); - /* mark memif buffers and shared memory buffers as free */ /* free processed buffers */ @@ -399,6 +396,9 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, // } } while (ret_val == MEMIF_ERR_NOBUF); + connector->io_service_.post( + std::bind(&MemifConnector::processInputBuffer, connector)); + return 0; error: diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h index 06a8fd73e..609571389 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.h +++ b/libtransport/src/hicn/transport/core/memif_connector.h @@ -41,9 +41,9 @@ typedef struct memif_connection memif_connection_t; #define APP_NAME "libtransport" #define IF_NAME "vpp_connection" -#define MAX_MEMIF_BUFS 1024 #define MEMIF_BUF_SIZE 2048 #define MEMIF_LOG2_RING_SIZE 11 +#define MAX_MEMIF_BUFS (1 << MEMIF_LOG2_RING_SIZE) class MemifConnector : public Connector { typedef void *memif_conn_handle_t; diff --git a/libtransport/src/hicn/transport/core/packet.cc b/libtransport/src/hicn/transport/core/packet.cc index ec99bb3f7..d925375ce 100644 --- a/libtransport/src/hicn/transport/core/packet.cc +++ b/libtransport/src/hicn/transport/core/packet.cc @@ -50,30 +50,7 @@ Packet::Packet(MemBufPtr &&buffer) packet_start_(packet_->writableData()), header_head_(packet_.get()), payload_head_(nullptr), - format_(getFormatFromBuffer(packet_start_)) { - int signature_size = 0; - - if (_is_ah(format_)) { - signature_size = (uint32_t)getSignatureSize(); - } - - auto header_size = getHeaderSizeFromFormat(format_, signature_size); - auto payload_length = packet_->length() - header_size; - if (!payload_length) { - return; - } - - packet_->trimEnd(packet_->length()); - - if (payload_length) { - auto payload = packet_->cloneOne(); - payload_head_ = payload.get(); - payload_head_->advance(header_size); - payload_head_->append(payload_length); - packet_->prependChain(std::move(payload)); - packet_->append(header_size); - } -} + format_(getFormatFromBuffer(packet_start_)) {} Packet::Packet(const uint8_t *buffer, std::size_t size) : Packet(MemBufPtr(utils::MemBuf::copyBuffer(buffer, size).release())) {} @@ -145,6 +122,14 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format, return payload_length; } +void Packet::replace(MemBufPtr &&buffer) { + packet_ = std::move(buffer); + packet_start_ = packet_->writableData(); + header_head_ = packet_.get(); + payload_head_ = nullptr; + format_ = getFormatFromBuffer(packet_start_); +} + std::size_t Packet::payloadSize() const { return getPayloadSizeFromBuffer(format_, packet_start_); } @@ -173,6 +158,8 @@ uint32_t Packet::getLifetime() const { } Packet &Packet::appendPayload(std::unique_ptr<utils::MemBuf> &&payload) { + separateHeaderPayload(); + if (!payload_head_) { payload_head_ = payload.get(); } @@ -187,6 +174,8 @@ Packet &Packet::appendPayload(const uint8_t *buffer, std::size_t length) { } Packet &Packet::appendHeader(std::unique_ptr<utils::MemBuf> &&header) { + separateHeaderPayload(); + if (!payload_head_) { header_head_->prependChain(std::move(header)); } else { @@ -202,6 +191,8 @@ Packet &Packet::appendHeader(const uint8_t *buffer, std::size_t length) { } utils::Array<uint8_t> Packet::getPayload() const { + const_cast<Packet *>(this)->separateHeaderPayload(); + if (TRANSPORT_EXPECT_FALSE(payload_head_ == nullptr)) { return utils::Array<uint8_t>(); } @@ -263,6 +254,8 @@ Packet::Format Packet::getFormat() const { const std::shared_ptr<utils::MemBuf> Packet::data() { return packet_; } void Packet::dump() const { + const_cast<Packet *>(this)->separateHeaderPayload(); + std::cout << "HEADER -- Length: " << headerSize() << std::endl; hicn_packet_dump((uint8_t *)header_head_->data(), headerSize()); @@ -604,6 +597,35 @@ uint8_t Packet::getTTL() const { return hops; } +void Packet::separateHeaderPayload() { + if (payload_head_) { + return; + } + + int signature_size = 0; + + if (_is_ah(format_)) { + signature_size = (uint32_t)getSignatureSize(); + } + + auto header_size = getHeaderSizeFromFormat(format_, signature_size); + auto payload_length = packet_->length() - header_size; + if (!payload_length) { + return; + } + + packet_->trimEnd(packet_->length()); + + if (payload_length) { + auto payload = packet_->cloneOne(); + payload_head_ = payload.get(); + payload_head_->advance(header_size); + payload_head_->append(payload_length); + packet_->prependChain(std::move(payload)); + packet_->append(header_size); + } +} + } // end namespace core } // end namespace transport diff --git a/libtransport/src/hicn/transport/core/packet.h b/libtransport/src/hicn/transport/core/packet.h index add830c1e..78dbeae07 100644 --- a/libtransport/src/hicn/transport/core/packet.h +++ b/libtransport/src/hicn/transport/core/packet.h @@ -93,6 +93,8 @@ class Packet : public std::enable_shared_from_this<Packet> { static Format getFormatFromBuffer(const uint8_t *buffer); + virtual void replace(MemBufPtr &&buffer); + std::size_t payloadSize() const; std::size_t headerSize() const; @@ -101,12 +103,16 @@ class Packet : public std::enable_shared_from_this<Packet> { const uint8_t *start() const; - virtual void setLifetime(uint32_t lifetime); - virtual const Name &getName() const = 0; virtual Name &getWritableName() = 0; + virtual void setName(const Name &name) = 0; + + virtual void setName(Name &&name) = 0; + + virtual void setLifetime(uint32_t lifetime); + virtual uint32_t getLifetime() const; Packet &appendPayload(const uint8_t *buffer, std::size_t length); @@ -179,6 +185,7 @@ class Packet : public std::enable_shared_from_this<Packet> { void setSignatureSize(std::size_t size_bytes); std::size_t getSignatureSize() const; uint8_t *getSignature() const; + void separateHeaderPayload(); protected: Name name_; diff --git a/libtransport/src/hicn/transport/core/pending_interest.cc b/libtransport/src/hicn/transport/core/pending_interest.cc index 73bc41e87..dbbd2c83e 100644 --- a/libtransport/src/hicn/transport/core/pending_interest.cc +++ b/libtransport/src/hicn/transport/core/pending_interest.cc @@ -23,36 +23,31 @@ PendingInterest::PendingInterest() : interest_(nullptr, nullptr), timer_(), on_content_object_callback_(), - on_interest_timeout_callback_(), - received_(false) {} + on_interest_timeout_callback_() {} PendingInterest::PendingInterest(Interest::Ptr &&interest, std::unique_ptr<asio::steady_timer> &&timer) : interest_(std::move(interest)), timer_(std::move(timer)), on_content_object_callback_(), - on_interest_timeout_callback_(), - received_(false) {} + on_interest_timeout_callback_() {} PendingInterest::PendingInterest( - Interest::Ptr &&interest, const OnContentObjectCallback &&on_content_object, - const OnInterestTimeoutCallback &&on_interest_timeout, + Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object, + OnInterestTimeoutCallback &&on_interest_timeout, std::unique_ptr<asio::steady_timer> &&timer) : interest_(std::move(interest)), timer_(std::move(timer)), on_content_object_callback_(std::move(on_content_object)), - on_interest_timeout_callback_(std::move(on_interest_timeout)), - received_(false) {} + on_interest_timeout_callback_(std::move(on_interest_timeout)) {} -PendingInterest::~PendingInterest() { - // timer_.reset(); -} +PendingInterest::~PendingInterest() {} void PendingInterest::cancelTimer() { timer_->cancel(); } -void PendingInterest::setReceived() { received_ = true; } - -bool PendingInterest::isReceived() const { return received_; } +void PendingInterest::setInterest(Interest::Ptr &&interest) { + interest_ = std::move(interest); +} Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); } @@ -60,8 +55,8 @@ const OnContentObjectCallback &PendingInterest::getOnDataCallback() const { return on_content_object_callback_; } -void PendingInterest::setOnDataCallback( - const OnContentObjectCallback &on_content_object) { +void PendingInterest::setOnContentObjectCallback( + OnContentObjectCallback &&on_content_object) { PendingInterest::on_content_object_callback_ = on_content_object; } @@ -70,7 +65,7 @@ const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const { } void PendingInterest::setOnTimeoutCallback( - const OnInterestTimeoutCallback &on_interest_timeout) { + OnInterestTimeoutCallback &&on_interest_timeout) { PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; } diff --git a/libtransport/src/hicn/transport/core/pending_interest.h b/libtransport/src/hicn/transport/core/pending_interest.h index 3b2442d76..c481cc200 100644 --- a/libtransport/src/hicn/transport/core/pending_interest.h +++ b/libtransport/src/hicn/transport/core/pending_interest.h @@ -46,14 +46,15 @@ class PendingInterest { friend class Portal<RawSocketInterface>; public: + using Ptr = utils::ObjectPool<PendingInterest>::Ptr; PendingInterest(); PendingInterest(Interest::Ptr &&interest, std::unique_ptr<asio::steady_timer> &&timer); PendingInterest(Interest::Ptr &&interest, - const OnContentObjectCallback &&on_content_object, - const OnInterestTimeoutCallback &&on_interest_timeout, + OnContentObjectCallback &&on_content_object, + OnInterestTimeoutCallback &&on_interest_timeout, std::unique_ptr<asio::steady_timer> &&timer); ~PendingInterest(); @@ -62,32 +63,28 @@ class PendingInterest { TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) { timer_->expires_from_now( std::chrono::milliseconds(interest_->getLifetime())); - timer_->async_wait(cb); + timer_->async_wait(std::forward<Handler &&>(cb)); } void cancelTimer(); - void setReceived(); - - bool isReceived() const; - Interest::Ptr &&getInterest(); + void setInterest(Interest::Ptr &&interest); + const OnContentObjectCallback &getOnDataCallback() const; - void setOnDataCallback(const OnContentObjectCallback &on_content_object); + void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object); const OnInterestTimeoutCallback &getOnTimeoutCallback() const; - void setOnTimeoutCallback( - const OnInterestTimeoutCallback &on_interest_timeout); + void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout); private: Interest::Ptr interest_; std::unique_ptr<asio::steady_timer> timer_; OnContentObjectCallback on_content_object_callback_; OnInterestTimeoutCallback on_interest_timeout_callback_; - bool received_; }; } // end namespace core diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 7efbc2009..4abcbe2cd 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -35,16 +35,182 @@ #include <asio/steady_timer.hpp> #include <future> #include <memory> +#include <queue> #include <unordered_map> #define UNSET_CALLBACK 0 namespace transport { - namespace core { -typedef std::unordered_map<Name, std::unique_ptr<PendingInterest>> - PendingInterestHashTable; +namespace portal_details { + +static constexpr uint32_t pool_size = 2048; + +class HandlerMemory { + static constexpr std::size_t memory_size = 1024 * 1024; + public: + HandlerMemory() : index_(0) { } + + HandlerMemory(const HandlerMemory&) = delete; + HandlerMemory& operator=(const HandlerMemory&) = delete; + + TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) { + return &storage_[index_++ % memory_size]; + } + + TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { } + + private: + // Storage space used for handler-based custom memory allocation. + typename std::aligned_storage<128>::type storage_[memory_size]; + uint32_t index_; +}; + +// The allocator to be associated with the handler objects. This allocator only +// needs to satisfy the C++11 minimal allocator requirements. +template <typename T> +class HandlerAllocator { + public: + using value_type = T; + + explicit HandlerAllocator(HandlerMemory& mem) + : memory_(mem) {} + + template <typename U> + HandlerAllocator(const HandlerAllocator<U>& other) noexcept + : memory_(other.memory_) { } + + TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator& other) const noexcept { + return &memory_ == &other.memory_; + } + + TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator& other) const noexcept { + return &memory_ != &other.memory_; + } + + TRANSPORT_ALWAYS_INLINE T* allocate(std::size_t n) const { + return static_cast<T*>(memory_.allocate(sizeof(T) * n)); + } + + TRANSPORT_ALWAYS_INLINE void deallocate(T* p, std::size_t /*n*/) const { + return memory_.deallocate(p); + } + + private: + template <typename> friend class HandlerAllocator; + + // The underlying memory. + HandlerMemory& memory_; +}; + +// Wrapper class template for handler objects to allow handler memory +// allocation to be customised. The allocator_type type and get_allocator() +// member function are used by the asynchronous operations to obtain the +// allocator. Calls to operator() are forwarded to the encapsulated handler. +template <typename Handler> +class CustomAllocatorHandler { + public: + using allocator_type = HandlerAllocator<Handler>; + + CustomAllocatorHandler(HandlerMemory& m, Handler h) + : memory_(m), + handler_(h) { } + + allocator_type get_allocator() const noexcept { + return allocator_type(memory_); + } + + template <typename ...Args> + void operator()(Args&&... args) { + handler_(std::forward<Args>(args)...); + } + + private: + HandlerMemory& memory_; + Handler handler_; +}; + +// Helper function to wrap a handler object to add custom allocation. +template <typename Handler> +inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( + HandlerMemory& m, Handler h) { + return CustomAllocatorHandler<Handler>(m, h); +} + +class Pool { + public: + Pool(asio::io_service &io_service) + : io_service_(io_service) { + increasePendingInterestPool(); + increaseInterestPool(); + increaseContentObjectPool(); + } + + TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { + // Create pool of pending interests to reuse + for (uint32_t i = 0; i < pool_size; i++) { + pending_interests_pool_.add(new PendingInterest( + Interest::Ptr(nullptr), + std::make_unique<asio::steady_timer>(io_service_))); + } + } + + TRANSPORT_ALWAYS_INLINE void increaseInterestPool() { + // Create pool of interests to reuse + for (uint32_t i = 0; i < pool_size; i++) { + interest_pool_.add(new Interest()); + } + } + + TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() { + // Create pool of content object to reuse + for (uint32_t i = 0; i < pool_size; i++) { + content_object_pool_.add(new ContentObject()); + } + } + + PendingInterest::Ptr getPendingInterest() { + auto res = pending_interests_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increasePendingInterestPool(); + res = pending_interests_pool_.get(); + } + + return std::move(res.second); + } + + TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() { + auto res = content_object_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increaseContentObjectPool(); + res = content_object_pool_.get(); + } + + return std::move(res.second); + } + + TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { + auto res = interest_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increaseInterestPool(); + res = interest_pool_.get(); + } + + return std::move(res.second); + } + + private: + utils::ObjectPool<PendingInterest> pending_interests_pool_; + utils::ObjectPool<ContentObject> content_object_pool_; + utils::ObjectPool<Interest> interest_pool_; + asio::io_service &io_service_; +}; + +} + +using PendingInterestHashTable = + std::unordered_map<uint32_t, PendingInterest::Ptr>; template <typename PrefixType> class BasicBindConfig { @@ -84,7 +250,6 @@ class Portal { typename ForwarderInt::ConnectorType>, ForwarderInt>::value, "ForwarderInt must inherit from ForwarderInterface!"); - public: class ConsumerCallback { public: @@ -108,7 +273,8 @@ class Portal { std::placeholders::_1), std::bind(&Portal::setLocalRoutes, this), io_service_, app_name_), - forwarder_interface_(connector_) {} + forwarder_interface_(connector_), + packet_pool_(io_service) { } void setConsumerCallback(ConsumerCallback *consumer_callback) { consumer_callback_ = consumer_callback; @@ -124,64 +290,59 @@ class Portal { } TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { + pending_interest_hash_table_.reserve(portal_details::pool_size); forwarder_interface_.connect(is_consumer); } ~Portal() { stopEventsLoop(true); } TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { - auto it = pending_interest_hash_table_.find(name); - if (it != pending_interest_hash_table_.end()) - if (!it->second->isReceived()) return true; + auto it = + pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); + if (it != pending_interest_hash_table_.end()) { + return true; + } return false; } - TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest) { - const Name name(interest->getName(), true); - - // Send it - forwarder_interface_.send(*interest); - - pending_interest_hash_table_[name] = std::make_unique<PendingInterest>( - std::move(interest), std::make_unique<asio::steady_timer>(io_service_)); - - pending_interest_hash_table_[name]->startCountdown( - std::bind(&Portal<ForwarderInt>::timerHandler, this, - std::placeholders::_1, name)); - } - TRANSPORT_ALWAYS_INLINE void sendInterest( Interest::Ptr &&interest, - const OnContentObjectCallback &&on_content_object_callback, - const OnInterestTimeoutCallback &&on_interest_timeout_callback) { - const Name name(interest->getName(), true); - + OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, + OnInterestTimeoutCallback &&on_interest_timeout_callback = + UNSET_CALLBACK) { + uint32_t hash = + interest->getName().getHash32() + interest->getName().getSuffix(); // Send it forwarder_interface_.send(*interest); - pending_interest_hash_table_[name] = std::make_unique<PendingInterest>( - std::move(interest), std::move(on_content_object_callback), - std::move(on_interest_timeout_callback), - std::make_unique<asio::steady_timer>(io_service_)); - - pending_interest_hash_table_[name]->startCountdown( + auto pending_interest = packet_pool_.getPendingInterest(); + pending_interest->setInterest(std::move(interest)); + pending_interest->setOnContentObjectCallback( + std::move(on_content_object_callback)); + pending_interest->setOnTimeoutCallback( + std::move(on_interest_timeout_callback)); + pending_interest->startCountdown( + portal_details::makeCustomAllocatorHandler( + async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, this, - std::placeholders::_1, name)); + std::placeholders::_1, hash))); + pending_interest_hash_table_.emplace( + std::make_pair(hash, std::move(pending_interest))); } TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, - const Name &name) { + uint32_t hash) { bool is_stopped = io_service_.stopped(); if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } if (TRANSPORT_EXPECT_TRUE(!ec)) { - std::unordered_map<Name, std::unique_ptr<PendingInterest>>::iterator it = - pending_interest_hash_table_.find(name); + PendingInterestHashTable::iterator it = + pending_interest_hash_table_.find(hash); if (it != pending_interest_hash_table_.end()) { - std::unique_ptr<PendingInterest> ptr = std::move(it->second); + PendingInterest::Ptr ptr = std::move(it->second); pending_interest_hash_table_.erase(it); if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) { @@ -273,10 +434,13 @@ class Portal { if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { if (!Packet::isInterest(packet_buffer->data())) { - processContentObject( - ContentObject::Ptr(new ContentObject(std::move(packet_buffer)))); + auto content_object = packet_pool_.getContentObject(); + content_object->replace(std::move(packet_buffer)); + processContentObject(std::move(content_object)); } else { - processInterest(Interest::Ptr(new Interest(std::move(packet_buffer)))); + auto interest = packet_pool_.getInterest(); + interest->replace(std::move(packet_buffer)); + processInterest(std::move(interest)); } } else { TRANSPORT_LOGE("Received not supported packet. Ignoring it."); @@ -298,30 +462,23 @@ class Portal { TRANSPORT_ALWAYS_INLINE void processContentObject( ContentObject::Ptr &&content_object) { - PendingInterestHashTable::iterator it = - pending_interest_hash_table_.find(content_object->getName()); + uint32_t hash = content_object->getName().getHash32() + + content_object->getName().getSuffix(); - if (TRANSPORT_EXPECT_TRUE(it != pending_interest_hash_table_.end())) { - std::unique_ptr<PendingInterest> interest_ptr = std::move(it->second); + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + PendingInterest::Ptr interest_ptr = std::move(it->second); + pending_interest_hash_table_.erase(it); interest_ptr->cancelTimer(); - if (TRANSPORT_EXPECT_TRUE(!interest_ptr->isReceived())) { - interest_ptr->setReceived(); - pending_interest_hash_table_.erase(content_object->getName()); - - if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { - interest_ptr->on_content_object_callback_( - std::move(interest_ptr->getInterest()), - std::move(content_object)); - } else if (consumer_callback_) { - consumer_callback_->onContentObject( - std::move(interest_ptr->getInterest()), - std::move(content_object)); - } - - } else { - TRANSPORT_LOGW( - "Content already received (interest already satisfied)."); + if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { + interest_ptr->on_content_object_callback_( + std::move(interest_ptr->getInterest()), + std::move(content_object)); + } else if (consumer_callback_) { + consumer_callback_->onContentObject( + std::move(interest_ptr->getInterest()), + std::move(content_object)); } } else { TRANSPORT_LOGW("No pending interests for current content (%s)", @@ -349,9 +506,8 @@ class Portal { ForwarderInt forwarder_interface_; std::list<Prefix> served_namespaces_; - - ip_address_t locator4_; - ip_address_t locator6_; + portal_details::HandlerMemory async_callback_memory_; + portal_details::Pool packet_pool_; }; } // end namespace core diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h index 142228aeb..046fea892 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h @@ -39,8 +39,6 @@ static constexpr uint32_t log_2_default_buffer_size = 12; static constexpr uint32_t signature_size = 260; // bytes static constexpr uint32_t key_locator_size = 60; // bytes static constexpr uint32_t limit_guard = 80; // bytes -static constexpr uint32_t min_window_size = 1; // Interests -static constexpr uint32_t max_window_size = 256; // Interests static constexpr uint32_t digest_size = 34; // bytes static constexpr uint32_t max_out_of_order_segments = 3; // content object @@ -60,9 +58,11 @@ static constexpr double alpha = 0.8; static constexpr uint32_t rate_choice = 0; // maximum allowed values -const uint32_t transport_protocol_min_retransmissions = 0; -const uint32_t transport_protocol_max_retransmissions = 128; -const uint32_t max_content_object_size = 8096; +static constexpr uint32_t transport_protocol_min_retransmissions = 0; +static constexpr uint32_t transport_protocol_max_retransmissions = 128; +static constexpr uint32_t max_content_object_size = 8096; +static constexpr uint32_t min_window_size = 1; // Interests +static constexpr uint32_t max_window_size = 256 * 2; // Interests } // namespace default_values diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index fbd9d1e26..1b6fc67af 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -307,7 +307,7 @@ class HIperfClient { io_service_.stop(); }); - t_download_ = std::chrono::steady_clock::now(); + t_download_ = t_stats_ = std::chrono::steady_clock::now(); consumer_socket_->asyncConsume(configuration_.name, configuration_.receive_buffer); io_service_.run(); @@ -336,7 +336,7 @@ class HIperfServer { content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) { - std::string buffer(1200, 'X'); + std::string buffer(1440, 'X'); std::cout << "Producing contents under name " << conf.name.getName() << std::endl; |