aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-04-16 18:38:07 +0200
committerMauro Sardara <msardara@cisco.com>2019-04-18 16:56:04 +0000
commitb375370d0f11163da8cb752c4a3f992a89ef80ee (patch)
treeb34e8d92fb616261209a09b89e3538ea94b4c3c3
parent564dfea33b993c3ff6572894ef35f91ba37d23ed (diff)
[HICN-178] Sync send of control messages.v19.04
Change-Id: I9a07c6c806ceba10f80a5f67337dce2eee76120d Signed-off-by: Mauro Sardara <msardara@cisco.com>
-rw-r--r--libtransport/src/hicn/transport/core/connector.h4
-rw-r--r--libtransport/src/hicn/transport/core/forwarder_interface.h6
-rw-r--r--libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc55
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.cc8
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.h5
-rw-r--r--libtransport/src/hicn/transport/core/portal.h201
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.cc18
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.h2
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.cc61
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.h2
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.cc100
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.h5
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.cc2
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.h2
-rw-r--r--libtransport/src/hicn/transport/http/server_acceptor.cc4
-rw-r--r--libtransport/src/hicn/transport/http/server_acceptor.h1
-rw-r--r--libtransport/src/hicn/transport/http/server_publisher.cc2
-rw-r--r--libtransport/src/hicn/transport/http/server_publisher.h1
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc2
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc12
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h2
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc2
-rw-r--r--utils/src/hiperf.cc2
26 files changed, 330 insertions, 175 deletions
diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h
index c790f2b..f2bbe5d 100644
--- a/libtransport/src/hicn/transport/core/connector.h
+++ b/libtransport/src/hicn/transport/core/connector.h
@@ -64,10 +64,10 @@ class Connector {
virtual void close() = 0;
- virtual void enableBurst() = 0;
-
virtual ConnectorState state() { return state_; };
+ virtual bool isConnected() { return state_ == ConnectorState::CONNECTED; }
+
protected:
void increasePoolSize(std::size_t size = packet_pool_size);
diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h
index b4bc26d..8fefba8 100644
--- a/libtransport/src/hicn/transport/core/forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/forwarder_interface.h
@@ -101,15 +101,13 @@ class ForwarderInterface {
connector_.send(packet.acquireMemBufReference());
}
- template <typename Handler>
- TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len,
- Handler &&packet_sent) {
+ TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len) {
// ASIO_COMPLETION_HANDLER_CHECK(Handler, packet_sent) type_check;
counters_.tx_packets++;
counters_.tx_bytes += len;
// Perfect forwarding
- connector_.send(packet, len, std::forward<Handler &&>(packet_sent));
+ connector_.send(packet, len);
}
TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); }
diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
index 585d80f..33a37f5 100644
--- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
+++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
@@ -61,44 +61,44 @@ void fillCommandHeader(CommandHeader *header) {
header->length = 1;
}
-std::unique_ptr<RouteToSelfCommand> createCommandRoute(
- std::unique_ptr<sockaddr> &&addr, uint8_t prefix_length) {
- auto command = std::make_unique<RouteToSelfCommand>();
+RouteToSelfCommand createCommandRoute(std::unique_ptr<sockaddr> &&addr,
+ uint8_t prefix_length) {
+ RouteToSelfCommand command;
// check and set IP address
if (addr->sa_family == AF_INET) {
- command->address_type = addr_inet;
- command->address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr;
+ command.address_type = addr_inet;
+ command.address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr;
} else if (addr->sa_family == AF_INET6) {
- command->address_type = addr_inet6;
- command->address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr;
+ command.address_type = addr_inet6;
+ command.address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr;
}
// Fill remaining payload fields
#ifndef _WIN32
- strcpy(command->symbolic_or_connid, identifier);
+ strcpy(command.symbolic_or_connid, identifier);
#else
- strcpy_s(command->symbolic_or_connid, 16, identifier);
+ strcpy_s(command.symbolic_or_connid, 16, identifier);
#endif
- command->cost = 1;
- command->len = (uint8_t)prefix_length;
+ command.cost = 1;
+ command.len = (uint8_t)prefix_length;
// Allocate and fill the header
- command->command_id = add_route_command;
- fillCommandHeader((CommandHeader *)command.get());
+ command.command_id = add_route_command;
+ fillCommandHeader((CommandHeader *)&command);
return command;
}
-std::unique_ptr<DeleteSelfConnectionCommand> createCommandDeleteConnection() {
- auto command = std::make_unique<DeleteSelfConnectionCommand>();
- fillCommandHeader((CommandHeader *)command.get());
- command->command_id = delete_connection_command;
+DeleteSelfConnectionCommand createCommandDeleteConnection() {
+ DeleteSelfConnectionCommand command;
+ fillCommandHeader((CommandHeader *)&command);
+ command.command_id = delete_connection_command;
#ifndef _WIN32
- strcpy(command->symbolic_or_connid, identifier);
+ strcpy(command.symbolic_or_connid, identifier);
#else
- strcpy_s(command->symbolic_or_connid, 16, identifier);
+ strcpy_s(command.symbolic_or_connid, 16, identifier);
#endif
return command;
@@ -119,20 +119,15 @@ HicnForwarderInterface::~HicnForwarderInterface() {}
void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); }
void HicnForwarderInterface::registerRoute(Prefix &prefix) {
- auto command =
- createCommandRoute(prefix.toSockaddr(), (uint8_t)prefix.getPrefixLength())
- .release();
- send((uint8_t *)command, sizeof(RouteToSelfCommand),
- [command]() { delete command; });
+ auto command = createCommandRoute(prefix.toSockaddr(),
+ (uint8_t)prefix.getPrefixLength());
+ send((uint8_t *)&command, sizeof(RouteToSelfCommand));
}
void HicnForwarderInterface::closeConnection() {
- auto command = createCommandDeleteConnection().release();
- send((uint8_t *)command, sizeof(DeleteSelfConnectionCommand),
- [this, command]() {
- delete command;
- connector_.close();
- });
+ auto command = createCommandDeleteConnection();
+ send((uint8_t *)&command, sizeof(DeleteSelfConnectionCommand));
+ connector_.close();
}
} // namespace core
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc
index f969580..a77f148 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.cc
+++ b/libtransport/src/hicn/transport/core/memif_connector.cc
@@ -14,6 +14,7 @@
*/
#include <hicn/transport/core/memif_connector.h>
+#include <hicn/transport/errors/not_implemented_exception.h>
#ifdef __vpp__
@@ -69,7 +70,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
tx_buf_counter_(0),
is_reconnection_(false),
data_available_(false),
- enable_burst_(false),
app_name_(app_name),
socket_filename_("") {
std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
@@ -429,8 +429,6 @@ void MemifConnector::close() {
}
}
-void MemifConnector::enableBurst() { enable_burst_ = true; }
-
void MemifConnector::send(const Packet::MemBufPtr &packet) {
{
utils::SpinLock::Acquire locked(write_msgs_lock_);
@@ -496,7 +494,9 @@ int MemifConnector::doSend() {
}
void MemifConnector::send(const uint8_t *packet, std::size_t len,
- const PacketSentCallback &packet_sent) {}
+ const PacketSentCallback &packet_sent) {
+ throw errors::NotImplementedException();
+}
} // end namespace core
diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h
index 057df37..ef100e3 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.h
+++ b/libtransport/src/hicn/transport/core/memif_connector.h
@@ -65,10 +65,6 @@ class MemifConnector : public Connector {
void connect(uint32_t memif_id, long memif_mode);
- // void runEventsLoop();
-
- void enableBurst() override;
-
TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
private:
@@ -120,7 +116,6 @@ class MemifConnector : public Connector {
PacketRing input_buffer_;
bool is_reconnection_;
bool data_available_;
- bool enable_burst_;
uint32_t memif_id_;
uint8_t memif_mode_;
std::string app_name_;
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 07f8407..3ea37c9 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -260,6 +260,26 @@ class BasicBindConfig {
using BindConfig = BasicBindConfig<Prefix>;
+/**
+ * Portal is a opaque class which is used for sending/receiving interest/data
+ * packets over multiple kind of connector. The connector itself is defined by
+ * the template ForwarderInt, which is resolved at compile time. It is then not
+ * possible to decide at runtime what the connector will be.
+ *
+ * The tasks performed by portal are the following:
+ * - Sending/Receiving Interest packets
+ * - Sending/Receiving Data packets
+ * - Set timers (one per interest), in order to trigger events if an interest is
+ * not satisfied
+ * - Register a producer prefix to the local forwarder
+ *
+ * The way of working of portal is event-based, which means that data and
+ * interests are sent/received in a asynchronous manner, and the notifications
+ * are performed through callbacks.
+ *
+ * The portal class is not thread safe, appropriate locking is required by the
+ * users of this class.
+ */
template <typename ForwarderInt>
class Portal {
static_assert(
@@ -269,12 +289,20 @@ class Portal {
"ForwarderInt must inherit from ForwarderInterface!");
public:
+ /**
+ * Consumer callback is an abstract class containing two methods to be
+ * implemented by a consumer application.
+ */
class ConsumerCallback {
public:
virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0;
virtual void onTimeout(Interest::Ptr &&i) = 0;
};
+ /**
+ * Producer callback is an abstract class containing two methods to be
+ * implemented by a producer application.
+ */
class ProducerCallback {
public:
virtual void onInterest(Interest::Ptr &&i) = 0;
@@ -287,33 +315,65 @@ class Portal {
app_name_("libtransport_application"),
consumer_callback_(nullptr),
producer_callback_(nullptr),
+ packet_pool_(io_service),
connector_(std::bind(&Portal::processIncomingMessages, this,
std::placeholders::_1),
std::bind(&Portal::setLocalRoutes, this), io_service_,
app_name_),
- forwarder_interface_(connector_),
- packet_pool_(io_service) {}
+ forwarder_interface_(connector_) {}
+ /**
+ * Set the consumer callback.
+ *
+ * @param consumer_callback - The pointer to the ConsumerCallback object.
+ */
void setConsumerCallback(ConsumerCallback *consumer_callback) {
consumer_callback_ = consumer_callback;
}
+ /**
+ * Set the producer callback.
+ *
+ * @param producer_callback - The pointer to the ProducerCallback object.
+ */
void setProducerCallback(ProducerCallback *producer_callback) {
producer_callback_ = producer_callback;
}
+ /**
+ * Specify the output interface to use. This method will be useful in a future
+ * scenario where the library will be able to forward packets without
+ * connecting to a local forwarder. Now it is not used.
+ *
+ * @param output_interface - The output interface to use for
+ * forwarding/receiving packets.
+ */
TRANSPORT_ALWAYS_INLINE void setOutputInterface(
const std::string &output_interface) {
forwarder_interface_.setOutputInterface(output_interface);
}
+ /**
+ * Connect the transport to the local hicn forwarder.
+ *
+ * @param is_consumer - Boolean specifying if the application on top of portal
+ * is a consumer or a producer.
+ */
TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
pending_interest_hash_table_.reserve(portal_details::pool_size);
forwarder_interface_.connect(is_consumer);
}
+ /**
+ * Destructor.
+ */
~Portal() { killConnection(); }
+ /**
+ * Check if there is already a pending interest for a given name.
+ *
+ * @param name - The interest name.
+ */
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
auto it =
pending_interest_hash_table_.find(name.getHash32() + name.getSuffix());
@@ -324,6 +384,20 @@ class Portal {
return false;
}
+ /**
+ * Send an interest through to the local forwarder.
+ *
+ * @param interest - The pointer to the interest. The ownership of the
+ * interest is transferred by the caller to portal.
+ *
+ * @param on_content_object_callback - If the caller wishes to use a different
+ * callback to be called for this interest, it can set this parameter.
+ * Otherwise ConsumerCallback::onContentObject will be used.
+ *
+ * @param on_interest_timeout_callback - If the caller wishes to use a
+ * different callback to be called for this interest, it can set this
+ * parameter. Otherwise ConsumerCallback::onTimeout will be used.
+ */
TRANSPORT_ALWAYS_INLINE void sendInterest(
Interest::Ptr &&interest,
OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
@@ -347,6 +421,14 @@ class Portal {
std::make_pair(hash, std::move(pending_interest)));
}
+ /**
+ * Handler fot the timer set when the interest is sent.
+ *
+ * @param ec - Error code which says whether the timer expired or has been
+ * canceled upon data packet reception.
+ *
+ * @param hash - The index of the interest in the pending interest hash table.
+ */
TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec,
uint32_t hash) {
bool is_stopped = io_service_.stopped();
@@ -370,34 +452,59 @@ class Portal {
}
}
+ /**
+ * Register a producer name to the local forwarder and optionally set the
+ * content store size in a per-face manner.
+ *
+ * @param config - The configuration for the local forwarder binding.
+ */
TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) {
- connector_.enableBurst();
forwarder_interface_.setContentStoreSize(config.csReserved());
served_namespaces_.push_back(config.prefix());
- registerRoute(served_namespaces_.back());
+
+ setLocalRoutes();
}
+ /**
+ * Start the event loop. This function blocks here and calls the callback set
+ * by the application upon interest/data received or timeout.
+ */
TRANSPORT_ALWAYS_INLINE void runEventsLoop() {
if (io_service_.stopped()) {
io_service_.reset(); // ensure that run()/poll() will do some work
}
- this->io_service_.run();
+ io_service_.run();
}
+ /**
+ * Run one event and return.
+ */
TRANSPORT_ALWAYS_INLINE void runOneEvent() {
if (io_service_.stopped()) {
io_service_.reset(); // ensure that run()/poll() will do some work
}
- this->io_service_.run_one();
+ io_service_.run_one();
}
+ /**
+ * Send a data packet to the local forwarder. As opposite to sendInterest, the
+ * ownership of the content object is not transferred to the portal.
+ *
+ * @param content_object - The data packet.
+ */
TRANSPORT_ALWAYS_INLINE void sendContentObject(
ContentObject &content_object) {
forwarder_interface_.send(content_object);
}
+ /**
+ * Stop the event loop, canceling all the pending events in the event queue.
+ *
+ * Beware that stopping the event loop DOES NOT disconnect the transport from
+ * the local forwarder, the connector underneath will stay connected.
+ */
TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
if (!io_service_.stopped()) {
io_service_.dispatch([this]() {
@@ -407,32 +514,59 @@ class Portal {
}
}
+ /**
+ * Disconnect the transport from the local forwarder.
+ */
TRANSPORT_ALWAYS_INLINE void killConnection() {
forwarder_interface_.closeConnection();
}
+ /**
+ * Clear the pending interest hash table.
+ */
TRANSPORT_ALWAYS_INLINE void clear() {
- for (auto &pend_interest : pending_interest_hash_table_) {
- pend_interest.second->cancelTimer();
+ if (!io_service_.stopped()) {
+ io_service_.dispatch(std::bind(&Portal::doClear, this));
+ } else {
+ doClear();
}
-
- pending_interest_hash_table_.clear();
}
+ /**
+ * Get a reference to the io_service object.
+ */
TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
return io_service_;
}
- TRANSPORT_ALWAYS_INLINE std::size_t getPITSize() {
- connector_.state();
- return pending_interest_hash_table_.size();
- }
-
+ /**
+ * Register a route to the local forwarder.
+ */
TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) {
- forwarder_interface_.registerRoute(prefix);
+ served_namespaces_.push_back(prefix);
+ if (connector_.isConnected()) {
+ forwarder_interface_.registerRoute(prefix);
+ }
}
private:
+ /**
+ * Clear the pending interest hash table.
+ */
+ TRANSPORT_ALWAYS_INLINE void doClear() {
+ for (auto &pend_interest : pending_interest_hash_table_) {
+ pend_interest.second->cancelTimer();
+ }
+
+ pending_interest_hash_table_.clear();
+ }
+
+ /**
+ * Callback called by the underlying connector upon reception of a packet from
+ * the local forwarder.
+ *
+ * @param packet_buffer - The bytes of the packet.
+ */
TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
Packet::MemBufPtr &&packet_buffer) {
bool is_stopped = io_service_.stopped();
@@ -463,9 +597,16 @@ class Portal {
}
}
+ /**
+ * Callback called by the transport upon connection to the local forwarder.
+ * It register the prefixes in the served_namespaces_ list to the local
+ * forwarder.
+ */
TRANSPORT_ALWAYS_INLINE void setLocalRoutes() {
- for (auto &name : served_namespaces_) {
- registerRoute(name);
+ for (auto &prefix : served_namespaces_) {
+ if (connector_.isConnected()) {
+ forwarder_interface_.registerRoute(prefix);
+ }
}
}
@@ -476,6 +617,14 @@ class Portal {
}
}
+ /**
+ * Process a content object:
+ * - Check if the data packet was effectively requested by portal
+ * - Delete its timer
+ * - Pass packet to application
+ *
+ * @param content_object - The data packet
+ */
TRANSPORT_ALWAYS_INLINE void processContentObject(
ContentObject::Ptr &&content_object) {
uint32_t hash = content_object->getName().getHash32() +
@@ -500,6 +649,11 @@ class Portal {
}
}
+ /**
+ * Process a control message. Control messages are different depending on the
+ * connector, then the forwarder_interface will do the job of understanding
+ * them.
+ */
TRANSPORT_ALWAYS_INLINE void processControlMessage(
Packet::MemBufPtr &&packet_buffer) {
forwarder_interface_.processControlMessageReply(std::move(packet_buffer));
@@ -512,19 +666,18 @@ class Portal {
std::string app_name_;
PendingInterestHashTable pending_interest_hash_table_;
+ std::list<Prefix> served_namespaces_;
ConsumerCallback *consumer_callback_;
ProducerCallback *producer_callback_;
- typename ForwarderInt::ConnectorType connector_;
- ForwarderInt forwarder_interface_;
-
- std::list<Prefix> served_namespaces_;
portal_details::Pool packet_pool_;
-
portal_details::HandlerMemory async_callback_memory_;
+
+ typename ForwarderInt::ConnectorType connector_;
+ ForwarderInt forwarder_interface_;
};
-} // end namespace core
+} // namespace core
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
index 12cc4e0..0e17436 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
@@ -115,11 +115,17 @@ void RawSocketConnector::connect(const std::string &interface_name,
void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
- // asio::async_write(socket_, asio::buffer(packet, len),
- // [packet_sent] (std::error_code ec,
- // std::size_t /*length*/) {
- // packet_sent();
- // });
+ if (packet_sent != 0) {
+ socket_.async_send(
+ asio::buffer(packet, len),
+ [packet_sent](std::error_code ec, std::size_t /*length*/) {
+ packet_sent();
+ });
+ } else {
+ if (state_ == ConnectorState::CONNECTED) {
+ socket_.send(asio::buffer(packet, len));
+ }
+ }
}
void RawSocketConnector::send(const Packet::MemBufPtr &packet) {
@@ -191,8 +197,6 @@ void RawSocketConnector::doConnect() {
socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_)));
}
-void RawSocketConnector::enableBurst() { return; }
-
} // end namespace core
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h
index a5474f7..fe9ceb2 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h
@@ -49,8 +49,6 @@ class RawSocketConnector : public Connector {
void close() override;
- void enableBurst() override;
-
void connect(const std::string &interface_name,
const std::string &mac_address_str);
diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
index f1fd4bb..c82373a 100644
--- a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
@@ -21,6 +21,7 @@
#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/object_pool.h>
+#include <thread>
#include <vector>
namespace transport {
@@ -78,10 +79,15 @@ void TcpSocketConnector::connect(std::string ip_address, std::string port) {
void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
- asio::async_write(socket_, asio::buffer(packet, len),
- [packet_sent](std::error_code ec, std::size_t /*length*/) {
- packet_sent();
- });
+ if (packet_sent != 0) {
+ asio::async_write(socket_, asio::buffer(packet, len),
+ [packet_sent](std::error_code ec,
+ std::size_t /*length*/) { packet_sent(); });
+ } else {
+ if (state_ == ConnectorState::CONNECTED) {
+ asio::write(socket_, asio::buffer(packet, len));
+ }
+ }
}
void TcpSocketConnector::send(const Packet::MemBufPtr &packet) {
@@ -223,38 +229,37 @@ void TcpSocketConnector::tryReconnect() {
}
void TcpSocketConnector::doConnect() {
- asio::async_connect(socket_, endpoint_iterator_,
- [this](std::error_code ec, tcp::resolver::iterator) {
- if (!ec) {
- timer_.cancel();
- state_ = ConnectorState::CONNECTED;
- asio::ip::tcp::no_delay noDelayOption(true);
- socket_.set_option(noDelayOption);
- doReadHeader();
+ asio::async_connect(
+ socket_, endpoint_iterator_,
+ [this](std::error_code ec, tcp::resolver::iterator) {
+ if (!ec) {
+ timer_.cancel();
+ state_ = ConnectorState::CONNECTED;
+ asio::ip::tcp::no_delay noDelayOption(true);
+ socket_.set_option(noDelayOption);
+ doReadHeader();
- if (data_available_) {
- data_available_ = false;
- doWrite();
- }
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
- if (is_reconnection_) {
- is_reconnection_ = false;
- TRANSPORT_LOGI("Connection recovered!\n");
- on_reconnect_callback_();
- }
- } else {
- sleep(1);
- doConnect();
- }
- });
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ TRANSPORT_LOGI("Connection recovered!\n");
+ on_reconnect_callback_();
+ }
+ } else {
+ doConnect();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ });
}
bool TcpSocketConnector::checkConnected() {
return state_ == ConnectorState::CONNECTED;
}
-void TcpSocketConnector::enableBurst() { return; }
-
void TcpSocketConnector::startConnectionTimer() {
timer_.expires_from_now(std::chrono::seconds(60));
timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this,
diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
index 6df2fed..d755b5e 100644
--- a/libtransport/src/hicn/transport/core/tcp_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
@@ -47,8 +47,6 @@ class TcpSocketConnector : public Connector {
void close() override;
- void enableBurst() override;
-
void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
private:
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
index 54c0eb9..38945e7 100644
--- a/libtransport/src/hicn/transport/core/udp_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
@@ -21,6 +21,7 @@
#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/object_pool.h>
+#include <thread>
#include <vector>
namespace transport {
@@ -36,7 +37,6 @@ UdpSocketConnector::UdpSocketConnector(
socket_(io_service_),
resolver_(io_service_),
connection_timer_(io_service_),
- connection_timeout_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
is_reconnection_(false),
data_available_(false),
@@ -54,10 +54,17 @@ void UdpSocketConnector::connect(std::string ip_address, std::string port) {
void UdpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
- socket_.async_send(asio::buffer(packet, len),
- [packet_sent](std::error_code ec, std::size_t /*length*/) {
- packet_sent();
- });
+ if (packet_sent != 0) {
+ socket_.async_send(
+ asio::buffer(packet, len),
+ [packet_sent](std::error_code ec, std::size_t /*length*/) {
+ packet_sent();
+ });
+ } else {
+ if (state_ == ConnectorState::CONNECTED) {
+ socket_.send(asio::buffer(packet, len));
+ }
+ }
}
void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
@@ -76,6 +83,14 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
}
void UdpSocketConnector::close() {
+ if (io_service_.stopped()) {
+ doClose();
+ } else {
+ io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this));
+ }
+}
+
+void UdpSocketConnector::doClose() {
if (state_ != ConnectorState::CLOSED) {
state_ = ConnectorState::CLOSED;
if (socket_.is_open()) {
@@ -86,8 +101,6 @@ void UdpSocketConnector::close() {
}
void UdpSocketConnector::doWrite() {
- // TODO improve this piece of code for sending many buffers togethers
- // if list contains more than one packet
auto packet = output_buffer_.front().get();
auto array = std::vector<asio::const_buffer>();
@@ -97,8 +110,8 @@ void UdpSocketConnector::doWrite() {
current = current->next();
} while (current != packet);
- socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec,
- std::size_t length) {
+ socket_.async_send(std::move(array), [this](std::error_code ec,
+ std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
output_buffer_.pop_front();
if (!output_buffer_.empty()) {
@@ -139,54 +152,53 @@ void UdpSocketConnector::tryReconnect() {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
- connection_timer_.expires_from_now(std::chrono::seconds(1));
- connection_timer_.async_wait([this](const std::error_code &ec) {
- if (!ec) {
- if (socket_.is_open()) {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- }
- startConnectionTimer();
- doConnect();
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
}
+
+ doConnect();
+ startConnectionTimer();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
});
}
}
void UdpSocketConnector::doConnect() {
- asio::async_connect(socket_, endpoint_iterator_,
- [this](std::error_code ec, udp::resolver::iterator) {
- if (!ec) {
- connection_timeout_.cancel();
- state_ = ConnectorState::CONNECTED;
- doRead();
-
- if (data_available_) {
- data_available_ = false;
- doWrite();
- }
-
- if (is_reconnection_) {
- is_reconnection_ = false;
- on_reconnect_callback_();
- }
- } else {
- sleep(1);
- doConnect();
- }
- });
+ asio::async_connect(
+ socket_, endpoint_iterator_,
+ [this](std::error_code ec, udp::resolver::iterator) {
+ if (!ec) {
+ connection_timer_.cancel();
+ state_ = ConnectorState::CONNECTED;
+ doRead();
+
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ }
+
+ on_reconnect_callback_();
+ } else {
+ doConnect();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ });
}
bool UdpSocketConnector::checkConnected() {
return state_ == ConnectorState::CONNECTED;
}
-void UdpSocketConnector::enableBurst() { return; }
-
void UdpSocketConnector::startConnectionTimer() {
- connection_timeout_.expires_from_now(std::chrono::seconds(60));
- connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
- this, std::placeholders::_1));
+ connection_timer_.expires_from_now(std::chrono::seconds(60));
+ connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
+ this, std::placeholders::_1));
}
void UdpSocketConnector::handleDeadline(const std::error_code &ec) {
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h
index 87198ef..7c5dbaf 100644
--- a/libtransport/src/hicn/transport/core/udp_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h
@@ -45,8 +45,6 @@ class UdpSocketConnector : public Connector {
void close() override;
- void enableBurst() override;
-
void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
private:
@@ -56,6 +54,8 @@ class UdpSocketConnector : public Connector {
void doWrite();
+ void doClose();
+
bool checkConnected();
private:
@@ -70,7 +70,6 @@ class UdpSocketConnector : public Connector {
asio::ip::udp::resolver resolver_;
asio::ip::udp::resolver::iterator endpoint_iterator_;
asio::steady_timer connection_timer_;
- asio::steady_timer connection_timeout_;
utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc
index fb7dbdf..fadf0ae 100644
--- a/libtransport/src/hicn/transport/http/client_connection.cc
+++ b/libtransport/src/hicn/transport/http/client_connection.cc
@@ -26,7 +26,7 @@ namespace http {
using namespace transport;
HTTPClientConnection::HTTPClientConnection()
- : consumer_(TransportProtocolAlgorithms::RAAQM, io_service_),
+ : consumer_(TransportProtocolAlgorithms::RAAQM),
read_bytes_callback_(nullptr),
read_buffer_(nullptr),
response_(std::make_shared<HTTPResponse>()),
diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h
index 6c150f8..d062470 100644
--- a/libtransport/src/hicn/transport/http/client_connection.h
+++ b/libtransport/src/hicn/transport/http/client_connection.h
@@ -88,8 +88,6 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback {
void readError(const std::error_code ec) noexcept override;
void readSuccess(std::size_t total_size) noexcept override;
- asio::io_service io_service_;
-
// The consumer socket
ConsumerSocket consumer_;
diff --git a/libtransport/src/hicn/transport/http/server_acceptor.cc b/libtransport/src/hicn/transport/http/server_acceptor.cc
index 486b04c..e478dfc 100644
--- a/libtransport/src/hicn/transport/http/server_acceptor.cc
+++ b/libtransport/src/hicn/transport/http/server_acceptor.cc
@@ -63,9 +63,7 @@ HTTPServerAcceptor::HTTPServerAcceptor(std::string &server_locator,
core::Prefix acceptor_namespace(network, 64);
std::string producer_identity = "acceptor_producer";
- acceptor_producer_ = std::make_shared<ProducerSocket>(
- io_service_); /*,
- utils::Identity::generateIdentity(producer_identity));*/
+ acceptor_producer_ = std::make_shared<ProducerSocket>();
acceptor_producer_->registerPrefix(acceptor_namespace);
}
diff --git a/libtransport/src/hicn/transport/http/server_acceptor.h b/libtransport/src/hicn/transport/http/server_acceptor.h
index 4e7350b7..6ed58f7 100644
--- a/libtransport/src/hicn/transport/http/server_acceptor.h
+++ b/libtransport/src/hicn/transport/http/server_acceptor.h
@@ -53,7 +53,6 @@ class HTTPServerAcceptor {
void processIncomingInterest(ProducerSocket &p, Interest &interest);
OnHttpRequest callback_;
- asio::io_service io_service_;
std::shared_ptr<ProducerSocket> acceptor_producer_;
std::map<int, std::shared_ptr<HTTPServerPublisher>> publishers_;
diff --git a/libtransport/src/hicn/transport/http/server_publisher.cc b/libtransport/src/hicn/transport/http/server_publisher.cc
index 012f360..6a4bb9c 100644
--- a/libtransport/src/hicn/transport/http/server_publisher.cc
+++ b/libtransport/src/hicn/transport/http/server_publisher.cc
@@ -23,7 +23,7 @@ namespace http {
HTTPServerPublisher::HTTPServerPublisher(const core::Name &content_name)
: content_name_(content_name, true) {
std::string identity = "acceptor_producer";
- producer_ = std::make_unique<ProducerSocket>(io_service_);
+ producer_ = std::make_unique<ProducerSocket>();
// utils::Identity::generateIdentity(identity));
core::Prefix publisher_prefix(content_name_, 128);
producer_->registerPrefix(publisher_prefix);
diff --git a/libtransport/src/hicn/transport/http/server_publisher.h b/libtransport/src/hicn/transport/http/server_publisher.h
index 1f12fd8..33d596f 100644
--- a/libtransport/src/hicn/transport/http/server_publisher.h
+++ b/libtransport/src/hicn/transport/http/server_publisher.h
@@ -59,7 +59,6 @@ class HTTPServerPublisher {
private:
Name content_name_;
std::unique_ptr<asio::steady_timer> timer_;
- asio::io_service io_service_;
std::unique_ptr<ProducerSocket> producer_;
ProducerInterestCallback interest_enter_callback_;
utils::UserCallback wait_callback_;
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 41feb4b..495f8c8 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -215,4 +215,4 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
} // namespace interface
-} // end namespace transport
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index 6506b50..be39d2b 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -63,4 +63,4 @@ class RTCProducerSocket : public ProducerSocket {
} // namespace interface
-} // end namespace transport
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index 37d5457..af99fd6 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -123,10 +123,12 @@ void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
}
void ConsumerSocket::stop() {
- if (transport_protocol_->isRunning()) {
- std::cout << "Stopping transport protocol " << std::endl;
- transport_protocol_->stop();
- }
+ auto &io_service = getIoService();
+ io_service.dispatch([this]() {
+ if (transport_protocol_->isRunning()) {
+ transport_protocol_->stop();
+ }
+ });
}
void ConsumerSocket::resume() {
@@ -141,4 +143,4 @@ asio::io_service &ConsumerSocket::getIoService() {
} // namespace interface
-} // end namespace transport
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 40344af..41646c9 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -868,4 +868,4 @@ class ConsumerSocket : public BaseSocket {
} // namespace interface
-} // end namespace transport
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index c4cf958..c85b8af 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -366,4 +366,4 @@ asio::io_service &ProducerSocket::getIoService() { return io_service_; }
} // namespace interface
-} // end namespace transport
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 200c32a..744ddd8 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -598,4 +598,4 @@ class ProducerSocket : public Socket<BasePortal>,
} // namespace interface
-} // namespace transport
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index c9aa6a5..b514d05 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -739,4 +739,4 @@ void RTCTransportProtocol::returnContentToApplication(
} // end namespace protocol
-} // end namespace transport
+} // end namespace transport \ No newline at end of file
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index ddc79d5..681696d 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -467,6 +467,7 @@ class HIperfClient {
void readError(const std::error_code ec) noexcept override {
std::cerr << "Error while reading from RTC socket" << std::endl;
+ client_.io_service_.stop();
}
void readSuccess(std::size_t total_size) noexcept override {
@@ -509,6 +510,7 @@ class HIperfClient {
void readError(const std::error_code ec) noexcept override {
std::cerr << "Error " << ec.message() << " while reading from socket"
<< std::endl;
+ client_.io_service_.stop();
}
void readSuccess(std::size_t total_size) noexcept override {