aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authorAlberto Compagno <acompagn+fdio@cisco.com>2019-02-18 10:56:49 +0000
committerGerrit Code Review <gerrit@fd.io>2019-02-18 10:56:49 +0000
commit2fd90aea1831942cda49d6635e95c86d8e494966 (patch)
treed7182493aa06976f252a0bdce34228e39ee8d050 /libtransport
parent7465d7ee3fbae80d24342930ad78682a6e674bb9 (diff)
parent79e0d4f89c4d532189aae06cc5dfbc14e3269703 (diff)
Merge "[HICN-50] Added udp application connector."
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/core/CMakeLists.txt8
-rw-r--r--libtransport/src/hicn/transport/core/connector.cc8
-rw-r--r--libtransport/src/hicn/transport/core/connector.h27
-rw-r--r--libtransport/src/hicn/transport/core/forwarder_interface.h20
-rw-r--r--libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc121
-rw-r--r--libtransport/src/hicn/transport/core/hicn_forwarder_interface.h26
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.cc5
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.h2
-rw-r--r--libtransport/src/hicn/transport/core/portal.h33
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.cc4
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.h3
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_interface.h10
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.cc (renamed from libtransport/src/hicn/transport/core/socket_connector.cc)50
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.h (renamed from libtransport/src/hicn/transport/core/socket_connector.h)14
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.cc201
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.h88
-rw-r--r--libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc32
-rw-r--r--libtransport/src/hicn/transport/core/vpp_forwarder_interface.h13
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc4
19 files changed, 533 insertions, 136 deletions
diff --git a/libtransport/src/hicn/transport/core/CMakeLists.txt b/libtransport/src/hicn/transport/core/CMakeLists.txt
index dff93adeb..0e674fcac 100644
--- a/libtransport/src/hicn/transport/core/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/core/CMakeLists.txt
@@ -17,7 +17,6 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/content_object.h
${CMAKE_CURRENT_SOURCE_DIR}/facade.h
${CMAKE_CURRENT_SOURCE_DIR}/interest.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h
@@ -29,7 +28,8 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/portal.h
${CMAKE_CURRENT_SOURCE_DIR}/prefix.h
${CMAKE_CURRENT_SOURCE_DIR}/connector.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h
${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h
${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.h
${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h
@@ -39,12 +39,12 @@ list(APPEND HEADER_FILES
list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc
${CMAKE_CURRENT_SOURCE_DIR}/interest.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc
${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc
${CMAKE_CURRENT_SOURCE_DIR}/packet.cc
${CMAKE_CURRENT_SOURCE_DIR}/name.cc
${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc
${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.cc
${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.cc
${CMAKE_CURRENT_SOURCE_DIR}/connector.cc
diff --git a/libtransport/src/hicn/transport/core/connector.cc b/libtransport/src/hicn/transport/core/connector.cc
index ff567d78a..e89b98f8a 100644
--- a/libtransport/src/hicn/transport/core/connector.cc
+++ b/libtransport/src/hicn/transport/core/connector.cc
@@ -21,7 +21,13 @@ namespace core {
std::once_flag Connector::init_flag_;
-Connector::Connector() : packet_pool_() { init(); }
+Connector::Connector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback)
+ : packet_pool_(),
+ receive_callback_(std::move(receive_callback)),
+ on_reconnect_callback_(std::move(reconnect_callback)) {
+ init();
+}
void Connector::init() { increasePoolSize(); }
diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h
index ae82ee973..529e97bf9 100644
--- a/libtransport/src/hicn/transport/core/connector.h
+++ b/libtransport/src/hicn/transport/core/connector.h
@@ -33,19 +33,20 @@ enum class ConnectorType : uint8_t {
VPP_CONNECTOR,
};
-static constexpr std::size_t packet_size = 2048;
-static constexpr std::size_t queue_size = 4096;
-static constexpr std::size_t packet_pool_size = 4096;
-
-using PacketRing = utils::CircularFifo<Packet::MemBufPtr, queue_size>;
-using PacketQueue = std::deque<Packet::MemBufPtr>;
-using PacketReceivedCallback = std::function<void(Packet::MemBufPtr &&)>;
-using OnReconnect = std::function<void()>;
-using PacketSentCallback = std::function<void()>;
-
class Connector {
public:
- Connector();
+ static constexpr std::size_t packet_size = 2048;
+ static constexpr std::size_t queue_size = 4096;
+ static constexpr std::size_t packet_pool_size = 4096;
+
+ using PacketRing = utils::CircularFifo<Packet::MemBufPtr, queue_size>;
+ using PacketQueue = std::deque<Packet::MemBufPtr>;
+ using PacketReceivedCallback = std::function<void(Packet::MemBufPtr &&)>;
+ using OnReconnect = std::function<void()>;
+ using PacketSentCallback = std::function<void()>;
+
+ Connector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback);
virtual ~Connector() = default;
@@ -88,6 +89,10 @@ class Connector {
static std::once_flag init_flag_;
utils::ObjectPool<utils::MemBuf> packet_pool_;
PacketQueue output_buffer_;
+
+ // Connector events
+ PacketReceivedCallback receive_callback_;
+ OnReconnect on_reconnect_callback_;
};
} // end namespace core
diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h
index e7b6fb1a6..de9f3b568 100644
--- a/libtransport/src/hicn/transport/core/forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/forwarder_interface.h
@@ -16,7 +16,7 @@
#pragma once
#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/udp_socket_connector.h>
#include <hicn/transport/portability/portability.h>
#include <deque>
@@ -54,8 +54,6 @@ class ForwarderInterface {
}
public:
- static constexpr uint8_t ack_code = 102;
-
virtual ~ForwarderInterface() {}
TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
@@ -70,6 +68,20 @@ class ForwarderInterface {
return static_cast<Implementation &>(*this).getMtu();
}
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessage(const uint8_t *message) {
+ return Implementation::isControlMessageImpl(message);
+ }
+
+ template <typename R>
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReply(R &&packet_buffer) {
+ return static_cast<Implementation &>(*this).processControlMessageReplyImpl(
+ std::forward<R &&>(packet_buffer));
+ }
+
+ TRANSPORT_ALWAYS_INLINE void closeConnection() {
+ return static_cast<Implementation &>(*this).closeConnection();
+ }
+
template <
typename R,
typename = std::enable_if_t<
@@ -97,7 +109,7 @@ class ForwarderInterface {
counters_.tx_bytes += len;
// Perfect forwarding
- connector_.send(packet, len, std::forward<Handler>(packet_sent));
+ connector_.send(packet, len, std::forward<Handler &&>(packet_sent));
}
TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); }
diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
index 9dc3b63bb..1c8060906 100644
--- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
+++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
@@ -15,11 +15,6 @@
#include <hicn/transport/core/hicn_forwarder_interface.h>
-#define ADDR_INET 1
-#define ADDR_INET6 2
-#define ADD_ROUTE 3
-#define REQUEST_LIGHT 100
-
union AddressLight {
uint32_t ipv4;
struct in6_addr ipv6;
@@ -30,6 +25,13 @@ typedef struct {
uint8_t command_id;
uint16_t length;
uint32_t seq_num;
+} CommandHeader;
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
char symbolic_or_connid[16];
union AddressLight address;
uint16_t cost;
@@ -37,51 +39,104 @@ typedef struct {
uint8_t len;
} RouteToSelfCommand;
-namespace transport {
-
-namespace core {
-
-HicnForwarderInterface::HicnForwarderInterface(SocketConnector &connector)
- : ForwarderInterface<HicnForwarderInterface, SocketConnector>(connector) {}
-
-HicnForwarderInterface::~HicnForwarderInterface() {}
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+ char symbolic_or_connid[16];
+} DeleteSelfConnectionCommand;
-void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); }
+namespace {
+static constexpr uint8_t addr_inet = 1;
+static constexpr uint8_t addr_inet6 = 2;
+static constexpr uint8_t add_route_command = 3;
+static constexpr uint8_t delete_connection_command = 5;
+static constexpr uint8_t request_light = 0xc0;
+static constexpr char identifier[] = "SELF";
-void HicnForwarderInterface::registerRoute(Prefix &prefix) {
- auto addr = prefix.toSockaddr();
- const char *identifier = {"SELF_ROUTE"};
+void fillCommandHeader(CommandHeader *header) {
+ // Allocate and fill the header
+ header->message_type = request_light;
+ header->length = 1;
+}
- // allocate command payload
- RouteToSelfCommand *route_to_self = new RouteToSelfCommand();
+std::unique_ptr<RouteToSelfCommand> createCommandRoute(
+ std::unique_ptr<sockaddr> &&addr, uint8_t prefix_length) {
+ auto command = std::make_unique<RouteToSelfCommand>();
// check and set IP address
if (addr->sa_family == AF_INET) {
- route_to_self->address_type = ADDR_INET;
- route_to_self->address.ipv4 = ((Sockaddr4 *)addr.get())->sin_addr.s_addr;
+ command->address_type = addr_inet;
+ command->address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr;
} else if (addr->sa_family == AF_INET6) {
- route_to_self->address_type = ADDR_INET6;
- route_to_self->address.ipv6 = ((Sockaddr6 *)addr.get())->sin6_addr;
+ command->address_type = addr_inet6;
+ command->address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr;
}
// Fill remaining payload fields
#ifndef _WIN32
- strcpy(route_to_self->symbolic_or_connid, identifier);
+ strcpy(command->symbolic_or_connid, identifier);
#else
- strcpy_s(route_to_self->symbolic_or_connid, strlen(route_to_self->symbolic_or_connid), identifier);
+ strcpy_s(route_to_self->symbolic_or_connid,
+ strlen(route_to_self->symbolic_or_connid), identifier);
#endif
- route_to_self->cost = 1;
- route_to_self->len = (uint8_t) prefix.getPrefixLength();
+ command->cost = 1;
+ command->len = (uint8_t)prefix_length;
// Allocate and fill the header
- route_to_self->command_id = ADD_ROUTE;
- route_to_self->message_type = REQUEST_LIGHT;
- route_to_self->length = 1;
+ command->command_id = add_route_command;
+ fillCommandHeader((CommandHeader *)command.get());
+
+ return command;
+}
+
+std::unique_ptr<DeleteSelfConnectionCommand> createCommandDeleteConnection() {
+ auto command = std::make_unique<DeleteSelfConnectionCommand>();
+ fillCommandHeader((CommandHeader *)command.get());
+ command->command_id = delete_connection_command;
+
+#ifndef _WIN32
+ strcpy(command->symbolic_or_connid, identifier);
+#else
+ strcpy_s(route_to_self->symbolic_or_connid,
+ strlen(route_to_self->symbolic_or_connid), identifier);
+#endif
+
+ return command;
+}
+
+} // namespace
+
+namespace transport {
+
+namespace core {
+
+HicnForwarderInterface::HicnForwarderInterface(UdpSocketConnector &connector)
+ : ForwarderInterface<HicnForwarderInterface, UdpSocketConnector>(
+ connector) {}
+
+HicnForwarderInterface::~HicnForwarderInterface() {}
+
+void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); }
+
+void HicnForwarderInterface::registerRoute(Prefix &prefix) {
+ auto command =
+ createCommandRoute(prefix.toSockaddr(), prefix.getPrefixLength())
+ .release();
+ send((uint8_t *)command, sizeof(RouteToSelfCommand),
+ [command]() { delete command; });
+}
- send((uint8_t *)route_to_self, sizeof(RouteToSelfCommand),
- [route_to_self]() { delete route_to_self; });
+void HicnForwarderInterface::closeConnection() {
+ auto command = createCommandDeleteConnection().release();
+ send((uint8_t *)command, sizeof(DeleteSelfConnectionCommand),
+ [this, command]() {
+ delete command;
+ connector_.close();
+ });
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h
index e57fae105..b11841b69 100644
--- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h
@@ -17,7 +17,7 @@
#include <hicn/transport/core/forwarder_interface.h>
#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/udp_socket_connector.h>
#include <deque>
@@ -26,7 +26,10 @@ namespace transport {
namespace core {
class HicnForwarderInterface
- : public ForwarderInterface<HicnForwarderInterface, SocketConnector> {
+ : public ForwarderInterface<HicnForwarderInterface, UdpSocketConnector> {
+ static constexpr uint8_t ack_code = 0xc2;
+ static constexpr uint8_t nack_code = 0xc3;
+
public:
union addressLight {
uint32_t ipv4;
@@ -46,9 +49,9 @@ class HicnForwarderInterface
};
using route_to_self_command = struct route_to_self_command;
- using ConnectorType = SocketConnector;
+ using ConnectorType = UdpSocketConnector;
- HicnForwarderInterface(SocketConnector &connector);
+ HicnForwarderInterface(UdpSocketConnector &connector);
~HicnForwarderInterface();
@@ -58,6 +61,21 @@ class HicnForwarderInterface
std::uint16_t getMtu() { return interface_mtu; }
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return message[0] == ack_code || message[0] == nack_code;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {
+ if (packet_buffer->data()[0] == nack_code) {
+ throw errors::RuntimeException(
+ "Received Nack message from hicn light forwarder.");
+ }
+ }
+
+ void closeConnection();
+
private:
static constexpr std::uint16_t interface_mtu = 1500;
};
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc
index 38b2a2a98..c69a87fb7 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.cc
+++ b/libtransport/src/hicn/transport/core/memif_connector.cc
@@ -57,7 +57,7 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
OnReconnect &&on_reconnect_callback,
asio::io_service &io_service,
std::string app_name)
- : Connector(),
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
memif_worker_(nullptr),
timer_set_(false),
send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
@@ -71,8 +71,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
enable_burst_(false),
closed_(false),
app_name_(app_name),
- receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback),
socket_filename_("") {
std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
}
@@ -372,7 +370,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
packet->append(packet_length);
if (!connector->input_buffer_.push(std::move(packet))) {
-
TRANSPORT_LOGI("Error pushing packet. Ring buffer full.");
// TODO Here we should consider the possibility to signal the congestion
diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h
index 3d2e8411d..06a8fd73e 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.h
+++ b/libtransport/src/hicn/transport/core/memif_connector.h
@@ -128,8 +128,6 @@ class MemifConnector : public Connector {
uint8_t memif_mode_;
std::string app_name_;
uint16_t transmission_index_;
- PacketReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
utils::SpinLock write_msgs_lock_;
std::string socket_filename_;
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 0932b56c6..7efbc2009 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -22,7 +22,7 @@
#include <hicn/transport/core/name.h>
#include <hicn/transport/core/pending_interest.h>
#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/udp_socket_connector.h>
#include <hicn/transport/errors/errors.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/utils/log.h>
@@ -222,22 +222,25 @@ class Portal {
}
TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) {
- for (auto &pend_interest : pending_interest_hash_table_) {
- pend_interest.second->cancelTimer();
- }
-
- clear();
-
if (kill_connection) {
- connector_.close();
+ forwarder_interface_.closeConnection();
}
- io_service_.post([this]() { io_service_.stop(); });
+ io_service_.post([this]() {
+ clear();
+ io_service_.stop();
+ });
}
TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); }
- TRANSPORT_ALWAYS_INLINE void clear() { pending_interest_hash_table_.clear(); }
+ TRANSPORT_ALWAYS_INLINE void clear() {
+ for (auto &pend_interest : pending_interest_hash_table_) {
+ pend_interest.second->cancelTimer();
+ }
+
+ pending_interest_hash_table_.clear();
+ }
TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
return io_service_;
@@ -260,17 +263,16 @@ class Portal {
return;
}
- if (packet_buffer->data()[0] == ForwarderInt::ack_code) {
- // Hicn forwarder message
+ if (TRANSPORT_EXPECT_FALSE(
+ ForwarderInt::isControlMessage(packet_buffer->data()))) {
processControlMessage(std::move(packet_buffer));
return;
}
- bool is_interest = Packet::isInterest(packet_buffer->data());
Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data());
if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
- if (!is_interest) {
+ if (!Packet::isInterest(packet_buffer->data())) {
processContentObject(
ContentObject::Ptr(new ContentObject(std::move(packet_buffer))));
} else {
@@ -329,8 +331,7 @@ class Portal {
TRANSPORT_ALWAYS_INLINE void processControlMessage(
Packet::MemBufPtr &&packet_buffer) {
- // Control message as response to the route set by a producer.
- // Do nothing
+ forwarder_interface_.processControlMessageReply(std::move(packet_buffer));
}
private:
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
index 5cfff39fb..fe16d2132 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
@@ -39,15 +39,13 @@ RawSocketConnector::RawSocketConnector(
PacketReceivedCallback &&receive_callback,
OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
std::string app_name)
- : Connector(),
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
io_service_(io_service),
socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)),
// resolver_(io_service_),
timer_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
data_available_(false),
- receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback),
app_name_(app_name) {
memset(&link_layer_address_, 0, sizeof(link_layer_address_));
}
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h
index 5e39efa0e..bb24d9d54 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h
@@ -74,9 +74,6 @@ class RawSocketConnector : public Connector {
utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
bool data_available_;
-
- PacketReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
std::string app_name_;
};
diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.h b/libtransport/src/hicn/transport/core/raw_socket_interface.h
index c030af662..ac48e5874 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_interface.h
+++ b/libtransport/src/hicn/transport/core/raw_socket_interface.h
@@ -41,6 +41,16 @@ class RawSocketInterface
std::uint16_t getMtu() { return interface_mtu; }
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return false;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {}
+
+ TRANSPORT_ALWAYS_INLINE void closeConnection(){};
+
private:
static constexpr std::uint16_t interface_mtu = 1500;
std::string remote_mac_address_;
diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
index 7bf0570ad..ade0f2611 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
@@ -16,7 +16,7 @@
#ifdef _WIN32
#include <hicn/transport/portability/win_portability.h>
#endif
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/tcp_socket_connector.h>
#include <hicn/transport/errors/errors.h>
#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/object_pool.h>
@@ -52,11 +52,11 @@ class NetworkMessage {
};
} // namespace
-SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback,
- OnReconnect &&on_reconnect_callback,
- asio::io_service &io_service,
- std::string app_name)
- : Connector(),
+TcpSocketConnector::TcpSocketConnector(
+ PacketReceivedCallback &&receive_callback,
+ OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
io_service_(io_service),
socket_(io_service_),
resolver_(io_service_),
@@ -66,30 +66,28 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback,
is_reconnection_(false),
data_available_(false),
is_closed_(false),
- receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback),
app_name_(app_name) {}
-SocketConnector::~SocketConnector() {}
+TcpSocketConnector::~TcpSocketConnector() {}
-void SocketConnector::connect(std::string ip_address, std::string port) {
+void TcpSocketConnector::connect(std::string ip_address, std::string port) {
endpoint_iterator_ = resolver_.resolve(
{ip_address, port, asio::ip::resolver_query_base::numeric_service});
doConnect();
}
-void SocketConnector::state() { return; }
+void TcpSocketConnector::state() { return; }
-void SocketConnector::send(const uint8_t *packet, std::size_t len,
- const PacketSentCallback &packet_sent) {
+void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent) {
asio::async_write(socket_, asio::buffer(packet, len),
[packet_sent](std::error_code ec, std::size_t /*length*/) {
packet_sent();
});
}
-void SocketConnector::send(const Packet::MemBufPtr &packet) {
+void TcpSocketConnector::send(const Packet::MemBufPtr &packet) {
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
@@ -104,7 +102,7 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) {
});
}
-void SocketConnector::close() {
+void TcpSocketConnector::close() {
io_service_.dispatch([this]() {
is_closed_ = true;
socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
@@ -112,7 +110,7 @@ void SocketConnector::close() {
});
}
-void SocketConnector::doWrite() {
+void TcpSocketConnector::doWrite() {
// TODO improve this piece of code for sending many buffers togethers
// if list contains more than one packet
auto packet = output_buffer_.front().get();
@@ -143,7 +141,7 @@ void SocketConnector::doWrite() {
});
}
-void SocketConnector::doReadBody(std::size_t body_length) {
+void TcpSocketConnector::doReadBody(std::size_t body_length) {
asio::async_read(
socket_, asio::buffer(read_msg_->writableTail(), body_length),
asio::transfer_exactly(body_length),
@@ -163,7 +161,7 @@ void SocketConnector::doReadBody(std::size_t body_length) {
});
}
-void SocketConnector::doReadHeader() {
+void TcpSocketConnector::doReadHeader() {
read_msg_ = getPacket();
asio::async_read(
socket_,
@@ -191,7 +189,7 @@ void SocketConnector::doReadHeader() {
});
}
-void SocketConnector::tryReconnect() {
+void TcpSocketConnector::tryReconnect() {
if (!is_connecting_ && !is_closed_) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
is_connecting_ = true;
@@ -205,7 +203,7 @@ void SocketConnector::tryReconnect() {
}
}
-void SocketConnector::doConnect() {
+void TcpSocketConnector::doConnect() {
asio::async_connect(socket_, endpoint_iterator_,
[this](std::error_code ec, tcp::resolver::iterator) {
if (!ec) {
@@ -232,17 +230,17 @@ void SocketConnector::doConnect() {
});
}
-bool SocketConnector::checkConnected() { return !is_connecting_; }
+bool TcpSocketConnector::checkConnected() { return !is_connecting_; }
-void SocketConnector::enableBurst() { return; }
+void TcpSocketConnector::enableBurst() { return; }
-void SocketConnector::startConnectionTimer() {
+void TcpSocketConnector::startConnectionTimer() {
timer_.expires_from_now(std::chrono::seconds(60));
- timer_.async_wait(
- std::bind(&SocketConnector::handleDeadline, this, std::placeholders::_1));
+ timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this,
+ std::placeholders::_1));
}
-void SocketConnector::handleDeadline(const std::error_code &ec) {
+void TcpSocketConnector::handleDeadline(const std::error_code &ec) {
if (!ec) {
io_service_.post([this]() {
socket_.close();
diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
index 6eff1aff5..8dfda4eb8 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.h
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
@@ -28,14 +28,14 @@ namespace core {
using asio::ip::tcp;
-class SocketConnector : public Connector {
+class TcpSocketConnector : public Connector {
public:
- SocketConnector(PacketReceivedCallback &&receive_callback,
- OnReconnect &&reconnect_callback,
- asio::io_service &io_service,
- std::string app_name = "Libtransport");
+ TcpSocketConnector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
- ~SocketConnector() override;
+ ~TcpSocketConnector() override;
void send(const Packet::MemBufPtr &packet) override;
@@ -81,8 +81,6 @@ class SocketConnector : public Connector {
bool data_available_;
bool is_closed_;
- PacketReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
std::string app_name_;
};
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
new file mode 100644
index 000000000..f38891e71
--- /dev/null
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
@@ -0,0 +1,201 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <hicn/transport/portability/win_portability.h>
+#endif
+#include <hicn/transport/core/udp_socket_connector.h>
+#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/object_pool.h>
+
+#include <vector>
+
+namespace transport {
+
+namespace core {
+
+UdpSocketConnector::UdpSocketConnector(
+ PacketReceivedCallback &&receive_callback,
+ OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
+ io_service_(io_service),
+ socket_(io_service_),
+ resolver_(io_service_),
+ connection_timer_(io_service_),
+ connection_timeout_(io_service_),
+ read_msg_(packet_pool_.makePtr(nullptr)),
+ is_connecting_(false),
+ is_reconnection_(false),
+ data_available_(false),
+ is_closed_(false),
+ app_name_(app_name) {}
+
+UdpSocketConnector::~UdpSocketConnector() {}
+
+void UdpSocketConnector::connect(std::string ip_address, std::string port) {
+ endpoint_iterator_ = resolver_.resolve(
+ {ip_address, port, asio::ip::resolver_query_base::numeric_service});
+
+ doConnect();
+}
+
+void UdpSocketConnector::state() { return; }
+
+void UdpSocketConnector::send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent) {
+ socket_.async_send(asio::buffer(packet, len),
+ [packet_sent](std::error_code ec, std::size_t /*length*/) {
+ packet_sent();
+ });
+}
+
+void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
+ io_service_.post([this, packet]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(packet));
+ if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+ if (!write_in_progress) {
+ doWrite();
+ }
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
+ });
+}
+
+void UdpSocketConnector::close() {
+ io_service_.dispatch([this]() {
+ is_closed_ = true;
+ socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ });
+}
+
+void UdpSocketConnector::doWrite() {
+ // TODO improve this piece of code for sending many buffers togethers
+ // if list contains more than one packet
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec,
+ std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doWrite();
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::doRead() {
+ read_msg_ = getPacket();
+ socket_.async_receive(
+ asio::buffer(read_msg_->writableData(), Connector::packet_size),
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ read_msg_->append(length);
+ receive_callback_(std::move(read_msg_));
+ doRead();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::tryReconnect() {
+ if (!is_connecting_ && !is_closed_) {
+ TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
+ is_connecting_ = true;
+ is_reconnection_ = true;
+ connection_timer_.expires_from_now(std::chrono::seconds(1));
+ connection_timer_.async_wait([this](const std::error_code &ec) {
+ if (!ec) {
+ socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ startConnectionTimer();
+ doConnect();
+ }
+ });
+ }
+}
+
+void UdpSocketConnector::doConnect() {
+ asio::async_connect(socket_, endpoint_iterator_,
+ [this](std::error_code ec, udp::resolver::iterator) {
+ if (!ec) {
+ connection_timeout_.cancel();
+ is_connecting_ = false;
+ doRead();
+
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ on_reconnect_callback_();
+ }
+ } else {
+ sleep(1);
+ doConnect();
+ }
+ });
+}
+
+bool UdpSocketConnector::checkConnected() { return !is_connecting_; }
+
+void UdpSocketConnector::enableBurst() { return; }
+
+void UdpSocketConnector::startConnectionTimer() {
+ connection_timeout_.expires_from_now(std::chrono::seconds(60));
+ connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
+ this, std::placeholders::_1));
+}
+
+void UdpSocketConnector::handleDeadline(const std::error_code &ec) {
+ if (!ec) {
+ io_service_.post([this]() {
+ socket_.close();
+ TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n");
+ io_service_.stop();
+ });
+ }
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h
new file mode 100644
index 000000000..4704fa50b
--- /dev/null
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/core/name.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <deque>
+
+namespace transport {
+namespace core {
+
+using asio::ip::udp;
+
+class UdpSocketConnector : public Connector {
+ public:
+ UdpSocketConnector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~UdpSocketConnector() override;
+
+ void send(const Packet::MemBufPtr &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent = 0) override;
+
+ void close() override;
+
+ void enableBurst() override;
+
+ void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
+
+ void state() override;
+
+ private:
+ void doConnect();
+
+ void doRead();
+
+ void doWrite();
+
+ bool checkConnected();
+
+ private:
+ void handleDeadline(const std::error_code &ec);
+
+ void startConnectionTimer();
+
+ void tryReconnect();
+
+ asio::io_service &io_service_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::steady_timer connection_timer_;
+ asio::steady_timer connection_timeout_;
+
+ utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
+
+ bool is_connecting_;
+ bool is_reconnection_;
+ bool data_available_;
+ bool is_closed_;
+
+ std::string app_name_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
index 69b18c0d9..8dc607295 100644
--- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
+++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
@@ -47,20 +47,7 @@ VPPForwarderInterface::VPPForwarderInterface(MemifConnector &connector)
sw_if_index_(~0),
face_id_(~0) {}
-VPPForwarderInterface::~VPPForwarderInterface() {
- if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) {
- int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_,
- sw_if_index_);
-
- if (ret < 0) {
- TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_);
- }
- }
-
- if (VPPForwarderInterface::api_) {
- vpp_binary_api_destroy(VPPForwarderInterface::api_);
- }
-}
+VPPForwarderInterface::~VPPForwarderInterface() {}
/**
* @brief Create a memif interface in the local VPP forwarder.
@@ -220,6 +207,23 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) {
}
}
+void VPPForwarderInterface::closeConnection() {
+ if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) {
+ int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_,
+ sw_if_index_);
+
+ if (ret < 0) {
+ TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_);
+ }
+ }
+
+ if (VPPForwarderInterface::api_) {
+ vpp_binary_api_destroy(VPPForwarderInterface::api_);
+ }
+
+ connector_.close();
+}
+
} // namespace core
} // namespace transport
diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h
index 322cd1f8b..62af8bc3b 100644
--- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h
@@ -31,6 +31,8 @@ namespace core {
class VPPForwarderInterface
: public ForwarderInterface<VPPForwarderInterface, MemifConnector> {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
public:
VPPForwarderInterface(MemifConnector &connector);
@@ -44,6 +46,16 @@ class VPPForwarderInterface
TRANSPORT_ALWAYS_INLINE std::uint16_t getMtu() { return interface_mtu; }
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return false;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {}
+
+ void closeConnection();
+
private:
uint32_t getMemifConfiguration();
@@ -58,7 +70,6 @@ class VPPForwarderInterface
uint32_t sw_if_index_;
uint32_t face_id_;
static std::mutex global_lock_;
- static constexpr std::uint16_t interface_mtu = 1500;
};
} // namespace core
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index dbf1c1bd1..1356ad566 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -614,10 +614,10 @@ void RTCTransportProtocol::processRtcpHeader(uint8_t *offset) {
uint8_t pkt_type = (*(offset + 1));
switch (pkt_type) {
case HICN_RTCP_RR: // Receiver report
- TRANSPORT_LOGI("got RR packet\n");
+ TRANSPORT_LOGD("got RR packet\n");
break;
case HICN_RTCP_SR: // Sender report
- TRANSPORT_LOGI("got SR packet\n");
+ TRANSPORT_LOGD("got SR packet\n");
break;
case HICN_RTCP_SDES: // Description
processSDES(offset);