From b375370d0f11163da8cb752c4a3f992a89ef80ee Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Tue, 16 Apr 2019 18:38:07 +0200 Subject: [HICN-178] Sync send of control messages. Change-Id: I9a07c6c806ceba10f80a5f67337dce2eee76120d Signed-off-by: Mauro Sardara --- libtransport/src/hicn/transport/core/connector.h | 4 +- .../src/hicn/transport/core/forwarder_interface.h | 6 +- .../transport/core/hicn_forwarder_interface.cc | 55 +++--- .../src/hicn/transport/core/memif_connector.cc | 8 +- .../src/hicn/transport/core/memif_connector.h | 5 - libtransport/src/hicn/transport/core/portal.h | 201 ++++++++++++++++++--- .../hicn/transport/core/raw_socket_connector.cc | 18 +- .../src/hicn/transport/core/raw_socket_connector.h | 2 - .../hicn/transport/core/tcp_socket_connector.cc | 61 ++++--- .../src/hicn/transport/core/tcp_socket_connector.h | 2 - .../hicn/transport/core/udp_socket_connector.cc | 100 +++++----- .../src/hicn/transport/core/udp_socket_connector.h | 5 +- .../src/hicn/transport/http/client_connection.cc | 2 +- .../src/hicn/transport/http/client_connection.h | 2 - .../src/hicn/transport/http/server_acceptor.cc | 4 +- .../src/hicn/transport/http/server_acceptor.h | 1 - .../src/hicn/transport/http/server_publisher.cc | 2 +- .../src/hicn/transport/http/server_publisher.h | 1 - .../transport/interfaces/rtc_socket_producer.cc | 2 +- .../transport/interfaces/rtc_socket_producer.h | 2 +- .../hicn/transport/interfaces/socket_consumer.cc | 12 +- .../hicn/transport/interfaces/socket_consumer.h | 2 +- .../hicn/transport/interfaces/socket_producer.cc | 2 +- .../hicn/transport/interfaces/socket_producer.h | 2 +- libtransport/src/hicn/transport/protocols/rtc.cc | 2 +- 25 files changed, 328 insertions(+), 175 deletions(-) (limited to 'libtransport/src/hicn') diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h index c790f2bfb..f2bbe5dcd 100644 --- a/libtransport/src/hicn/transport/core/connector.h +++ b/libtransport/src/hicn/transport/core/connector.h @@ -64,10 +64,10 @@ class Connector { virtual void close() = 0; - virtual void enableBurst() = 0; - virtual ConnectorState state() { return state_; }; + virtual bool isConnected() { return state_ == ConnectorState::CONNECTED; } + protected: void increasePoolSize(std::size_t size = packet_pool_size); diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h index b4bc26da7..8fefba8ad 100644 --- a/libtransport/src/hicn/transport/core/forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/forwarder_interface.h @@ -101,15 +101,13 @@ class ForwarderInterface { connector_.send(packet.acquireMemBufReference()); } - template - TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len, - Handler &&packet_sent) { + TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len) { // ASIO_COMPLETION_HANDLER_CHECK(Handler, packet_sent) type_check; counters_.tx_packets++; counters_.tx_bytes += len; // Perfect forwarding - connector_.send(packet, len, std::forward(packet_sent)); + connector_.send(packet, len); } TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); } diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc index 585d80f8c..33a37f540 100644 --- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc @@ -61,44 +61,44 @@ void fillCommandHeader(CommandHeader *header) { header->length = 1; } -std::unique_ptr createCommandRoute( - std::unique_ptr &&addr, uint8_t prefix_length) { - auto command = std::make_unique(); +RouteToSelfCommand createCommandRoute(std::unique_ptr &&addr, + uint8_t prefix_length) { + RouteToSelfCommand command; // check and set IP address if (addr->sa_family == AF_INET) { - command->address_type = addr_inet; - command->address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; + command.address_type = addr_inet; + command.address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; } else if (addr->sa_family == AF_INET6) { - command->address_type = addr_inet6; - command->address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; + command.address_type = addr_inet6; + command.address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; } // Fill remaining payload fields #ifndef _WIN32 - strcpy(command->symbolic_or_connid, identifier); + strcpy(command.symbolic_or_connid, identifier); #else - strcpy_s(command->symbolic_or_connid, 16, identifier); + strcpy_s(command.symbolic_or_connid, 16, identifier); #endif - command->cost = 1; - command->len = (uint8_t)prefix_length; + command.cost = 1; + command.len = (uint8_t)prefix_length; // Allocate and fill the header - command->command_id = add_route_command; - fillCommandHeader((CommandHeader *)command.get()); + command.command_id = add_route_command; + fillCommandHeader((CommandHeader *)&command); return command; } -std::unique_ptr createCommandDeleteConnection() { - auto command = std::make_unique(); - fillCommandHeader((CommandHeader *)command.get()); - command->command_id = delete_connection_command; +DeleteSelfConnectionCommand createCommandDeleteConnection() { + DeleteSelfConnectionCommand command; + fillCommandHeader((CommandHeader *)&command); + command.command_id = delete_connection_command; #ifndef _WIN32 - strcpy(command->symbolic_or_connid, identifier); + strcpy(command.symbolic_or_connid, identifier); #else - strcpy_s(command->symbolic_or_connid, 16, identifier); + strcpy_s(command.symbolic_or_connid, 16, identifier); #endif return command; @@ -119,20 +119,15 @@ HicnForwarderInterface::~HicnForwarderInterface() {} void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } void HicnForwarderInterface::registerRoute(Prefix &prefix) { - auto command = - createCommandRoute(prefix.toSockaddr(), (uint8_t)prefix.getPrefixLength()) - .release(); - send((uint8_t *)command, sizeof(RouteToSelfCommand), - [command]() { delete command; }); + auto command = createCommandRoute(prefix.toSockaddr(), + (uint8_t)prefix.getPrefixLength()); + send((uint8_t *)&command, sizeof(RouteToSelfCommand)); } void HicnForwarderInterface::closeConnection() { - auto command = createCommandDeleteConnection().release(); - send((uint8_t *)command, sizeof(DeleteSelfConnectionCommand), - [this, command]() { - delete command; - connector_.close(); - }); + auto command = createCommandDeleteConnection(); + send((uint8_t *)&command, sizeof(DeleteSelfConnectionCommand)); + connector_.close(); } } // namespace core diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index f9695800b..a77f14839 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -14,6 +14,7 @@ */ #include +#include #ifdef __vpp__ @@ -69,7 +70,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, tx_buf_counter_(0), is_reconnection_(false), data_available_(false), - enable_burst_(false), app_name_(app_name), socket_filename_("") { std::call_once(MemifConnector::flag_, &MemifConnector::init, this); @@ -429,8 +429,6 @@ void MemifConnector::close() { } } -void MemifConnector::enableBurst() { enable_burst_ = true; } - void MemifConnector::send(const Packet::MemBufPtr &packet) { { utils::SpinLock::Acquire locked(write_msgs_lock_); @@ -496,7 +494,9 @@ int MemifConnector::doSend() { } void MemifConnector::send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent) {} + const PacketSentCallback &packet_sent) { + throw errors::NotImplementedException(); +} } // end namespace core diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h index 057df37e4..ef100e3fb 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.h +++ b/libtransport/src/hicn/transport/core/memif_connector.h @@ -65,10 +65,6 @@ class MemifConnector : public Connector { void connect(uint32_t memif_id, long memif_mode); - // void runEventsLoop(); - - void enableBurst() override; - TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; }; private: @@ -120,7 +116,6 @@ class MemifConnector : public Connector { PacketRing input_buffer_; bool is_reconnection_; bool data_available_; - bool enable_burst_; uint32_t memif_id_; uint8_t memif_mode_; std::string app_name_; diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 07f84075e..3ea37c938 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -260,6 +260,26 @@ class BasicBindConfig { using BindConfig = BasicBindConfig; +/** + * Portal is a opaque class which is used for sending/receiving interest/data + * packets over multiple kind of connector. The connector itself is defined by + * the template ForwarderInt, which is resolved at compile time. It is then not + * possible to decide at runtime what the connector will be. + * + * The tasks performed by portal are the following: + * - Sending/Receiving Interest packets + * - Sending/Receiving Data packets + * - Set timers (one per interest), in order to trigger events if an interest is + * not satisfied + * - Register a producer prefix to the local forwarder + * + * The way of working of portal is event-based, which means that data and + * interests are sent/received in a asynchronous manner, and the notifications + * are performed through callbacks. + * + * The portal class is not thread safe, appropriate locking is required by the + * users of this class. + */ template class Portal { static_assert( @@ -269,12 +289,20 @@ class Portal { "ForwarderInt must inherit from ForwarderInterface!"); public: + /** + * Consumer callback is an abstract class containing two methods to be + * implemented by a consumer application. + */ class ConsumerCallback { public: virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; virtual void onTimeout(Interest::Ptr &&i) = 0; }; + /** + * Producer callback is an abstract class containing two methods to be + * implemented by a producer application. + */ class ProducerCallback { public: virtual void onInterest(Interest::Ptr &&i) = 0; @@ -287,33 +315,65 @@ class Portal { app_name_("libtransport_application"), consumer_callback_(nullptr), producer_callback_(nullptr), + packet_pool_(io_service), connector_(std::bind(&Portal::processIncomingMessages, this, std::placeholders::_1), std::bind(&Portal::setLocalRoutes, this), io_service_, app_name_), - forwarder_interface_(connector_), - packet_pool_(io_service) {} + forwarder_interface_(connector_) {} + /** + * Set the consumer callback. + * + * @param consumer_callback - The pointer to the ConsumerCallback object. + */ void setConsumerCallback(ConsumerCallback *consumer_callback) { consumer_callback_ = consumer_callback; } + /** + * Set the producer callback. + * + * @param producer_callback - The pointer to the ProducerCallback object. + */ void setProducerCallback(ProducerCallback *producer_callback) { producer_callback_ = producer_callback; } + /** + * Specify the output interface to use. This method will be useful in a future + * scenario where the library will be able to forward packets without + * connecting to a local forwarder. Now it is not used. + * + * @param output_interface - The output interface to use for + * forwarding/receiving packets. + */ TRANSPORT_ALWAYS_INLINE void setOutputInterface( const std::string &output_interface) { forwarder_interface_.setOutputInterface(output_interface); } + /** + * Connect the transport to the local hicn forwarder. + * + * @param is_consumer - Boolean specifying if the application on top of portal + * is a consumer or a producer. + */ TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { pending_interest_hash_table_.reserve(portal_details::pool_size); forwarder_interface_.connect(is_consumer); } + /** + * Destructor. + */ ~Portal() { killConnection(); } + /** + * Check if there is already a pending interest for a given name. + * + * @param name - The interest name. + */ TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { auto it = pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); @@ -324,6 +384,20 @@ class Portal { return false; } + /** + * Send an interest through to the local forwarder. + * + * @param interest - The pointer to the interest. The ownership of the + * interest is transferred by the caller to portal. + * + * @param on_content_object_callback - If the caller wishes to use a different + * callback to be called for this interest, it can set this parameter. + * Otherwise ConsumerCallback::onContentObject will be used. + * + * @param on_interest_timeout_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onTimeout will be used. + */ TRANSPORT_ALWAYS_INLINE void sendInterest( Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, @@ -347,6 +421,14 @@ class Portal { std::make_pair(hash, std::move(pending_interest))); } + /** + * Handler fot the timer set when the interest is sent. + * + * @param ec - Error code which says whether the timer expired or has been + * canceled upon data packet reception. + * + * @param hash - The index of the interest in the pending interest hash table. + */ TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, uint32_t hash) { bool is_stopped = io_service_.stopped(); @@ -370,34 +452,59 @@ class Portal { } } + /** + * Register a producer name to the local forwarder and optionally set the + * content store size in a per-face manner. + * + * @param config - The configuration for the local forwarder binding. + */ TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { - connector_.enableBurst(); forwarder_interface_.setContentStoreSize(config.csReserved()); served_namespaces_.push_back(config.prefix()); - registerRoute(served_namespaces_.back()); + + setLocalRoutes(); } + /** + * Start the event loop. This function blocks here and calls the callback set + * by the application upon interest/data received or timeout. + */ TRANSPORT_ALWAYS_INLINE void runEventsLoop() { if (io_service_.stopped()) { io_service_.reset(); // ensure that run()/poll() will do some work } - this->io_service_.run(); + io_service_.run(); } + /** + * Run one event and return. + */ TRANSPORT_ALWAYS_INLINE void runOneEvent() { if (io_service_.stopped()) { io_service_.reset(); // ensure that run()/poll() will do some work } - this->io_service_.run_one(); + io_service_.run_one(); } + /** + * Send a data packet to the local forwarder. As opposite to sendInterest, the + * ownership of the content object is not transferred to the portal. + * + * @param content_object - The data packet. + */ TRANSPORT_ALWAYS_INLINE void sendContentObject( ContentObject &content_object) { forwarder_interface_.send(content_object); } + /** + * Stop the event loop, canceling all the pending events in the event queue. + * + * Beware that stopping the event loop DOES NOT disconnect the transport from + * the local forwarder, the connector underneath will stay connected. + */ TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { if (!io_service_.stopped()) { io_service_.dispatch([this]() { @@ -407,32 +514,59 @@ class Portal { } } + /** + * Disconnect the transport from the local forwarder. + */ TRANSPORT_ALWAYS_INLINE void killConnection() { forwarder_interface_.closeConnection(); } + /** + * Clear the pending interest hash table. + */ TRANSPORT_ALWAYS_INLINE void clear() { - for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); + if (!io_service_.stopped()) { + io_service_.dispatch(std::bind(&Portal::doClear, this)); + } else { + doClear(); } - - pending_interest_hash_table_.clear(); } + /** + * Get a reference to the io_service object. + */ TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { return io_service_; } - TRANSPORT_ALWAYS_INLINE std::size_t getPITSize() { - connector_.state(); - return pending_interest_hash_table_.size(); - } - + /** + * Register a route to the local forwarder. + */ TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { - forwarder_interface_.registerRoute(prefix); + served_namespaces_.push_back(prefix); + if (connector_.isConnected()) { + forwarder_interface_.registerRoute(prefix); + } } private: + /** + * Clear the pending interest hash table. + */ + TRANSPORT_ALWAYS_INLINE void doClear() { + for (auto &pend_interest : pending_interest_hash_table_) { + pend_interest.second->cancelTimer(); + } + + pending_interest_hash_table_.clear(); + } + + /** + * Callback called by the underlying connector upon reception of a packet from + * the local forwarder. + * + * @param packet_buffer - The bytes of the packet. + */ TRANSPORT_ALWAYS_INLINE void processIncomingMessages( Packet::MemBufPtr &&packet_buffer) { bool is_stopped = io_service_.stopped(); @@ -463,9 +597,16 @@ class Portal { } } + /** + * Callback called by the transport upon connection to the local forwarder. + * It register the prefixes in the served_namespaces_ list to the local + * forwarder. + */ TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { - for (auto &name : served_namespaces_) { - registerRoute(name); + for (auto &prefix : served_namespaces_) { + if (connector_.isConnected()) { + forwarder_interface_.registerRoute(prefix); + } } } @@ -476,6 +617,14 @@ class Portal { } } + /** + * Process a content object: + * - Check if the data packet was effectively requested by portal + * - Delete its timer + * - Pass packet to application + * + * @param content_object - The data packet + */ TRANSPORT_ALWAYS_INLINE void processContentObject( ContentObject::Ptr &&content_object) { uint32_t hash = content_object->getName().getHash32() + @@ -500,6 +649,11 @@ class Portal { } } + /** + * Process a control message. Control messages are different depending on the + * connector, then the forwarder_interface will do the job of understanding + * them. + */ TRANSPORT_ALWAYS_INLINE void processControlMessage( Packet::MemBufPtr &&packet_buffer) { forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); @@ -512,19 +666,18 @@ class Portal { std::string app_name_; PendingInterestHashTable pending_interest_hash_table_; + std::list served_namespaces_; ConsumerCallback *consumer_callback_; ProducerCallback *producer_callback_; - typename ForwarderInt::ConnectorType connector_; - ForwarderInt forwarder_interface_; - - std::list served_namespaces_; portal_details::Pool packet_pool_; - portal_details::HandlerMemory async_callback_memory_; + + typename ForwarderInt::ConnectorType connector_; + ForwarderInt forwarder_interface_; }; -} // end namespace core +} // namespace core } // end namespace transport diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc index 12cc4e0fa..0e1743671 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc @@ -115,11 +115,17 @@ void RawSocketConnector::connect(const std::string &interface_name, void RawSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { - // asio::async_write(socket_, asio::buffer(packet, len), - // [packet_sent] (std::error_code ec, - // std::size_t /*length*/) { - // packet_sent(); - // }); + if (packet_sent != 0) { + socket_.async_send( + asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); + } else { + if (state_ == ConnectorState::CONNECTED) { + socket_.send(asio::buffer(packet, len)); + } + } } void RawSocketConnector::send(const Packet::MemBufPtr &packet) { @@ -191,8 +197,6 @@ void RawSocketConnector::doConnect() { socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_))); } -void RawSocketConnector::enableBurst() { return; } - } // end namespace core } // end namespace transport diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h index a5474f7f8..fe9ceb227 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.h +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h @@ -49,8 +49,6 @@ class RawSocketConnector : public Connector { void close() override; - void enableBurst() override; - void connect(const std::string &interface_name, const std::string &mac_address_str); diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc index f1fd4bbac..c82373ae1 100644 --- a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc @@ -21,6 +21,7 @@ #include #include +#include #include namespace transport { @@ -78,10 +79,15 @@ void TcpSocketConnector::connect(std::string ip_address, std::string port) { void TcpSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { - asio::async_write(socket_, asio::buffer(packet, len), - [packet_sent](std::error_code ec, std::size_t /*length*/) { - packet_sent(); - }); + if (packet_sent != 0) { + asio::async_write(socket_, asio::buffer(packet, len), + [packet_sent](std::error_code ec, + std::size_t /*length*/) { packet_sent(); }); + } else { + if (state_ == ConnectorState::CONNECTED) { + asio::write(socket_, asio::buffer(packet, len)); + } + } } void TcpSocketConnector::send(const Packet::MemBufPtr &packet) { @@ -223,38 +229,37 @@ void TcpSocketConnector::tryReconnect() { } void TcpSocketConnector::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { - if (!ec) { - timer_.cancel(); - state_ = ConnectorState::CONNECTED; - asio::ip::tcp::no_delay noDelayOption(true); - socket_.set_option(noDelayOption); - doReadHeader(); + asio::async_connect( + socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + state_ = ConnectorState::CONNECTED; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + doReadHeader(); - if (data_available_) { - data_available_ = false; - doWrite(); - } + if (data_available_) { + data_available_ = false; + doWrite(); + } - if (is_reconnection_) { - is_reconnection_ = false; - TRANSPORT_LOGI("Connection recovered!\n"); - on_reconnect_callback_(); - } - } else { - sleep(1); - doConnect(); - } - }); + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOGI("Connection recovered!\n"); + on_reconnect_callback_(); + } + } else { + doConnect(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + }); } bool TcpSocketConnector::checkConnected() { return state_ == ConnectorState::CONNECTED; } -void TcpSocketConnector::enableBurst() { return; } - void TcpSocketConnector::startConnectionTimer() { timer_.expires_from_now(std::chrono::seconds(60)); timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this, diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h index 6df2fedff..d755b5e86 100644 --- a/libtransport/src/hicn/transport/core/tcp_socket_connector.h +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h @@ -47,8 +47,6 @@ class TcpSocketConnector : public Connector { void close() override; - void enableBurst() override; - void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); private: diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc index 54c0eb978..38945e755 100644 --- a/libtransport/src/hicn/transport/core/udp_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc @@ -21,6 +21,7 @@ #include #include +#include #include namespace transport { @@ -36,7 +37,6 @@ UdpSocketConnector::UdpSocketConnector( socket_(io_service_), resolver_(io_service_), connection_timer_(io_service_), - connection_timeout_(io_service_), read_msg_(packet_pool_.makePtr(nullptr)), is_reconnection_(false), data_available_(false), @@ -54,10 +54,17 @@ void UdpSocketConnector::connect(std::string ip_address, std::string port) { void UdpSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { - socket_.async_send(asio::buffer(packet, len), - [packet_sent](std::error_code ec, std::size_t /*length*/) { - packet_sent(); - }); + if (packet_sent != 0) { + socket_.async_send( + asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); + } else { + if (state_ == ConnectorState::CONNECTED) { + socket_.send(asio::buffer(packet, len)); + } + } } void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { @@ -76,6 +83,14 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { } void UdpSocketConnector::close() { + if (io_service_.stopped()) { + doClose(); + } else { + io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this)); + } +} + +void UdpSocketConnector::doClose() { if (state_ != ConnectorState::CLOSED) { state_ = ConnectorState::CLOSED; if (socket_.is_open()) { @@ -86,8 +101,6 @@ void UdpSocketConnector::close() { } void UdpSocketConnector::doWrite() { - // TODO improve this piece of code for sending many buffers togethers - // if list contains more than one packet auto packet = output_buffer_.front().get(); auto array = std::vector(); @@ -97,8 +110,8 @@ void UdpSocketConnector::doWrite() { current = current->next(); } while (current != packet); - socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec, - std::size_t length) { + socket_.async_send(std::move(array), [this](std::error_code ec, + std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { output_buffer_.pop_front(); if (!output_buffer_.empty()) { @@ -139,54 +152,53 @@ void UdpSocketConnector::tryReconnect() { TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); state_ = ConnectorState::CONNECTING; is_reconnection_ = true; - connection_timer_.expires_from_now(std::chrono::seconds(1)); - connection_timer_.async_wait([this](const std::error_code &ec) { - if (!ec) { - if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - startConnectionTimer(); - doConnect(); + io_service_.post([this]() { + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); } + + doConnect(); + startConnectionTimer(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); }); } } void UdpSocketConnector::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, udp::resolver::iterator) { - if (!ec) { - connection_timeout_.cancel(); - state_ = ConnectorState::CONNECTED; - doRead(); - - if (data_available_) { - data_available_ = false; - doWrite(); - } - - if (is_reconnection_) { - is_reconnection_ = false; - on_reconnect_callback_(); - } - } else { - sleep(1); - doConnect(); - } - }); + asio::async_connect( + socket_, endpoint_iterator_, + [this](std::error_code ec, udp::resolver::iterator) { + if (!ec) { + connection_timer_.cancel(); + state_ = ConnectorState::CONNECTED; + doRead(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + } + + on_reconnect_callback_(); + } else { + doConnect(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + }); } bool UdpSocketConnector::checkConnected() { return state_ == ConnectorState::CONNECTED; } -void UdpSocketConnector::enableBurst() { return; } - void UdpSocketConnector::startConnectionTimer() { - connection_timeout_.expires_from_now(std::chrono::seconds(60)); - connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, - this, std::placeholders::_1)); + connection_timer_.expires_from_now(std::chrono::seconds(60)); + connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, + this, std::placeholders::_1)); } void UdpSocketConnector::handleDeadline(const std::error_code &ec) { diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h index 87198efde..7c5dbaf10 100644 --- a/libtransport/src/hicn/transport/core/udp_socket_connector.h +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h @@ -45,8 +45,6 @@ class UdpSocketConnector : public Connector { void close() override; - void enableBurst() override; - void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); private: @@ -56,6 +54,8 @@ class UdpSocketConnector : public Connector { void doWrite(); + void doClose(); + bool checkConnected(); private: @@ -70,7 +70,6 @@ class UdpSocketConnector : public Connector { asio::ip::udp::resolver resolver_; asio::ip::udp::resolver::iterator endpoint_iterator_; asio::steady_timer connection_timer_; - asio::steady_timer connection_timeout_; utils::ObjectPool::Ptr read_msg_; diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc index fb7dbdfac..fadf0ae44 100644 --- a/libtransport/src/hicn/transport/http/client_connection.cc +++ b/libtransport/src/hicn/transport/http/client_connection.cc @@ -26,7 +26,7 @@ namespace http { using namespace transport; HTTPClientConnection::HTTPClientConnection() - : consumer_(TransportProtocolAlgorithms::RAAQM, io_service_), + : consumer_(TransportProtocolAlgorithms::RAAQM), read_bytes_callback_(nullptr), read_buffer_(nullptr), response_(std::make_shared()), diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h index 6c150f848..d0624702f 100644 --- a/libtransport/src/hicn/transport/http/client_connection.h +++ b/libtransport/src/hicn/transport/http/client_connection.h @@ -88,8 +88,6 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback { void readError(const std::error_code ec) noexcept override; void readSuccess(std::size_t total_size) noexcept override; - asio::io_service io_service_; - // The consumer socket ConsumerSocket consumer_; diff --git a/libtransport/src/hicn/transport/http/server_acceptor.cc b/libtransport/src/hicn/transport/http/server_acceptor.cc index 486b04c57..e478dfcd4 100644 --- a/libtransport/src/hicn/transport/http/server_acceptor.cc +++ b/libtransport/src/hicn/transport/http/server_acceptor.cc @@ -63,9 +63,7 @@ HTTPServerAcceptor::HTTPServerAcceptor(std::string &server_locator, core::Prefix acceptor_namespace(network, 64); std::string producer_identity = "acceptor_producer"; - acceptor_producer_ = std::make_shared( - io_service_); /*, - utils::Identity::generateIdentity(producer_identity));*/ + acceptor_producer_ = std::make_shared(); acceptor_producer_->registerPrefix(acceptor_namespace); } diff --git a/libtransport/src/hicn/transport/http/server_acceptor.h b/libtransport/src/hicn/transport/http/server_acceptor.h index 4e7350b76..6ed58f70e 100644 --- a/libtransport/src/hicn/transport/http/server_acceptor.h +++ b/libtransport/src/hicn/transport/http/server_acceptor.h @@ -53,7 +53,6 @@ class HTTPServerAcceptor { void processIncomingInterest(ProducerSocket &p, Interest &interest); OnHttpRequest callback_; - asio::io_service io_service_; std::shared_ptr acceptor_producer_; std::map> publishers_; diff --git a/libtransport/src/hicn/transport/http/server_publisher.cc b/libtransport/src/hicn/transport/http/server_publisher.cc index 012f36091..6a4bb9c48 100644 --- a/libtransport/src/hicn/transport/http/server_publisher.cc +++ b/libtransport/src/hicn/transport/http/server_publisher.cc @@ -23,7 +23,7 @@ namespace http { HTTPServerPublisher::HTTPServerPublisher(const core::Name &content_name) : content_name_(content_name, true) { std::string identity = "acceptor_producer"; - producer_ = std::make_unique(io_service_); + producer_ = std::make_unique(); // utils::Identity::generateIdentity(identity)); core::Prefix publisher_prefix(content_name_, 128); producer_->registerPrefix(publisher_prefix); diff --git a/libtransport/src/hicn/transport/http/server_publisher.h b/libtransport/src/hicn/transport/http/server_publisher.h index 1f12fd8f9..33d596f63 100644 --- a/libtransport/src/hicn/transport/http/server_publisher.h +++ b/libtransport/src/hicn/transport/http/server_publisher.h @@ -59,7 +59,6 @@ class HTTPServerPublisher { private: Name content_name_; std::unique_ptr timer_; - asio::io_service io_service_; std::unique_ptr producer_; ProducerInterestCallback interest_enter_callback_; utils::UserCallback wait_callback_; diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 41feb4b45..495f8c8f3 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -215,4 +215,4 @@ void RTCProducerSocket::sendNack(const Interest &interest) { } // namespace interface -} // end namespace transport +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 6506b506e..be39d2b32 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -63,4 +63,4 @@ class RTCProducerSocket : public ProducerSocket { } // namespace interface -} // end namespace transport +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index 37d545779..af99fd60c 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -123,10 +123,12 @@ void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, } void ConsumerSocket::stop() { - if (transport_protocol_->isRunning()) { - std::cout << "Stopping transport protocol " << std::endl; - transport_protocol_->stop(); - } + auto &io_service = getIoService(); + io_service.dispatch([this]() { + if (transport_protocol_->isRunning()) { + transport_protocol_->stop(); + } + }); } void ConsumerSocket::resume() { @@ -141,4 +143,4 @@ asio::io_service &ConsumerSocket::getIoService() { } // namespace interface -} // end namespace transport +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 40344af5d..41646c940 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -868,4 +868,4 @@ class ConsumerSocket : public BaseSocket { } // namespace interface -} // end namespace transport +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index c4cf95895..c85b8af32 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -366,4 +366,4 @@ asio::io_service &ProducerSocket::getIoService() { return io_service_; } } // namespace interface -} // end namespace transport +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 200c32a95..744ddd86d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -598,4 +598,4 @@ class ProducerSocket : public Socket, } // namespace interface -} // namespace transport +} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index c9aa6a56a..b514d0587 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -739,4 +739,4 @@ void RTCTransportProtocol::returnContentToApplication( } // end namespace protocol -} // end namespace transport +} // end namespace transport \ No newline at end of file -- cgit 1.2.3-korg