From bac3da61644515f05663789b122554dc77549286 Mon Sep 17 00:00:00 2001 From: Luca Muscariello Date: Thu, 17 Jan 2019 13:47:57 +0100 Subject: This is the first commit of the hicn project Change-Id: I6f2544ad9b9f8891c88cc4bcce3cf19bd3cc863f Signed-off-by: Luca Muscariello --- libtransport/src/hicn/transport/core/portal.h | 343 ++++++++++++++++++++++++++ 1 file changed, 343 insertions(+) create mode 100755 libtransport/src/hicn/transport/core/portal.h (limited to 'libtransport/src/hicn/transport/core/portal.h') diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h new file mode 100755 index 000000000..88020447f --- /dev/null +++ b/libtransport/src/hicn/transport/core/portal.h @@ -0,0 +1,343 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __vpp__ +#include +#endif + +#include +#include +#include +#include +#include + +#define UNSET_CALLBACK 0 + +namespace transport { + +namespace core { + +typedef std::unordered_map> + PendingInterestHashTable; + +template +class BasicBindConfig { + static_assert(std::is_same::value, + "Prefix must be a Prefix type."); + + const uint32_t standard_cs_reserved = 5000; + + public: + template + BasicBindConfig(T &&prefix) + : prefix_(std::forward(prefix)), + content_store_reserved_(standard_cs_reserved) {} + + template + BasicBindConfig(T &&prefix, uint32_t cs_reserved) + : prefix_(std::forward(prefix)), + content_store_reserved_(cs_reserved) {} + + TRANSPORT_ALWAYS_INLINE const PrefixType &prefix() const { return prefix_; } + + TRANSPORT_ALWAYS_INLINE uint32_t csReserved() const { + return content_store_reserved_; + } + + private: + PrefixType prefix_; + uint32_t content_store_reserved_; +}; + +using BindConfig = BasicBindConfig; + +template +class Portal { + static_assert( + std::is_base_of, + ForwarderInt>::value, + "ForwarderInt must inherit from ForwarderInterface!"); + + public: + class ConsumerCallback { + public: + virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; + virtual void onTimeout(Interest::Ptr &&i) = 0; + }; + + class ProducerCallback { + public: + virtual void onInterest(Interest::Ptr &&i) = 0; + }; + + Portal() : Portal(internal_io_service_) { + internal_work_ = std::make_unique(io_service_); + } + + Portal(asio::io_service &io_service) + : io_service_(io_service), + is_running_(false), + app_name_("libtransport_application"), + consumer_callback_(nullptr), + producer_callback_(nullptr), + connector_(std::bind(&Portal::processIncomingMessages, this, + std::placeholders::_1), + std::bind(&Portal::setLocalRoutes, this), io_service_, + app_name_), + forwarder_interface_(connector_) {} + + void setConsumerCallback(ConsumerCallback *consumer_callback) { + consumer_callback_ = consumer_callback; + } + + void setProducerCallback(ProducerCallback *producer_callback) { + producer_callback_ = producer_callback; + } + + TRANSPORT_ALWAYS_INLINE void setOutputInterface( + const std::string &output_interface) { + forwarder_interface_.setOutputInterface(output_interface); + } + + TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { + forwarder_interface_.connect(is_consumer); + } + + ~Portal() { + connector_.close(); + stopEventsLoop(); + } + + TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { + auto it = pending_interest_hash_table_.find(name); + if (it != pending_interest_hash_table_.end()) + if (!it->second->isReceived()) return true; + + return false; + } + + TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest) { + const Name name(interest->getName(), true); + + // Send it + forwarder_interface_.send(*interest); + + pending_interest_hash_table_[name] = std::make_unique( + std::move(interest), std::make_unique(io_service_)); + + pending_interest_hash_table_[name]->startCountdown( + std::bind(&Portal::timerHandler, this, + std::placeholders::_1, name)); + } + + TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, + const Name &name) { + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + if (TRANSPORT_EXPECT_TRUE(!ec)) { + std::unordered_map>::iterator it = + pending_interest_hash_table_.find(name); + if (it != pending_interest_hash_table_.end()) { + std::unique_ptr ptr = std::move(it->second); + pending_interest_hash_table_.erase(it); + + if (consumer_callback_) { + consumer_callback_->onTimeout(std::move(ptr->getInterest())); + } + } + } + } + + TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { + connector_.enableBurst(); + forwarder_interface_.setContentStoreSize(config.csReserved()); + served_namespaces_.push_back(config.prefix()); + registerRoute(served_namespaces_.back()); + } + + TRANSPORT_ALWAYS_INLINE void runEventsLoop() { + if (io_service_.stopped()) { + io_service_.reset(); // ensure that run()/poll() will do some work + } + + is_running_ = true; + this->io_service_.run(); + is_running_ = false; + } + + TRANSPORT_ALWAYS_INLINE void runOneEvent() { + if (io_service_.stopped()) { + io_service_.reset(); // ensure that run()/poll() will do some work + } + + is_running_ = true; + this->io_service_.run_one(); + is_running_ = false; + } + + TRANSPORT_ALWAYS_INLINE void sendContentObject( + ContentObject &content_object) { + forwarder_interface_.send(content_object); + } + + TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { + is_running_ = false; + internal_work_.reset(); + + for (auto &pend_interest : pending_interest_hash_table_) { + pend_interest.second->cancelTimer(); + } + + clear(); + + io_service_.post([this]() { io_service_.stop(); }); + } + + TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); } + + TRANSPORT_ALWAYS_INLINE void clear() { pending_interest_hash_table_.clear();} + + TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { + return io_service_; + } + + TRANSPORT_ALWAYS_INLINE std::size_t getPITSize() { + connector_.state(); + return pending_interest_hash_table_.size(); + } + + TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { + forwarder_interface_.registerRoute(prefix); + } + + private: + TRANSPORT_ALWAYS_INLINE void processIncomingMessages( + Packet::MemBufPtr &&packet_buffer) { + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + if (packet_buffer->data()[0] == ForwarderInt::ack_code) { + // Hicn forwarder message + processControlMessage(std::move(packet_buffer)); + return; + } + + bool is_interest = Packet::isInterest(packet_buffer->data()); + Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data()); + + if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { + if (!is_interest) { + processContentObject( + ContentObject::Ptr(new ContentObject(std::move(packet_buffer)))); + } else { + processInterest(Interest::Ptr(new Interest(std::move(packet_buffer)))); + } + } else { + TRANSPORT_LOGE("Received not supported packet. Ignoring it."); + } + } + + TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { + for (auto &name : served_namespaces_) { + registerRoute(name); + } + } + + TRANSPORT_ALWAYS_INLINE void processInterest(Interest::Ptr &&interest) { + // Interest for a producer + if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) { + producer_callback_->onInterest(std::move(interest)); + } + } + + TRANSPORT_ALWAYS_INLINE void processContentObject( + ContentObject::Ptr &&content_object) { + PendingInterestHashTable::iterator it = + pending_interest_hash_table_.find(content_object->getName()); + + if (TRANSPORT_EXPECT_TRUE(it != pending_interest_hash_table_.end())) { + std::unique_ptr interest_ptr = std::move(it->second); + interest_ptr->cancelTimer(); + + if (TRANSPORT_EXPECT_TRUE(!interest_ptr->isReceived())) { + interest_ptr->setReceived(); + pending_interest_hash_table_.erase(content_object->getName()); + + if (consumer_callback_) { + consumer_callback_->onContentObject( + std::move(interest_ptr->getInterest()), + std::move(content_object)); + } + + } else { + TRANSPORT_LOGW( + "Content already received (interest already satisfied)."); + } + } else { + TRANSPORT_LOGW("No pending interests for current content (%s)", + content_object->getName().toString().c_str()); + } + } + + TRANSPORT_ALWAYS_INLINE void processControlMessage( + Packet::MemBufPtr &&packet_buffer) { + // Control message as response to the route set by a producer. + // Do nothing + } + + private: + asio::io_service &io_service_; + asio::io_service internal_io_service_; + std::unique_ptr internal_work_; + + volatile bool is_running_; + + std::string app_name_; + + PendingInterestHashTable pending_interest_hash_table_; + + ConsumerCallback *consumer_callback_; + ProducerCallback *producer_callback_; + + typename ForwarderInt::ConnectorType connector_; + ForwarderInt forwarder_interface_; + + std::list served_namespaces_; + + ip_address_t locator4_; + ip_address_t locator6_; +}; + +} // end namespace core + +} // end namespace transport -- cgit 1.2.3-korg