aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/core/portal.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/core/portal.h')
-rw-r--r--libtransport/src/hicn/transport/core/portal.h284
1 files changed, 220 insertions, 64 deletions
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 7efbc2009..4abcbe2cd 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -35,16 +35,182 @@
#include <asio/steady_timer.hpp>
#include <future>
#include <memory>
+#include <queue>
#include <unordered_map>
#define UNSET_CALLBACK 0
namespace transport {
-
namespace core {
-typedef std::unordered_map<Name, std::unique_ptr<PendingInterest>>
- PendingInterestHashTable;
+namespace portal_details {
+
+static constexpr uint32_t pool_size = 2048;
+
+class HandlerMemory {
+ static constexpr std::size_t memory_size = 1024 * 1024;
+ public:
+ HandlerMemory() : index_(0) { }
+
+ HandlerMemory(const HandlerMemory&) = delete;
+ HandlerMemory& operator=(const HandlerMemory&) = delete;
+
+ TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) {
+ return &storage_[index_++ % memory_size];
+ }
+
+ TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { }
+
+ private:
+ // Storage space used for handler-based custom memory allocation.
+ typename std::aligned_storage<128>::type storage_[memory_size];
+ uint32_t index_;
+};
+
+// The allocator to be associated with the handler objects. This allocator only
+// needs to satisfy the C++11 minimal allocator requirements.
+template <typename T>
+class HandlerAllocator {
+ public:
+ using value_type = T;
+
+ explicit HandlerAllocator(HandlerMemory& mem)
+ : memory_(mem) {}
+
+ template <typename U>
+ HandlerAllocator(const HandlerAllocator<U>& other) noexcept
+ : memory_(other.memory_) { }
+
+ TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator& other) const noexcept {
+ return &memory_ == &other.memory_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator& other) const noexcept {
+ return &memory_ != &other.memory_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE T* allocate(std::size_t n) const {
+ return static_cast<T*>(memory_.allocate(sizeof(T) * n));
+ }
+
+ TRANSPORT_ALWAYS_INLINE void deallocate(T* p, std::size_t /*n*/) const {
+ return memory_.deallocate(p);
+ }
+
+ private:
+ template <typename> friend class HandlerAllocator;
+
+ // The underlying memory.
+ HandlerMemory& memory_;
+};
+
+// Wrapper class template for handler objects to allow handler memory
+// allocation to be customised. The allocator_type type and get_allocator()
+// member function are used by the asynchronous operations to obtain the
+// allocator. Calls to operator() are forwarded to the encapsulated handler.
+template <typename Handler>
+class CustomAllocatorHandler {
+ public:
+ using allocator_type = HandlerAllocator<Handler>;
+
+ CustomAllocatorHandler(HandlerMemory& m, Handler h)
+ : memory_(m),
+ handler_(h) { }
+
+ allocator_type get_allocator() const noexcept {
+ return allocator_type(memory_);
+ }
+
+ template <typename ...Args>
+ void operator()(Args&&... args) {
+ handler_(std::forward<Args>(args)...);
+ }
+
+ private:
+ HandlerMemory& memory_;
+ Handler handler_;
+};
+
+// Helper function to wrap a handler object to add custom allocation.
+template <typename Handler>
+inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler(
+ HandlerMemory& m, Handler h) {
+ return CustomAllocatorHandler<Handler>(m, h);
+}
+
+class Pool {
+ public:
+ Pool(asio::io_service &io_service)
+ : io_service_(io_service) {
+ increasePendingInterestPool();
+ increaseInterestPool();
+ increaseContentObjectPool();
+ }
+
+ TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() {
+ // Create pool of pending interests to reuse
+ for (uint32_t i = 0; i < pool_size; i++) {
+ pending_interests_pool_.add(new PendingInterest(
+ Interest::Ptr(nullptr),
+ std::make_unique<asio::steady_timer>(io_service_)));
+ }
+ }
+
+ TRANSPORT_ALWAYS_INLINE void increaseInterestPool() {
+ // Create pool of interests to reuse
+ for (uint32_t i = 0; i < pool_size; i++) {
+ interest_pool_.add(new Interest());
+ }
+ }
+
+ TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() {
+ // Create pool of content object to reuse
+ for (uint32_t i = 0; i < pool_size; i++) {
+ content_object_pool_.add(new ContentObject());
+ }
+ }
+
+ PendingInterest::Ptr getPendingInterest() {
+ auto res = pending_interests_pool_.get();
+ while (TRANSPORT_EXPECT_FALSE(!res.first)) {
+ increasePendingInterestPool();
+ res = pending_interests_pool_.get();
+ }
+
+ return std::move(res.second);
+ }
+
+ TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() {
+ auto res = content_object_pool_.get();
+ while (TRANSPORT_EXPECT_FALSE(!res.first)) {
+ increaseContentObjectPool();
+ res = content_object_pool_.get();
+ }
+
+ return std::move(res.second);
+ }
+
+ TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() {
+ auto res = interest_pool_.get();
+ while (TRANSPORT_EXPECT_FALSE(!res.first)) {
+ increaseInterestPool();
+ res = interest_pool_.get();
+ }
+
+ return std::move(res.second);
+ }
+
+ private:
+ utils::ObjectPool<PendingInterest> pending_interests_pool_;
+ utils::ObjectPool<ContentObject> content_object_pool_;
+ utils::ObjectPool<Interest> interest_pool_;
+ asio::io_service &io_service_;
+};
+
+}
+
+using PendingInterestHashTable =
+ std::unordered_map<uint32_t, PendingInterest::Ptr>;
template <typename PrefixType>
class BasicBindConfig {
@@ -84,7 +250,6 @@ class Portal {
typename ForwarderInt::ConnectorType>,
ForwarderInt>::value,
"ForwarderInt must inherit from ForwarderInterface!");
-
public:
class ConsumerCallback {
public:
@@ -108,7 +273,8 @@ class Portal {
std::placeholders::_1),
std::bind(&Portal::setLocalRoutes, this), io_service_,
app_name_),
- forwarder_interface_(connector_) {}
+ forwarder_interface_(connector_),
+ packet_pool_(io_service) { }
void setConsumerCallback(ConsumerCallback *consumer_callback) {
consumer_callback_ = consumer_callback;
@@ -124,64 +290,59 @@ class Portal {
}
TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
+ pending_interest_hash_table_.reserve(portal_details::pool_size);
forwarder_interface_.connect(is_consumer);
}
~Portal() { stopEventsLoop(true); }
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
- auto it = pending_interest_hash_table_.find(name);
- if (it != pending_interest_hash_table_.end())
- if (!it->second->isReceived()) return true;
+ auto it =
+ pending_interest_hash_table_.find(name.getHash32() + name.getSuffix());
+ if (it != pending_interest_hash_table_.end()) {
+ return true;
+ }
return false;
}
- TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest) {
- const Name name(interest->getName(), true);
-
- // Send it
- forwarder_interface_.send(*interest);
-
- pending_interest_hash_table_[name] = std::make_unique<PendingInterest>(
- std::move(interest), std::make_unique<asio::steady_timer>(io_service_));
-
- pending_interest_hash_table_[name]->startCountdown(
- std::bind(&Portal<ForwarderInt>::timerHandler, this,
- std::placeholders::_1, name));
- }
-
TRANSPORT_ALWAYS_INLINE void sendInterest(
Interest::Ptr &&interest,
- const OnContentObjectCallback &&on_content_object_callback,
- const OnInterestTimeoutCallback &&on_interest_timeout_callback) {
- const Name name(interest->getName(), true);
-
+ OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
+ OnInterestTimeoutCallback &&on_interest_timeout_callback =
+ UNSET_CALLBACK) {
+ uint32_t hash =
+ interest->getName().getHash32() + interest->getName().getSuffix();
// Send it
forwarder_interface_.send(*interest);
- pending_interest_hash_table_[name] = std::make_unique<PendingInterest>(
- std::move(interest), std::move(on_content_object_callback),
- std::move(on_interest_timeout_callback),
- std::make_unique<asio::steady_timer>(io_service_));
-
- pending_interest_hash_table_[name]->startCountdown(
+ auto pending_interest = packet_pool_.getPendingInterest();
+ pending_interest->setInterest(std::move(interest));
+ pending_interest->setOnContentObjectCallback(
+ std::move(on_content_object_callback));
+ pending_interest->setOnTimeoutCallback(
+ std::move(on_interest_timeout_callback));
+ pending_interest->startCountdown(
+ portal_details::makeCustomAllocatorHandler(
+ async_callback_memory_,
std::bind(&Portal<ForwarderInt>::timerHandler, this,
- std::placeholders::_1, name));
+ std::placeholders::_1, hash)));
+ pending_interest_hash_table_.emplace(
+ std::make_pair(hash, std::move(pending_interest)));
}
TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec,
- const Name &name) {
+ uint32_t hash) {
bool is_stopped = io_service_.stopped();
if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
return;
}
if (TRANSPORT_EXPECT_TRUE(!ec)) {
- std::unordered_map<Name, std::unique_ptr<PendingInterest>>::iterator it =
- pending_interest_hash_table_.find(name);
+ PendingInterestHashTable::iterator it =
+ pending_interest_hash_table_.find(hash);
if (it != pending_interest_hash_table_.end()) {
- std::unique_ptr<PendingInterest> ptr = std::move(it->second);
+ PendingInterest::Ptr ptr = std::move(it->second);
pending_interest_hash_table_.erase(it);
if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) {
@@ -273,10 +434,13 @@ class Portal {
if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
if (!Packet::isInterest(packet_buffer->data())) {
- processContentObject(
- ContentObject::Ptr(new ContentObject(std::move(packet_buffer))));
+ auto content_object = packet_pool_.getContentObject();
+ content_object->replace(std::move(packet_buffer));
+ processContentObject(std::move(content_object));
} else {
- processInterest(Interest::Ptr(new Interest(std::move(packet_buffer))));
+ auto interest = packet_pool_.getInterest();
+ interest->replace(std::move(packet_buffer));
+ processInterest(std::move(interest));
}
} else {
TRANSPORT_LOGE("Received not supported packet. Ignoring it.");
@@ -298,30 +462,23 @@ class Portal {
TRANSPORT_ALWAYS_INLINE void processContentObject(
ContentObject::Ptr &&content_object) {
- PendingInterestHashTable::iterator it =
- pending_interest_hash_table_.find(content_object->getName());
+ uint32_t hash = content_object->getName().getHash32() +
+ content_object->getName().getSuffix();
- if (TRANSPORT_EXPECT_TRUE(it != pending_interest_hash_table_.end())) {
- std::unique_ptr<PendingInterest> interest_ptr = std::move(it->second);
+ auto it = pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ PendingInterest::Ptr interest_ptr = std::move(it->second);
+ pending_interest_hash_table_.erase(it);
interest_ptr->cancelTimer();
- if (TRANSPORT_EXPECT_TRUE(!interest_ptr->isReceived())) {
- interest_ptr->setReceived();
- pending_interest_hash_table_.erase(content_object->getName());
-
- if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
- interest_ptr->on_content_object_callback_(
- std::move(interest_ptr->getInterest()),
- std::move(content_object));
- } else if (consumer_callback_) {
- consumer_callback_->onContentObject(
- std::move(interest_ptr->getInterest()),
- std::move(content_object));
- }
-
- } else {
- TRANSPORT_LOGW(
- "Content already received (interest already satisfied).");
+ if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
+ interest_ptr->on_content_object_callback_(
+ std::move(interest_ptr->getInterest()),
+ std::move(content_object));
+ } else if (consumer_callback_) {
+ consumer_callback_->onContentObject(
+ std::move(interest_ptr->getInterest()),
+ std::move(content_object));
}
} else {
TRANSPORT_LOGW("No pending interests for current content (%s)",
@@ -349,9 +506,8 @@ class Portal {
ForwarderInt forwarder_interface_;
std::list<Prefix> served_namespaces_;
-
- ip_address_t locator4_;
- ip_address_t locator6_;
+ portal_details::HandlerMemory async_callback_memory_;
+ portal_details::Pool packet_pool_;
};
} // end namespace core