aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/core/portal.h
diff options
context:
space:
mode:
authorLuca Muscariello <muscariello@ieee.org>2021-04-15 09:05:46 +0200
committerMauro Sardara <msardara@cisco.com>2021-04-15 16:36:16 +0200
commite92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch)
tree9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/core/portal.h
parent3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff)
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary. A summary of the different components that underwent major modifications is reported below. - Transport protocol updates The hierarchy of classes has been optimized to have common transport services across different transport protocols. This can allow to customize a transport protocol with new features. - A new real-time communication protocol The RTC protocol has been optimized in terms of algorithms to reduce consumer-producer synchronization latency. - A novel socket API The API has been reworked to be easier to consumer but also to have a more efficient integration in L4 proxies. - Several performance improvements A large number of performance improvements have been included in particular to make the entire stack zero-copy and optimize cache miss. - New memory buffer framework Memory management has been reworked entirely to provide a more efficient infra with a richer API. Buffers are now allocated in blocks and a single buffer holds the memory for (1) the shared_ptr control block, (2) the metadata of the packet (e.g. name, pointer to other buffers if buffer is chained and relevant offsets), and (3) the packet itself, as it is sent/received over the network. - A new slab allocator Dynamic memory allocation is now managed by a novel slab allocator that is optimised for packet processing and connection management. Memory is organized in pools of blocks all of the same size which are used during the processing of outgoing/incoming packets. When a memory block Is allocated is always taken from a global pool and when it is deallocated is returned to the pool, thus avoiding the cost of any heap allocation in the data path. - New transport connectors Consumer and producer end-points can communication either using an hicn packet forwarder or with direct connector based on shared memories or sockets. The usage of transport connectors typically for unit and funcitonal testing but may have additional usage. - Support for FEC/ECC for transport services FEC/ECC via reed solomon is supported by default and made available to transport services as a modular component. Reed solomon block codes is a default FEC model that can be replaced in a modular way by many other codes including RLNC not avaiable in this distribution. The current FEC framework support variable size padding and efficiently makes use of the infra memory buffers to avoid additiona copies. - Secure transport framework for signature computation and verification Crypto support is nativelty used in hICN for integrity and authenticity. Novel support that includes RTC has been implemented and made modular and reusable acrosso different transport protocols. - TLS - Transport layer security over hicn Point to point confidentiality is provided by integrating TLS on top of hICN reliable and non-reliable transport. The integration is common and makes a different use of the TLS record. - MLS - Messaging layer security over hicn MLS integration on top of hICN is made by using the MLSPP implemetation open sourced by Cisco. We have included instrumentation tools to deploy performance and functional tests of groups of end-points. - Android support The overall code has been heavily tested in Android environments and has received heavy lifting to better run natively in recent Android OS. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/core/portal.h')
-rw-r--r--libtransport/src/core/portal.h285
1 files changed, 158 insertions, 127 deletions
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index b63eab3af..59254cf7b 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -15,24 +15,20 @@
#pragma once
-#include <core/forwarder_interface.h>
#include <core/pending_interest.h>
-#include <core/udp_socket_connector.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/interest.h>
+#include <hicn/transport/core/io_module.h>
#include <hicn/transport/core/name.h>
#include <hicn/transport/core/prefix.h>
#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/interfaces/global_conf_interface.h>
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/utils/fixed_block_allocator.h>
#include <hicn/transport/utils/log.h>
-#ifdef __vpp__
-#include <core/memif_connector.h>
-#endif
-
#include <asio.hpp>
#include <asio/steady_timer.hpp>
#include <future>
@@ -40,17 +36,19 @@
#include <queue>
#include <unordered_map>
+namespace libconfig {
+class Setting;
+}
+
namespace transport {
namespace core {
namespace portal_details {
-static constexpr uint32_t pool_size = 2048;
+static constexpr uint32_t pit_size = 1024;
class HandlerMemory {
#ifdef __vpp__
- static constexpr std::size_t memory_size = 1024 * 1024;
-
public:
HandlerMemory() {}
@@ -58,12 +56,11 @@ class HandlerMemory {
HandlerMemory &operator=(const HandlerMemory &) = delete;
TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
- return utils::FixedBlockAllocator<128, 4096>::getInstance()
- ->allocateBlock();
+ return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock();
}
TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
- utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock(
+ utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock(
pointer);
}
#else
@@ -159,33 +156,16 @@ class Pool {
public:
Pool(asio::io_service &io_service) : io_service_(io_service) {
increasePendingInterestPool();
- increaseInterestPool();
- increaseContentObjectPool();
}
TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() {
// Create pool of pending interests to reuse
- for (uint32_t i = 0; i < pool_size; i++) {
+ for (uint32_t i = 0; i < pit_size; i++) {
pending_interests_pool_.add(new PendingInterest(
Interest::Ptr(nullptr),
std::make_unique<asio::steady_timer>(io_service_)));
}
}
-
- TRANSPORT_ALWAYS_INLINE void increaseInterestPool() {
- // Create pool of interests to reuse
- for (uint32_t i = 0; i < pool_size; i++) {
- interest_pool_.add(new Interest());
- }
- }
-
- TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() {
- // Create pool of content object to reuse
- for (uint32_t i = 0; i < pool_size; i++) {
- content_object_pool_.add(new ContentObject());
- }
- }
-
PendingInterest::Ptr getPendingInterest() {
auto res = pending_interests_pool_.get();
while (TRANSPORT_EXPECT_FALSE(!res.first)) {
@@ -196,35 +176,15 @@ class Pool {
return std::move(res.second);
}
- TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() {
- auto res = content_object_pool_.get();
- while (TRANSPORT_EXPECT_FALSE(!res.first)) {
- increaseContentObjectPool();
- res = content_object_pool_.get();
- }
-
- return std::move(res.second);
- }
-
- TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() {
- auto res = interest_pool_.get();
- while (TRANSPORT_EXPECT_FALSE(!res.first)) {
- increaseInterestPool();
- res = interest_pool_.get();
- }
-
- return std::move(res.second);
- }
-
private:
utils::ObjectPool<PendingInterest> pending_interests_pool_;
- utils::ObjectPool<ContentObject> content_object_pool_;
- utils::ObjectPool<Interest> interest_pool_;
asio::io_service &io_service_;
};
} // namespace portal_details
+class PortalConfiguration;
+
using PendingInterestHashTable =
std::unordered_map<uint32_t, PendingInterest::Ptr>;
@@ -250,32 +210,32 @@ using interface::BindConfig;
* The portal class is not thread safe, appropriate locking is required by the
* users of this class.
*/
-template <typename ForwarderInt>
-class Portal {
- static_assert(
- std::is_base_of<ForwarderInterface<ForwarderInt,
- typename ForwarderInt::ConnectorType>,
- ForwarderInt>::value,
- "ForwarderInt must inherit from ForwarderInterface!");
+class Portal {
public:
using ConsumerCallback = interface::Portal::ConsumerCallback;
using ProducerCallback = interface::Portal::ProducerCallback;
+ friend class PortalConfiguration;
+
Portal() : Portal(internal_io_service_) {}
Portal(asio::io_service &io_service)
- : io_service_(io_service),
+ : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }),
+ io_service_(io_service),
packet_pool_(io_service),
app_name_("libtransport_application"),
consumer_callback_(nullptr),
producer_callback_(nullptr),
- connector_(std::bind(&Portal::processIncomingMessages, this,
- std::placeholders::_1),
- std::bind(&Portal::setLocalRoutes, this), io_service_,
- app_name_),
- forwarder_interface_(connector_) {}
-
+ is_consumer_(false) {
+ /**
+ * This workaroung allows to initialize memory for packet buffers *before*
+ * any static variables that may be initialized in the io_modules. In this
+ * way static variables in modules will be destroyed before the packet
+ * memory.
+ */
+ PacketManager<>::getInstance();
+ }
/**
* Set the consumer callback.
*
@@ -304,7 +264,7 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void setOutputInterface(
const std::string &output_interface) {
- forwarder_interface_.setOutputInterface(output_interface);
+ io_module_->setOutputInterface(output_interface);
}
/**
@@ -314,8 +274,19 @@ class Portal {
* is a consumer or a producer.
*/
TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
- pending_interest_hash_table_.reserve(portal_details::pool_size);
- forwarder_interface_.connect(is_consumer);
+ if (!io_module_) {
+ pending_interest_hash_table_.reserve(portal_details::pit_size);
+ io_module_.reset(IoModule::load(io_module_path_.c_str()));
+ assert(io_module_);
+
+ io_module_->init(std::bind(&Portal::processIncomingMessages, this,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3),
+ std::bind(&Portal::setLocalRoutes, this), io_service_,
+ app_name_);
+ io_module_->connect(is_consumer);
+ is_consumer_ = is_consumer;
+ }
}
/**
@@ -324,13 +295,19 @@ class Portal {
~Portal() { killConnection(); }
/**
+ * Compute name hash
+ */
+ TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) {
+ return name.getHash32() + name.getSuffix();
+ }
+
+ /**
* Check if there is already a pending interest for a given name.
*
* @param name - The interest name.
*/
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
- auto it =
- pending_interest_hash_table_.find(name.getHash32() + name.getSuffix());
+ auto it = pending_interest_hash_table_.find(getHash(name));
if (it != pending_interest_hash_table_.end()) {
return true;
}
@@ -357,31 +334,46 @@ class Portal {
OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
OnInterestTimeoutCallback &&on_interest_timeout_callback =
UNSET_CALLBACK) {
- uint32_t hash =
- interest->getName().getHash32() + interest->getName().getSuffix();
// Send it
- forwarder_interface_.send(*interest);
-
- auto pending_interest = packet_pool_.getPendingInterest();
- pending_interest->setInterest(std::move(interest));
- pending_interest->setOnContentObjectCallback(
- std::move(on_content_object_callback));
- pending_interest->setOnTimeoutCallback(
- std::move(on_interest_timeout_callback));
- pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler(
- async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler,
- this, std::placeholders::_1, hash)));
+ interest->encodeSuffixes();
+ io_module_->send(*interest);
+
+ uint32_t initial_hash = interest->getName().getHash32();
+ auto hash = initial_hash + interest->getName().getSuffix();
+ uint32_t *suffix = interest->firstSuffix();
+ auto n_suffixes = interest->numberOfSuffixes();
+ uint32_t counter = 0;
+ // Set timers
+ do {
+ auto pending_interest = packet_pool_.getPendingInterest();
+ pending_interest->setInterest(std::move(interest));
+ pending_interest->setOnContentObjectCallback(
+ std::move(on_content_object_callback));
+ pending_interest->setOnTimeoutCallback(
+ std::move(on_interest_timeout_callback));
+
+ pending_interest->startCountdown(
+ portal_details::makeCustomAllocatorHandler(
+ async_callback_memory_, std::bind(&Portal::timerHandler, this,
+ std::placeholders::_1, hash)));
+
+ auto it = pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ it->second->cancelTimer();
- auto it = pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
+ // Get reference to interest packet in order to have it destroyed.
+ auto _int = it->second->getInterest();
+ it->second = std::move(pending_interest);
+ } else {
+ pending_interest_hash_table_[hash] = std::move(pending_interest);
+ }
- // Get reference to interest packet in order to have it destroyed.
- auto _int = it->second->getInterest();
- it->second = std::move(pending_interest);
- } else {
- pending_interest_hash_table_[hash] = std::move(pending_interest);
- }
+ if (suffix) {
+ hash = initial_hash + *suffix;
+ suffix++;
+ }
+
+ } while (counter++ < n_suffixes);
}
/**
@@ -423,9 +415,9 @@ class Portal {
* @param config - The configuration for the local forwarder binding.
*/
TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) {
- forwarder_interface_.setContentStoreSize(config.csReserved());
+ assert(io_module_);
+ io_module_->setContentStoreSize(config.csReserved());
served_namespaces_.push_back(config.prefix());
-
setLocalRoutes();
}
@@ -460,7 +452,7 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void sendContentObject(
ContentObject &content_object) {
- forwarder_interface_.send(content_object);
+ io_module_->send(content_object);
}
/**
@@ -482,7 +474,7 @@ class Portal {
* Disconnect the transport from the local forwarder.
*/
TRANSPORT_ALWAYS_INLINE void killConnection() {
- forwarder_interface_.closeConnection();
+ io_module_->closeConnection();
}
/**
@@ -497,6 +489,17 @@ class Portal {
}
/**
+ * Remove one pending interest.
+ */
+ TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) {
+ if (!io_service_.stopped()) {
+ io_service_.dispatch(std::bind(&Portal::doClearOne, this, name));
+ } else {
+ doClearOne(name);
+ }
+ }
+
+ /**
* Get a reference to the io_service object.
*/
TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
@@ -508,8 +511,8 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) {
served_namespaces_.push_back(prefix);
- if (connector_.isConnected()) {
- forwarder_interface_.registerRoute(prefix);
+ if (io_module_->isConnected()) {
+ io_module_->registerRoute(prefix);
}
}
@@ -530,36 +533,49 @@ class Portal {
}
/**
+ * Remove one pending interest.
+ */
+ TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) {
+ auto it = pending_interest_hash_table_.find(getHash(name));
+
+ if (it != pending_interest_hash_table_.end()) {
+ it->second->cancelTimer();
+
+ // Get interest packet from pending interest and do nothing with it. It
+ // will get destroyed as it goes out of scope.
+ auto _int = it->second->getInterest();
+
+ pending_interest_hash_table_.erase(it);
+ }
+ }
+
+ /**
* Callback called by the underlying connector upon reception of a packet from
* the local forwarder.
*
* @param packet_buffer - The bytes of the packet.
*/
TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
- Packet::MemBufPtr &&packet_buffer) {
+ Connector *c, utils::MemBuf &buffer, const std::error_code &ec) {
bool is_stopped = io_service_.stopped();
if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
return;
}
- if (TRANSPORT_EXPECT_FALSE(
- ForwarderInt::isControlMessage(packet_buffer->data()))) {
- processControlMessage(std::move(packet_buffer));
+ if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) {
+ processControlMessage(buffer);
return;
}
- Packet::Format format = Packet::getFormatFromBuffer(
- packet_buffer->data(), packet_buffer->length());
+ // The buffer is a base class for an interest or a content object
+ Packet &packet_buffer = static_cast<Packet &>(buffer);
+ auto format = packet_buffer.getFormat();
if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
- if (!Packet::isInterest(packet_buffer->data())) {
- auto content_object = packet_pool_.getContentObject();
- content_object->replace(std::move(packet_buffer));
- processContentObject(std::move(content_object));
+ if (is_consumer_) {
+ processContentObject(static_cast<ContentObject &>(packet_buffer));
} else {
- auto interest = packet_pool_.getInterest();
- interest->replace(std::move(packet_buffer));
- processInterest(std::move(interest));
+ processInterest(static_cast<Interest &>(packet_buffer));
}
} else {
TRANSPORT_LOGE("Received not supported packet. Ignoring it.");
@@ -573,16 +589,16 @@ class Portal {
*/
TRANSPORT_ALWAYS_INLINE void setLocalRoutes() {
for (auto &prefix : served_namespaces_) {
- if (connector_.isConnected()) {
- forwarder_interface_.registerRoute(prefix);
+ if (io_module_->isConnected()) {
+ io_module_->registerRoute(prefix);
}
}
}
- TRANSPORT_ALWAYS_INLINE void processInterest(Interest::Ptr &&interest) {
+ TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) {
// Interest for a producer
if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) {
- producer_callback_->onInterest(std::move(interest));
+ producer_callback_->onInterest(interest);
}
}
@@ -595,24 +611,27 @@ class Portal {
* @param content_object - The data packet
*/
TRANSPORT_ALWAYS_INLINE void processContentObject(
- ContentObject::Ptr &&content_object) {
- uint32_t hash = content_object->getName().getHash32() +
- content_object->getName().getSuffix();
+ ContentObject &content_object) {
+ TRANSPORT_LOGD("processContentObject %s",
+ content_object.getName().toString().c_str());
+ uint32_t hash = getHash(content_object.getName());
auto it = pending_interest_hash_table_.find(hash);
if (it != pending_interest_hash_table_.end()) {
+ TRANSPORT_LOGD("Found pending interest.");
+
PendingInterest::Ptr interest_ptr = std::move(it->second);
pending_interest_hash_table_.erase(it);
interest_ptr->cancelTimer();
auto _int = interest_ptr->getInterest();
if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
- interest_ptr->on_content_object_callback_(std::move(_int),
- std::move(content_object));
+ interest_ptr->on_content_object_callback_(*_int, content_object);
} else if (consumer_callback_) {
- consumer_callback_->onContentObject(std::move(_int),
- std::move(content_object));
+ consumer_callback_->onContentObject(*_int, content_object);
}
+ } else {
+ TRANSPORT_LOGD("No interest pending for received content object.");
}
}
@@ -622,12 +641,13 @@ class Portal {
* them.
*/
TRANSPORT_ALWAYS_INLINE void processControlMessage(
- Packet::MemBufPtr &&packet_buffer) {
- forwarder_interface_.processControlMessageReply(std::move(packet_buffer));
+ utils::MemBuf &packet_buffer) {
+ io_module_->processControlMessageReply(packet_buffer);
}
private:
portal_details::HandlerMemory async_callback_memory_;
+ std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
asio::io_service &io_service_;
asio::io_service internal_io_service_;
@@ -641,8 +661,19 @@ class Portal {
ConsumerCallback *consumer_callback_;
ProducerCallback *producer_callback_;
- typename ForwarderInt::ConnectorType connector_;
- ForwarderInt forwarder_interface_;
+ bool is_consumer_;
+
+ private:
+ static std::string defaultIoModule();
+ static void parseIoModuleConfiguration(const libconfig::Setting &io_config,
+ std::error_code &ec);
+ static void getModuleConfiguration(
+ interface::global_config::ConfigurationObject &conf, std::error_code &ec);
+ static void setModuleConfiguration(
+ const interface::global_config::ConfigurationObject &conf,
+ std::error_code &ec);
+ static interface::global_config::IoModuleConfiguration conf_;
+ static std::string io_module_path_;
};
} // namespace core