/* * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace libconfig { class Setting; } namespace transport { namespace core { namespace portal_details { static constexpr uint32_t pit_size = 1024; class HandlerMemory { #ifdef __vpp__ public: HandlerMemory() {} HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; void *allocate(std::size_t /* size */) { return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock(); } void deallocate(void *pointer) { utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock( pointer); } #else public: HandlerMemory() {} HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; void *allocate(std::size_t size) { return ::operator new(size); } void deallocate(void *pointer) { ::operator delete(pointer); } #endif }; // The allocator to be associated with the handler objects. This allocator only // needs to satisfy the C++11 minimal allocator requirements. template class HandlerAllocator { public: using value_type = T; explicit HandlerAllocator(HandlerMemory &mem) : memory_(mem) {} template HandlerAllocator(const HandlerAllocator &other) noexcept : memory_(other.memory_) {} bool operator==(const HandlerAllocator &other) const noexcept { return &memory_ == &other.memory_; } bool operator!=(const HandlerAllocator &other) const noexcept { return &memory_ != &other.memory_; } T *allocate(std::size_t n) const { return static_cast(memory_.allocate(sizeof(T) * n)); } void deallocate(T *p, std::size_t /*n*/) const { return memory_.deallocate(p); } private: template friend class HandlerAllocator; // The underlying memory. HandlerMemory &memory_; }; // Wrapper class template for handler objects to allow handler memory // allocation to be customised. The allocator_type type and get_allocator() // member function are used by the asynchronous operations to obtain the // allocator. Calls to operator() are forwarded to the encapsulated handler. template class CustomAllocatorHandler { public: using allocator_type = HandlerAllocator; CustomAllocatorHandler(HandlerMemory &m, Handler h) : memory_(m), handler_(h) {} allocator_type get_allocator() const noexcept { return allocator_type(memory_); } template void operator()(Args &&...args) { handler_(std::forward(args)...); } private: HandlerMemory &memory_; Handler handler_; }; // Helper function to wrap a handler object to add custom allocation. template inline CustomAllocatorHandler makeCustomAllocatorHandler( HandlerMemory &m, Handler h) { return CustomAllocatorHandler(m, h); } } // namespace portal_details class PortalConfiguration; using PendingInterestHashTable = std::unordered_map; /** * Portal is a opaque class which is used for sending/receiving interest/data * packets over multiple kind of io_modules. The io_module itself is an external * module loaded at runtime. * * The tasks performed by portal are the following: * - Sending/Receiving Interest packets * - Sending/Receiving Data packets * - Set timers (one per interest), in order to trigger events if an interest is * not satisfied * - Register a producer prefix to the local forwarder * * The way of working of portal is event-based, which means that data and * interests are sent/received in a asynchronous manner, and the notifications * are performed through callbacks. * * The portal class is not thread safe, appropriate locking is required by the * users of this class. */ class Portal : public ::utils::NonCopyable, public std::enable_shared_from_this { private: Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {} Portal(::utils::EventThread &worker) : io_module_(nullptr), worker_(worker), app_name_("libtransport_application"), transport_callback_(nullptr), is_consumer_(false) {} public: using TransportCallback = interface::Portal::TransportCallback; friend class PortalConfiguration; static std::shared_ptr createShared() { return std::shared_ptr(new Portal()); } static std::shared_ptr createShared(::utils::EventThread &worker) { return std::shared_ptr(new Portal(worker)); } bool isConnected() const { return io_module_.get() != nullptr; } /** * Set the transport callback. Must be called from the same worker thread. * * @param consumer_callback - The pointer to the TransportCallback object. */ void registerTransportCallback(TransportCallback *transport_callback) { DCHECK(std::this_thread::get_id() == worker_.getThreadId()); transport_callback_ = transport_callback; } /** * Unset the consumer callback. Must be called from the same worker thread. */ void unregisterTransportCallback() { DCHECK(std::this_thread::get_id() == worker_.getThreadId()); transport_callback_ = nullptr; } /** * Specify the output interface to use. This method will be useful in a * future scenario where the library will be able to forward packets without * connecting to a local forwarder. Now it is not used. * * @param output_interface - The output interface to use for * forwarding/receiving packets. */ void setOutputInterface(const std::string &output_interface) { if (io_module_) { io_module_->setOutputInterface(output_interface); } } /** * Connect the transport to the local hicn forwarder. * * @param is_consumer - Boolean specifying if the application on top of * portal is a consumer or a producer. */ void connect(bool is_consumer = true) { if (isConnected()) { return; } worker_.addAndWaitForExecution([this, is_consumer]() { if (!io_module_) { pending_interest_hash_table_.reserve(portal_details::pit_size); io_module_.reset(IoModule::load(io_module_path_.c_str())); CHECK(io_module_); std::weak_ptr self(shared_from_this()); io_module_->init( [self](Connector *c, const std::vector &buffers, const std::error_code &ec) { if (auto ptr = self.lock()) { ptr->processIncomingMessages(c, buffers, ec); } }, [self](Connector *c, const std::error_code &ec) { if (!ec) { return; } auto ptr = self.lock(); if (ptr && ptr->transport_callback_) { ptr->transport_callback_->onError(ec); } }, [self]([[maybe_unused]] Connector *c) { /* Nothing to do here */ }, [self](Connector *c, const std::error_code &ec) { auto ptr = self.lock(); if (ptr) { if (ec && ptr->transport_callback_) { ptr->transport_callback_->onError(ec); return; } ptr->setLocalRoutes(); } }, worker_.getIoService(), app_name_); io_module_->connect(is_consumer); is_consumer_ = is_consumer; } }); } /** * Destructor. */ ~Portal() { killConnection(); } /** * Check if there is already a pending interest for a given name. * * @param name - The interest name. */ bool interestIsPending(const Name &name) { DCHECK(std::this_thread::get_id() == worker_.getThreadId()); auto it = pending_interest_hash_table_.find(getHash(name)); if (it != pending_interest_hash_table_.end()) { return true; } return false; } /** * @brief Add interest to PIT * */ void addInterestToPIT( const Interest::Ptr &interest, uint32_t lifetime, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { uint32_t initial_hash = interest->getName().getHash32(false); auto hash = initial_hash + interest->getName().getSuffix(); uint32_t seq = interest->getName().getSuffix(); const uint32_t *suffix = interest->firstSuffix() != nullptr ? interest->firstSuffix() + 1 : nullptr; auto n_suffixes = interest->numberOfSuffixes() > 0 ? interest->numberOfSuffixes() - 1 : 0; uint32_t counter = 0; // Set timers do { auto pend_int = pending_interest_hash_table_.try_emplace( hash, worker_.getIoService(), interest); PendingInterest &pending_interest = pend_int.first->second; if (!pend_int.second) { // element was already in map pend_int.first->second.cancelTimer(); pending_interest.setInterest(interest); } pending_interest.setOnContentObjectCallback( std::move(on_content_object_callback)); pending_interest.setOnTimeoutCallback( std::move(on_interest_timeout_callback)); if (is_consumer_) { auto self = weak_from_this(); pending_interest.startCountdown( lifetime, portal_details::makeCustomAllocatorHandler( async_callback_memory_, [self, hash, seq](const std::error_code &ec) { if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { return; } if (auto ptr = self.lock()) { ptr->timerHandler(hash, seq); } })); } if (suffix) { hash = initial_hash + *suffix; seq = *suffix; suffix++; } } while (counter++ < n_suffixes); } void matchContentObjectInPIT(ContentObject &content_object) { uint32_t hash = getHash(content_object.getName()); auto it = pending_interest_hash_table_.find(hash); if (it != pending_interest_hash_table_.end()) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; PendingInterest &pend_interest = it->second; pend_interest.cancelTimer(); auto _int = pend_interest.getInterest(); auto callback = pend_interest.getOnDataCallback(); pending_interest_hash_table_.erase(it); if (is_consumer_) { // Send object is for the app if (callback != UNSET_CALLBACK) { callback(*_int, content_object); } else if (transport_callback_) { transport_callback_->onContentObject(*_int, content_object); } } else { // Send content object to the network io_module_->send(content_object); } } else if (is_consumer_) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "No interest pending for received content object."; } } /** * Send an interest through to the local forwarder. * * @param interest - The pointer to the interest. The ownership of the * interest is transferred by the caller to portal. * * @param on_content_object_callback - If the caller wishes to use a * different callback to be called for this interest, it can set this * parameter. Otherwise ConsumerCallback::onContentObject will be used. * * @param on_interest_timeout_callback - If the caller wishes to use a * different callback to be called for this interest, it can set this * parameter. Otherwise ConsumerCallback::onTimeout will be used. */ void sendInterest( Interest::Ptr &interest, uint32_t lifetime, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { DCHECK(std::this_thread::get_id() == worker_.getThreadId()); addInterestToPIT(interest, lifetime, std::move(on_content_object_callback), std::move(on_interest_timeout_callback)); interest->serializeSuffixes(); io_module_->send(*interest); } /** * Handler fot the timer set when the interest is sent. * * @param ec - Error code which says whether the timer expired or has been * canceled upon data packet reception. * * @param hash - The index of the interest in the pending interest hash * table. */ void timerHandler(uint32_t hash, uint32_t seq) { PendingInterestHashTable::iterator it = pending_interest_hash_table_.find(hash); if (it != pending_interest_hash_table_.end()) { PendingInterest &pend_interest = it->second; auto _int = pend_interest.getInterest(); auto callback = pend_interest.getOnTimeoutCallback(); pending_interest_hash_table_.erase(it); Name &name = const_cast(_int->getName()); name.setSuffix(seq); if (callback != UNSET_CALLBACK) { callback(_int, name); } else if (transport_callback_) { transport_callback_->onTimeout(_int, name); } } } /** * Send a data packet to the local forwarder. * * @param content_object - The data packet. */ void sendContentObject(ContentObject &content_object) { DCHECK(io_module_); DCHECK(std::this_thread::get_id() == worker_.getThreadId()); matchContentObjectInPIT(content_object); } /** * Disconnect the transport from the local forwarder. */ void killConnection() { if (TRANSPORT_EXPECT_TRUE(io_module_ != nullptr)) { io_module_->closeConnection(); } } /** * Clear the pending interest hash table. */ void clear() { worker_.tryRunHandlerNow([self{shared_from_this()}]() { self->doClear(); }); } /** * Get a reference to the io_service object. */ utils::EventThread &getThread() { return worker_; } /** * Register a route to the local forwarder. */ void registerRoute(const Prefix &prefix) { std::weak_ptr self = shared_from_this(); worker_.tryRunHandlerNow([self, prefix]() { if (auto ptr = self.lock()) { auto ret = ptr->served_namespaces_.insert(prefix); if (ret.second && ptr->io_module_ && ptr->io_module_->isConnected()) { ptr->io_module_->registerRoute(prefix); } } }); } /** * Send a MAP-Me update to traverse NATs. */ TRANSPORT_ALWAYS_INLINE void sendMapme() { if (io_module_->isConnected()) { io_module_->sendMapme(); } } /** * set forwarding strategy */ TRANSPORT_ALWAYS_INLINE void setForwardingStrategy(Prefix &prefix, std::string &strategy) { if (io_module_->isConnected()) { io_module_->setForwardingStrategy(prefix, strategy); } } /** * Check if the transport is connected to a forwarder or not */ bool isConnectedToFwd() { std::string mod = io_module_path_.substr(0, io_module_path_.find(".")); if (mod == "forwarder_module") return false; return true; } auto &getServedNamespaces() { return served_namespaces_; } private: /** * Compute name hash */ uint32_t getHash(const Name &name) { return name.getHash32(false) + name.getSuffix(); } /** * Clear the pending interest hash table. */ void doClear() { for (auto &pend_interest : pending_interest_hash_table_) { pend_interest.second.cancelTimer(); } pending_interest_hash_table_.clear(); } void dumpPIT() { std::vector sorted_elements; for (const auto &[key, value] : pending_interest_hash_table_) { sorted_elements.push_back(value.getInterestReference()->getName()); } std::sort(sorted_elements.begin(), sorted_elements.end(), [](const Name &a, const Name &b) { return a.getSuffix() < b.getSuffix(); }); for (auto &elt : sorted_elements) { LOG(INFO) << elt; } } /** * Callback called by the underlying connector upon reception of a packet * from the local forwarder. * * @param packet_buffer - The bytes of the packet. */ void processIncomingMessages(Connector *c, const std::vector &buffers, const std::error_code &ec) { if (!transport_callback_) { return; } if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { // Error receiving from underlying infra. if (transport_callback_) { transport_callback_->onError(ec); } return; } for (auto &buffer_ptr : buffers) { auto &buffer = *buffer_ptr; if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer))) { processControlMessage(buffer); return; } // The buffer is a base class for an interest or a content object Packet &packet_buffer = static_cast(buffer); switch (packet_buffer.getType()) { case HICN_PACKET_TYPE_INTEREST: if (!is_consumer_) { processInterest(static_cast(packet_buffer)); } else { LOG(ERROR) << "Received an Interest packet with name " << packet_buffer.getName() << " in a consumer transport. Ignoring it."; } break; case HICN_PACKET_TYPE_DATA: if (is_consumer_) { processContentObject(static_cast(packet_buffer)); } else { LOG(ERROR) << "Received a Data packet with name " << packet_buffer.getName() << " in a producer transport. Ignoring it."; } break; default: LOG(ERROR) << "Received not supported packet. Ignoring it."; break; } } } /** * Callback called by the transport upon connection to the local forwarder. * It register the prefixes in the served_namespaces_ list to the local * forwarder. */ void setLocalRoutes() { DCHECK(io_module_); DCHECK(io_module_->isConnected()); DCHECK(std::this_thread::get_id() == worker_.getThreadId()); for (auto &prefix : served_namespaces_) { io_module_->registerRoute(prefix); } } void processInterest(Interest &interest) { // Interest for a producer DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName(); // Save interest in PIT interest.deserializeSuffixes(); addInterestToPIT(interest.shared_from_this(), interest.getLifetime()); if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) { transport_callback_->onInterest(interest); } } /** * Process a content object: * - Check if the data packet was effectively requested by portal * - Delete its timer * - Pass packet to application * * @param content_object - The data packet */ void processContentObject(ContentObject &content_object) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "processContentObject " << content_object.getName(); matchContentObjectInPIT(content_object); } /** * Process a control message. Control messages are different depending on * the connector, then the forwarder_interface will do the job of * understanding them. */ void processControlMessage(utils::MemBuf &packet_buffer) { io_module_->processControlMessageReply(packet_buffer); } private: portal_details::HandlerMemory async_callback_memory_; std::unique_ptr io_module_; ::utils::EventThread &worker_; std::string app_name_; PendingInterestHashTable pending_interest_hash_table_; std::set served_namespaces_; TransportCallback *transport_callback_; bool is_consumer_; private: static std::string defaultIoModule(); static void parseIoModuleConfiguration(const libconfig::Setting &io_config, std::error_code &ec); static void getModuleConfiguration( interface::global_config::ConfigurationObject &conf, std::error_code &ec); static void setModuleConfiguration( const interface::global_config::ConfigurationObject &conf, std::error_code &ec); static interface::global_config::IoModuleConfiguration conf_; static std::string io_module_path_; }; } // namespace core } // end namespace transport