aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/core/portal.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r--libtransport/src/core/portal.h285
1 files changed, 158 insertions, 127 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index b63eab3af..59254cf7b 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -15,24 +15,20 @@
#pragma once
-#include <core/forwarder_interface.h>
#include <core/pending_interest.h>
-#include <core/udp_socket_connector.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/interest.h>
+#include <hicn/transport/core/io_module.h>
#include <hicn/transport/core/name.h>
#include <hicn/transport/core/prefix.h>
#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/interfaces/global_conf_interface.h>
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/utils/fixed_block_allocator.h>
#include <hicn/transport/utils/log.h>
-#ifdef __vpp__
-#include <core/memif_connector.h>
-#endif
-
#include <asio.hpp>
#include <asio/steady_timer.hpp>
#include <future>
@@ -40,17 +36,19 @@
#include <queue>
#include <unordered_map>
+namespace libconfig {
+class Setting;
+}
+
namespace transport {
namespace core {
namespace portal_details {
-static constexpr uint32_t pool_size = 2048;
+static constexpr uint32_t pit_size = 1024;
class HandlerMemory {
#ifdef __vpp__
- static constexpr std::size_t memory_size = 1024 * 1024;
-
public:
HandlerMemory() {}
@@ -58,12 +56,11 @@ class HandlerMemory {
HandlerMemory &operator=(const HandlerMemory &) = delete;
TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
- return utils::FixedBlockAllocator<128, 4096>::getInstance()
- ->allocateBlock();
+ return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock();
}
TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
- utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock(
+ utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock(
pointer);
}
#else
@@ -159,33 +156,16 @@ class Pool {
public:
Pool(asio::io_service &io_service) : io_service_(io_service) {
increasePendingInterestPool();
- increaseInterestPool();
- increaseContentObjectPool();
}
TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() {
// Create pool of pending interests to reuse
- for (uint32_t i = 0; i < pool_size; i++) {
+ for (uint32_t i = 0; i < pit_size; i++) {
pending_interests_pool_.add(new PendingInterest(
Interest::Ptr(nullptr),
std::make_unique<asio::steady_timer>(io_service_)));
}
}
-
- TRANSPORT_ALWAYS_INLINE void increaseInterestPool() {
- // Create pool of interests to reuse
- for (uint32_t i = 0; i < pool_size; i++) {
- interest_pool_.add(new Interest());
- }
- }
-
- TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() {
- // Create pool of content object to reuse
- for (uint32_t i = 0; i < pool_size; i++) {
- content_object_pool_.add(new ContentObject());
- }
- }
-
PendingInterest::Ptr getPendingInterest() {
auto res = pending_interests_pool_.get();
while (TRANSPORT_EXPECT_FALSE(!res.first)) {
@@ -196,35 +176,15 @@ class Pool {
return std::move(res.second);
}
- TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() {
- auto res = content_object_pool_.get();
- while (TRANSPORT_EXPECT_FALSE(!res.first)) {
- increaseContentObjectPool();
- res = content_object_pool_.get();
- }
-
- return std::move(res.second);
- }
-
- TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() {
- auto res = interest_pool_.get();
- while (TRANSPORT_EXPECT_FALSE(!res.first)) {
- increaseInterestPool();
- res = interest_pool_.get();
- }
-
- return std::move(res.second);
- }
-
private:
utils::ObjectPool<PendingInterest> pending_interests_pool_;
- utils::ObjectPool<ContentObject> content_object_pool_;
- utils::ObjectPool<Interest> interest_pool_;
asio::io_service &io_service_;
};
} // namespace portal_details
+class PortalConfiguration;
+
using PendingInterestHashTable =
std::unordered_map<uint32_t, PendingInterest::Ptr>;
@@ -250,32 +210,32 @@ using interface::BindConfig;
* The portal class is not thread safe, appropriate locking is required by the
* users of this class.
*/
-template <typename ForwarderInt>
-class Portal {
- static_assert(
- std::is_base_of<ForwarderInterface<ForwarderInt,
- typename ForwarderInt::ConnectorType>,
- ForwarderInt>::value,
- "ForwarderInt must inherit from ForwarderInterface!");
+class Portal {
public:
using ConsumerCallback = interface::Portal::ConsumerCallback;
using ProducerCallback = interface::Portal::ProducerCallback;
+ friend class PortalConfiguration;
+
Portal() : Portal(internal_io_service_) {}
Portal(asio::io_service &io_service)
- : io_service_(io_service),
+ : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }),
+ io_service_(io_service),
packet_pool_(io_service),
app_name_("libtransport_application"),
consumer_callback_(nullptr),
producer_callback_(nullptr),
- connector_(std::bind(&Portal::processIncomingMessages, this,
- std::placeholders::_1),
- std::bind(&Portal::setLocalRoutes, this), io_service_,
- app_name_),
- forwarder_interface_(connector_) {}
-
+ is_consumer_(false) {
+ /**
+ * This workaroung allows to initialize memory for packet buffers *before*
+ * any static variables that may be initialized in the io_modules. In this
+ * way static variables in modules will be destroyed before the packet
+ * memory.
+ */
+ PacketManager<>::getInstance();
+ }
/**
* Set the consumer callback.
*
@@ -304,7 +264,7 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void setOutputInterface(
const std::string &output_interface) {
- forwarder_interface_.setOutputInterface(output_interface);
+ io_module_->setOutputInterface(output_interface);
}
/**
@@ -314,8 +274,19 @@ class Portal {
* is a consumer or a producer.
*/
TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
- pending_interest_hash_table_.reserve(portal_details::pool_size);
- forwarder_interface_.connect(is_consumer);
+ if (!io_module_) {
+ pending_interest_hash_table_.reserve(portal_details::pit_size);
+ io_module_.reset(IoModule::load(io_module_path_.c_str()));
+ assert(io_module_);
+
+ io_module_->init(std::bind(&Portal::processIncomingMessages, this,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3),
+ std::bind(&Portal::setLocalRoutes, this), io_service_,
+ app_name_);
+ io_module_->connect(is_consumer);
+ is_consumer_ = is_consumer;
+ }
}
/**
@@ -324,13 +295,19 @@ class Portal {
~Portal() { killConnection(); }
/**
+ * Compute name hash
+ */
+ TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) {
+ return name.getHash32() + name.getSuffix();
+ }
+
+ /**
* Check if there is already a pending interest for a given name.
*
* @param name - The interest name.
*/
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
- auto it =
- pending_interest_hash_table_.find(name.getHash32() + name.getSuffix());
+ auto it = pending_interest_hash_table_.find(getHash(name));
if (it != pending_interest_hash_table_.end()) {
return true;
}
@@ -357,31 +334,46 @@ class Portal {
OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
OnInterestTimeoutCallback &&on_interest_timeout_callback =
UNSET_CALLBACK) {
- uint32_t hash =
- interest->getName().getHash32() + interest->getName().getSuffix();
// Send it
- forwarder_interface_.send(*interest);
-
- auto pending_interest = packet_pool_.getPendingInterest();
- pending_interest->setInterest(std::move(interest));
- pending_interest->setOnContentObjectCallback(
- std::move(on_content_object_callback));
- pending_interest->setOnTimeoutCallback(
- std::move(on_interest_timeout_callback));
- pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler(
- async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler,
- this, std::placeholders::_1, hash)));
+ interest->encodeSuffixes();
+ io_module_->send(*interest);
+
+ uint32_t initial_hash = interest->getName().getHash32();
+ auto hash = initial_hash + interest->getName().getSuffix();
+ uint32_t *suffix = interest->firstSuffix();
+ auto n_suffixes = interest->numberOfSuffixes();
+ uint32_t counter = 0;
+ // Set timers
+ do {
+ auto pending_interest = packet_pool_.getPendingInterest();
+ pending_interest->setInterest(std::move(interest));
+ pending_interest->setOnContentObjectCallback(
+ std::move(on_content_object_callback));
+ pending_interest->setOnTimeoutCallback(
+ std::move(on_interest_timeout_callback));
+
+ pending_interest->startCountdown(
+ portal_details::makeCustomAllocatorHandler(
+ async_callback_memory_, std::bind(&Portal::timerHandler, this,
+ std::placeholders::_1, hash)));
+
+ auto it = pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ it->second->cancelTimer();
- auto it = pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
+ // Get reference to interest packet in order to have it destroyed.
+ auto _int = it->second->getInterest();
+ it->second = std::move(pending_interest);
+ } else {
+ pending_interest_hash_table_[hash] = std::move(pending_interest);
+ }
- // Get reference to interest packet in order to have it destroyed.
- auto _int = it->second->getInterest();
- it->second = std::move(pending_interest);
- } else {
- pending_interest_hash_table_[hash] = std::move(pending_interest);
- }
+ if (suffix) {
+ hash = initial_hash + *suffix;
+ suffix++;
+ }
+
+ } while (counter++ < n_suffixes);
}
/**
@@ -423,9 +415,9 @@ class Portal {
* @param config - The configuration for the local forwarder binding.
*/
TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) {
- forwarder_interface_.setContentStoreSize(config.csReserved());
+ assert(io_module_);
+ io_module_->setContentStoreSize(config.csReserved());
served_namespaces_.push_back(config.prefix());
-
setLocalRoutes();
}
@@ -460,7 +452,7 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void sendContentObject(
ContentObject &content_object) {
- forwarder_interface_.send(content_object);
+ io_module_->send(content_object);
}
/**
@@ -482,7 +474,7 @@ class Portal {
* Disconnect the transport from the local forwarder.
*/
TRANSPORT_ALWAYS_INLINE void killConnection() {
- forwarder_interface_.closeConnection();
+ io_module_->closeConnection();
}
/**
@@ -497,6 +489,17 @@ class Portal {
}
/**
+ * Remove one pending interest.
+ */
+ TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) {
+ if (!io_service_.stopped()) {
+ io_service_.dispatch(std::bind(&Portal::doClearOne, this, name));
+ } else {
+ doClearOne(name);
+ }
+ }
+
+ /**
* Get a reference to the io_service object.
*/
TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
@@ -508,8 +511,8 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) {
served_namespaces_.push_back(prefix);
- if (connector_.isConnected()) {
- forwarder_interface_.registerRoute(prefix);
+ if (io_module_->isConnected()) {
+ io_module_->registerRoute(prefix);
}
}
@@ -530,36 +533,49 @@ class Portal {
}
/**
+ * Remove one pending interest.
+ */
+ TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) {
+ auto it = pending_interest_hash_table_.find(getHash(name));
+
+ if (it != pending_interest_hash_table_.end()) {
+ it->second->cancelTimer();
+
+ // Get interest packet from pending interest and do nothing with it. It
+ // will get destroyed as it goes out of scope.
+ auto _int = it->second->getInterest();
+
+ pending_interest_hash_table_.erase(it);
+ }
+ }
+
+ /**
* Callback called by the underlying connector upon reception of a packet from
* the local forwarder.
*
* @param packet_buffer - The bytes of the packet.
*/
TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
- Packet::MemBufPtr &&packet_buffer) {
+ Connector *c, utils::MemBuf &buffer, const std::error_code &ec) {
bool is_stopped = io_service_.stopped();
if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
return;
}
- if (TRANSPORT_EXPECT_FALSE(
- ForwarderInt::isControlMessage(packet_buffer->data()))) {
- processControlMessage(std::move(packet_buffer));
+ if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) {
+ processControlMessage(buffer);
return;
}
- Packet::Format format = Packet::getFormatFromBuffer(
- packet_buffer->data(), packet_buffer->length());
+ // The buffer is a base class for an interest or a content object
+ Packet &packet_buffer = static_cast<Packet &>(buffer);
+ auto format = packet_buffer.getFormat();
if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
- if (!Packet::isInterest(packet_buffer->data())) {
- auto content_object = packet_pool_.getContentObject();
- content_object->replace(std::move(packet_buffer));
- processContentObject(std::move(content_object));
+ if (is_consumer_) {
+ processContentObject(static_cast<ContentObject &>(packet_buffer));
} else {
- auto interest = packet_pool_.getInterest();
- interest->replace(std::move(packet_buffer));
- processInterest(std::move(interest));
+ processInterest(static_cast<Interest &>(packet_buffer));
}
} else {
TRANSPORT_LOGE("Received not supported packet. Ignoring it.");
@@ -573,16 +589,16 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void setLocalRoutes() {
for (auto &prefix : served_namespaces_) {
- if (connector_.isConnected()) {
- forwarder_interface_.registerRoute(prefix);
+ if (io_module_->isConnected()) {
+ io_module_->registerRoute(prefix);
}
}
}
- TRANSPORT_ALWAYS_INLINE void processInterest(Interest::Ptr &&interest) {
+ TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) {
// Interest for a producer
if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) {
- producer_callback_->onInterest(std::move(interest));
+ producer_callback_->onInterest(interest);
}
}
@@ -595,24 +611,27 @@ class Portal {
* @param content_object - The data packet
*/
TRANSPORT_ALWAYS_INLINE void processContentObject(
- ContentObject::Ptr &&content_object) {
- uint32_t hash = content_object->getName().getHash32() +
- content_object->getName().getSuffix();
+ ContentObject &content_object) {
+ TRANSPORT_LOGD("processContentObject %s",
+ content_object.getName().toString().c_str());
+ uint32_t hash = getHash(content_object.getName());
auto it = pending_interest_hash_table_.find(hash);
if (it != pending_interest_hash_table_.end()) {
+ TRANSPORT_LOGD("Found pending interest.");
+
PendingInterest::Ptr interest_ptr = std::move(it->second);
pending_interest_hash_table_.erase(it);
interest_ptr->cancelTimer();
auto _int = interest_ptr->getInterest();
if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
- interest_ptr->on_content_object_callback_(std::move(_int),
- std::move(content_object));
+ interest_ptr->on_content_object_callback_(*_int, content_object);
} else if (consumer_callback_) {
- consumer_callback_->onContentObject(std::move(_int),
- std::move(content_object));
+ consumer_callback_->onContentObject(*_int, content_object);
}
+ } else {
+ TRANSPORT_LOGD("No interest pending for received content object.");
}
}
@@ -622,12 +641,13 @@ class Portal {
* them.
*/
TRANSPORT_ALWAYS_INLINE void processControlMessage(
- Packet::MemBufPtr &&packet_buffer) {
- forwarder_interface_.processControlMessageReply(std::move(packet_buffer));
+ utils::MemBuf &packet_buffer) {
+ io_module_->processControlMessageReply(packet_buffer);
}
private:
portal_details::HandlerMemory async_callback_memory_;
+ std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
asio::io_service &io_service_;
asio::io_service internal_io_service_;
@@ -641,8 +661,19 @@ class Portal {
ConsumerCallback *consumer_callback_;
ProducerCallback *producer_callback_;
- typename ForwarderInt::ConnectorType connector_;
- ForwarderInt forwarder_interface_;
+ bool is_consumer_;
+
+ private:
+ static std::string defaultIoModule();
+ static void parseIoModuleConfiguration(const libconfig::Setting &io_config,
+ std::error_code &ec);
+ static void getModuleConfiguration(
+ interface::global_config::ConfigurationObject &conf, std::error_code &ec);
+ static void setModuleConfiguration(
+ const interface::global_config::ConfigurationObject &conf,
+ std::error_code &ec);
+ static interface::global_config::IoModuleConfiguration conf_;
+ static std::string io_module_path_;
};
} // namespace core