From b375370d0f11163da8cb752c4a3f992a89ef80ee Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Tue, 16 Apr 2019 18:38:07 +0200 Subject: [HICN-178] Sync send of control messages. Change-Id: I9a07c6c806ceba10f80a5f67337dce2eee76120d Signed-off-by: Mauro Sardara --- libtransport/src/hicn/transport/core/portal.h | 201 +++++++++++++++++++++++--- 1 file changed, 177 insertions(+), 24 deletions(-) (limited to 'libtransport/src/hicn/transport/core/portal.h') diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 07f84075e..3ea37c938 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -260,6 +260,26 @@ class BasicBindConfig { using BindConfig = BasicBindConfig; +/** + * Portal is a opaque class which is used for sending/receiving interest/data + * packets over multiple kind of connector. The connector itself is defined by + * the template ForwarderInt, which is resolved at compile time. It is then not + * possible to decide at runtime what the connector will be. + * + * The tasks performed by portal are the following: + * - Sending/Receiving Interest packets + * - Sending/Receiving Data packets + * - Set timers (one per interest), in order to trigger events if an interest is + * not satisfied + * - Register a producer prefix to the local forwarder + * + * The way of working of portal is event-based, which means that data and + * interests are sent/received in a asynchronous manner, and the notifications + * are performed through callbacks. + * + * The portal class is not thread safe, appropriate locking is required by the + * users of this class. + */ template class Portal { static_assert( @@ -269,12 +289,20 @@ class Portal { "ForwarderInt must inherit from ForwarderInterface!"); public: + /** + * Consumer callback is an abstract class containing two methods to be + * implemented by a consumer application. + */ class ConsumerCallback { public: virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; virtual void onTimeout(Interest::Ptr &&i) = 0; }; + /** + * Producer callback is an abstract class containing two methods to be + * implemented by a producer application. + */ class ProducerCallback { public: virtual void onInterest(Interest::Ptr &&i) = 0; @@ -287,33 +315,65 @@ class Portal { app_name_("libtransport_application"), consumer_callback_(nullptr), producer_callback_(nullptr), + packet_pool_(io_service), connector_(std::bind(&Portal::processIncomingMessages, this, std::placeholders::_1), std::bind(&Portal::setLocalRoutes, this), io_service_, app_name_), - forwarder_interface_(connector_), - packet_pool_(io_service) {} + forwarder_interface_(connector_) {} + /** + * Set the consumer callback. + * + * @param consumer_callback - The pointer to the ConsumerCallback object. + */ void setConsumerCallback(ConsumerCallback *consumer_callback) { consumer_callback_ = consumer_callback; } + /** + * Set the producer callback. + * + * @param producer_callback - The pointer to the ProducerCallback object. + */ void setProducerCallback(ProducerCallback *producer_callback) { producer_callback_ = producer_callback; } + /** + * Specify the output interface to use. This method will be useful in a future + * scenario where the library will be able to forward packets without + * connecting to a local forwarder. Now it is not used. + * + * @param output_interface - The output interface to use for + * forwarding/receiving packets. + */ TRANSPORT_ALWAYS_INLINE void setOutputInterface( const std::string &output_interface) { forwarder_interface_.setOutputInterface(output_interface); } + /** + * Connect the transport to the local hicn forwarder. + * + * @param is_consumer - Boolean specifying if the application on top of portal + * is a consumer or a producer. + */ TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { pending_interest_hash_table_.reserve(portal_details::pool_size); forwarder_interface_.connect(is_consumer); } + /** + * Destructor. + */ ~Portal() { killConnection(); } + /** + * Check if there is already a pending interest for a given name. + * + * @param name - The interest name. + */ TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { auto it = pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); @@ -324,6 +384,20 @@ class Portal { return false; } + /** + * Send an interest through to the local forwarder. + * + * @param interest - The pointer to the interest. The ownership of the + * interest is transferred by the caller to portal. + * + * @param on_content_object_callback - If the caller wishes to use a different + * callback to be called for this interest, it can set this parameter. + * Otherwise ConsumerCallback::onContentObject will be used. + * + * @param on_interest_timeout_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onTimeout will be used. + */ TRANSPORT_ALWAYS_INLINE void sendInterest( Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, @@ -347,6 +421,14 @@ class Portal { std::make_pair(hash, std::move(pending_interest))); } + /** + * Handler fot the timer set when the interest is sent. + * + * @param ec - Error code which says whether the timer expired or has been + * canceled upon data packet reception. + * + * @param hash - The index of the interest in the pending interest hash table. + */ TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, uint32_t hash) { bool is_stopped = io_service_.stopped(); @@ -370,34 +452,59 @@ class Portal { } } + /** + * Register a producer name to the local forwarder and optionally set the + * content store size in a per-face manner. + * + * @param config - The configuration for the local forwarder binding. + */ TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { - connector_.enableBurst(); forwarder_interface_.setContentStoreSize(config.csReserved()); served_namespaces_.push_back(config.prefix()); - registerRoute(served_namespaces_.back()); + + setLocalRoutes(); } + /** + * Start the event loop. This function blocks here and calls the callback set + * by the application upon interest/data received or timeout. + */ TRANSPORT_ALWAYS_INLINE void runEventsLoop() { if (io_service_.stopped()) { io_service_.reset(); // ensure that run()/poll() will do some work } - this->io_service_.run(); + io_service_.run(); } + /** + * Run one event and return. + */ TRANSPORT_ALWAYS_INLINE void runOneEvent() { if (io_service_.stopped()) { io_service_.reset(); // ensure that run()/poll() will do some work } - this->io_service_.run_one(); + io_service_.run_one(); } + /** + * Send a data packet to the local forwarder. As opposite to sendInterest, the + * ownership of the content object is not transferred to the portal. + * + * @param content_object - The data packet. + */ TRANSPORT_ALWAYS_INLINE void sendContentObject( ContentObject &content_object) { forwarder_interface_.send(content_object); } + /** + * Stop the event loop, canceling all the pending events in the event queue. + * + * Beware that stopping the event loop DOES NOT disconnect the transport from + * the local forwarder, the connector underneath will stay connected. + */ TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { if (!io_service_.stopped()) { io_service_.dispatch([this]() { @@ -407,32 +514,59 @@ class Portal { } } + /** + * Disconnect the transport from the local forwarder. + */ TRANSPORT_ALWAYS_INLINE void killConnection() { forwarder_interface_.closeConnection(); } + /** + * Clear the pending interest hash table. + */ TRANSPORT_ALWAYS_INLINE void clear() { - for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); + if (!io_service_.stopped()) { + io_service_.dispatch(std::bind(&Portal::doClear, this)); + } else { + doClear(); } - - pending_interest_hash_table_.clear(); } + /** + * Get a reference to the io_service object. + */ TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { return io_service_; } - TRANSPORT_ALWAYS_INLINE std::size_t getPITSize() { - connector_.state(); - return pending_interest_hash_table_.size(); - } - + /** + * Register a route to the local forwarder. + */ TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { - forwarder_interface_.registerRoute(prefix); + served_namespaces_.push_back(prefix); + if (connector_.isConnected()) { + forwarder_interface_.registerRoute(prefix); + } } private: + /** + * Clear the pending interest hash table. + */ + TRANSPORT_ALWAYS_INLINE void doClear() { + for (auto &pend_interest : pending_interest_hash_table_) { + pend_interest.second->cancelTimer(); + } + + pending_interest_hash_table_.clear(); + } + + /** + * Callback called by the underlying connector upon reception of a packet from + * the local forwarder. + * + * @param packet_buffer - The bytes of the packet. + */ TRANSPORT_ALWAYS_INLINE void processIncomingMessages( Packet::MemBufPtr &&packet_buffer) { bool is_stopped = io_service_.stopped(); @@ -463,9 +597,16 @@ class Portal { } } + /** + * Callback called by the transport upon connection to the local forwarder. + * It register the prefixes in the served_namespaces_ list to the local + * forwarder. + */ TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { - for (auto &name : served_namespaces_) { - registerRoute(name); + for (auto &prefix : served_namespaces_) { + if (connector_.isConnected()) { + forwarder_interface_.registerRoute(prefix); + } } } @@ -476,6 +617,14 @@ class Portal { } } + /** + * Process a content object: + * - Check if the data packet was effectively requested by portal + * - Delete its timer + * - Pass packet to application + * + * @param content_object - The data packet + */ TRANSPORT_ALWAYS_INLINE void processContentObject( ContentObject::Ptr &&content_object) { uint32_t hash = content_object->getName().getHash32() + @@ -500,6 +649,11 @@ class Portal { } } + /** + * Process a control message. Control messages are different depending on the + * connector, then the forwarder_interface will do the job of understanding + * them. + */ TRANSPORT_ALWAYS_INLINE void processControlMessage( Packet::MemBufPtr &&packet_buffer) { forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); @@ -512,19 +666,18 @@ class Portal { std::string app_name_; PendingInterestHashTable pending_interest_hash_table_; + std::list served_namespaces_; ConsumerCallback *consumer_callback_; ProducerCallback *producer_callback_; - typename ForwarderInt::ConnectorType connector_; - ForwarderInt forwarder_interface_; - - std::list served_namespaces_; portal_details::Pool packet_pool_; - portal_details::HandlerMemory async_callback_memory_; + + typename ForwarderInt::ConnectorType connector_; + ForwarderInt forwarder_interface_; }; -} // end namespace core +} // namespace core } // end namespace transport -- cgit 1.2.3-korg