summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-02-11 10:44:29 +0100
committerMauro Sardara <msardara@cisco.com>2019-02-18 10:48:38 +0000
commit79e0d4f89c4d532189aae06cc5dfbc14e3269703 (patch)
treeadc83eccb824c019c3c43cc48bcd4de6656eea8d
parent7d2b217bd01a8da1a2ac57aaad59b3179c7af916 (diff)
[HICN-50] Added udp application connector.
Change-Id: I0c5afad4b404ec485f50b1342b81e70ef85a5163 Signed-off-by: Mauro Sardara <msardara@cisco.com> Signed-off-by: michele papalini <micpapal@cisco.com>
-rw-r--r--hicn-light/src/command_line/daemon/hicnLightDaemon_main.c5
-rw-r--r--hicn-light/src/config/configuration.c24
-rw-r--r--hicn-light/src/config/configurationListeners.c21
-rw-r--r--hicn-light/src/config/configurationListeners.h3
-rw-r--r--hicn-light/src/core/connection.c7
-rw-r--r--hicn-light/src/core/connection.h6
-rw-r--r--hicn-light/src/core/forwarder.c5
-rw-r--r--hicn-light/src/core/forwarder.h6
-rw-r--r--hicn-light/src/io/hicnConnection.c9
-rw-r--r--hicn-light/src/io/ioOperations.c6
-rw-r--r--hicn-light/src/io/ioOperations.h36
-rw-r--r--hicn-light/src/io/streamConnection.c38
-rw-r--r--hicn-light/src/io/udpConnection.c94
-rw-r--r--hicn-light/src/io/udpListener.c216
-rw-r--r--hicn-light/src/utils/commands.h51
-rw-r--r--libtransport/src/hicn/transport/core/CMakeLists.txt8
-rw-r--r--libtransport/src/hicn/transport/core/connector.cc8
-rw-r--r--libtransport/src/hicn/transport/core/connector.h27
-rw-r--r--libtransport/src/hicn/transport/core/forwarder_interface.h20
-rw-r--r--libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc121
-rw-r--r--libtransport/src/hicn/transport/core/hicn_forwarder_interface.h26
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.cc5
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.h2
-rw-r--r--libtransport/src/hicn/transport/core/portal.h33
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.cc4
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_connector.h3
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_interface.h10
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.cc (renamed from libtransport/src/hicn/transport/core/socket_connector.cc)50
-rw-r--r--libtransport/src/hicn/transport/core/tcp_socket_connector.h (renamed from libtransport/src/hicn/transport/core/socket_connector.h)14
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.cc201
-rw-r--r--libtransport/src/hicn/transport/core/udp_socket_connector.h88
-rw-r--r--libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc32
-rw-r--r--libtransport/src/hicn/transport/core/vpp_forwarder_interface.h13
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc4
-rw-r--r--utils/src/hiperf.cc39
-rw-r--r--utils/src/ping_client.cc4
-rw-r--r--utils/src/ping_server.cc16
37 files changed, 829 insertions, 426 deletions
diff --git a/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c b/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c
index dac8ee89f..ce8c373ca 100644
--- a/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c
+++ b/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c
@@ -371,12 +371,9 @@ int main(int argc, const char *argv[]) {
configuration_SetObjectStoreSize(configuration, capacity);
}
+ forwarder_SetupLocalListeners(forwarder, port);
if (configFileName) {
- forwarder_SetupAllListeners(forwarder, port, NULL);
forwarder_SetupFromConfigFile(forwarder, configFileName);
- } else {
- // NULL to not setup AF_UNIX
- forwarder_SetupAllListeners(forwarder, port, NULL);
}
Dispatcher *dispatcher = forwarder_GetDispatcher(forwarder);
diff --git a/hicn-light/src/config/configuration.c b/hicn-light/src/config/configuration.c
index 1a41a9642..865dbca4d 100644
--- a/hicn-light/src/config/configuration.c
+++ b/hicn-light/src/config/configuration.c
@@ -115,7 +115,7 @@ struct iovec *configuration_ProcessRegisterHicnPrefix(Configuration *config,
const char *symbolicOrConnid = control->symbolicOrConnid;
- if (strcmp(symbolicOrConnid, "SELF_ROUTE") == 0) {
+ if (strcmp(symbolicOrConnid, "SELF") == 0) {
success = forwarder_AddOrUpdateRoute(config->forwarder, control, ingressId);
} else if (utils_IsNumber(symbolicOrConnid)) {
// case for connid as input
@@ -304,11 +304,14 @@ static void configuration_SendResponse(Configuration *config, struct iovec *msg,
ConnectionTable *connectionTable =
forwarder_GetConnectionTable(config->forwarder);
const Connection *conn = connectionTable_FindById(connectionTable, egressId);
- parcAssertNotNull(conn,
- "Got null connection for control message we just received");
- IoOperations *ops = connection_GetIoOperations(conn);
- streamState_SendCommandResponse(ops, msg);
+ if (conn == NULL) {
+ return;
+ }
+
+ connection_SendCommandResponse(conn, msg);
+ //IoOperations *ops = connection_GetIoOperations(conn);
+ //streamState_SendCommandResponse(ops, msg);
}
struct iovec *configuration_ProcessCreateTunnel(Configuration *config,
@@ -425,7 +428,8 @@ struct iovec *configuration_ProcessCreateTunnel(Configuration *config,
*/
struct iovec *configuration_ProcessRemoveTunnel(Configuration *config,
- struct iovec *request) {
+ struct iovec *request,
+ unsigned ingressId) {
header_control_message *header = request[0].iov_base;
remove_connection_command *control = request[1].iov_base;
@@ -434,7 +438,11 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config,
const char *symbolicOrConnid = control->symbolicOrConnid;
ConnectionTable *table = forwarder_GetConnectionTable(config->forwarder);
- if (utils_IsNumber(symbolicOrConnid)) {
+ if (strcmp(symbolicOrConnid, "SELF") == 0) {
+ forwarder_RemoveConnectionIdFromRoutes(config->forwarder, ingressId);
+ connectionTable_RemoveById(table, ingressId);
+ success = true;
+ } else if (utils_IsNumber(symbolicOrConnid)) {
// case for connid as input
unsigned connid = (unsigned)strtold(symbolicOrConnid, NULL);
@@ -997,7 +1005,7 @@ struct iovec *configuration_DispatchCommand(Configuration *config,
break;
case REMOVE_CONNECTION:
- response = configuration_ProcessRemoveTunnel(config, control);
+ response = configuration_ProcessRemoveTunnel(config, control, ingressId);
break;
case REMOVE_ROUTE:
diff --git a/hicn-light/src/config/configurationListeners.c b/hicn-light/src/config/configurationListeners.c
index 01ab9a3e7..11276e2dd 100644
--- a/hicn-light/src/config/configurationListeners.c
+++ b/hicn-light/src/config/configurationListeners.c
@@ -217,7 +217,7 @@ static bool _setupTcpListenerOnInet(Forwarder *forwarder, ipv4_addr_t *addr4,
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
- addr.sin_port = *port;
+ addr.sin_port = htons(*port);
addr.sin_addr.s_addr = *addr4;
ListenerOps *ops = tcpListener_CreateInet(forwarder, addr);
@@ -231,12 +231,13 @@ static bool _setupTcpListenerOnInet(Forwarder *forwarder, ipv4_addr_t *addr4,
static bool _setupUdpListenerOnInet(Forwarder *forwarder, ipv4_addr_t *addr4,
uint16_t *port) {
+
bool success = false;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
- addr.sin_port = *port;
+ addr.sin_port = htons(*port);
addr.sin_addr.s_addr = *addr4;
ListenerOps *ops = udpListener_CreateInet(forwarder, addr);
@@ -256,7 +257,7 @@ static bool _setupTcpListenerOnInet6Light(Forwarder *forwarder,
struct sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
- addr.sin6_port = *port;
+ addr.sin6_port = htons(*port);
addr.sin6_addr = *addr6;
addr.sin6_scope_id = scopeId;
@@ -276,7 +277,7 @@ static bool _setupUdpListenerOnInet6Light(Forwarder *forwarder,
struct sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
- addr.sin6_port = *port;
+ addr.sin6_port = htons(*port);
addr.sin6_addr = *addr6;
addr.sin6_scope_id = 0;
@@ -529,9 +530,13 @@ void configurationListeners_SetupAll(const Configuration *config, uint16_t port,
}
}
- // if (localPath != NULL) {
- // _setupLocalListener(forwarder, localPath);
- //}
-
interfaceSetDestroy(&set);
}
+
+void configurationListeners_SetutpLocalIPv4(const Configuration *config,
+ uint16_t port) {
+ Forwarder *forwarder = configuration_GetForwarder(config);
+ in_addr_t addr = inet_addr("127.0.0.1");
+ _setupUdpListenerOnInet(forwarder, (ipv4_addr_t *) &(addr), &port);
+ _setupTcpListenerOnInet(forwarder, (ipv4_addr_t *) &(addr), &port);
+}
diff --git a/hicn-light/src/config/configurationListeners.h b/hicn-light/src/config/configurationListeners.h
index 7332b0c64..c97617d81 100644
--- a/hicn-light/src/config/configurationListeners.h
+++ b/hicn-light/src/config/configurationListeners.h
@@ -47,6 +47,9 @@
void configurationListeners_SetupAll(const Configuration *config, uint16_t port,
const char *localPath);
+void configurationListeners_SetutpLocalIPv4(const Configuration *config,
+ uint16_t port);
+
bool configurationListeners_Remove(const Configuration *config);
// light functions
diff --git a/hicn-light/src/core/connection.c b/hicn-light/src/core/connection.c
index 505bba081..525fe29c4 100644
--- a/hicn-light/src/core/connection.c
+++ b/hicn-light/src/core/connection.c
@@ -112,6 +112,13 @@ bool connection_Send(const Connection *conn, Message *message) {
return false;
}
+bool connection_SendCommandResponse(const Connection *conn, struct iovec *msg){
+ parcAssertNotNull(conn, "Parameter conn must be non-null");
+ parcAssertNotNull(msg, "Parameter message must be non-null");
+
+ return ioOperations_SendCommandResponse(conn->ops, msg);
+}
+
static void _sendProbe(Connection *conn, unsigned probeType, uint8_t *message) {
parcAssertNotNull(conn, "Parameter conn must be non-null");
diff --git a/hicn-light/src/core/connection.h b/hicn-light/src/core/connection.h
index b5c703527..56fa131b5 100644
--- a/hicn-light/src/core/connection.h
+++ b/hicn-light/src/core/connection.h
@@ -65,6 +65,12 @@ Connection *connection_Acquire(Connection *connection);
bool connection_Send(const Connection *conn, Message *message);
/**
+ * @function connection_SendCommandResponse
+ * @abstract Sends a response (ack/nack) for a command
+ */
+bool connection_SendCommandResponse(const Connection *conn, struct iovec *msg);
+
+/**
* Return the `IoOperations` instance associated with the specified `Connection`
* instance.
* @param [in] connection The allocated connection
diff --git a/hicn-light/src/core/forwarder.c b/hicn-light/src/core/forwarder.c
index bceb206a3..2b00a35e5 100644
--- a/hicn-light/src/core/forwarder.c
+++ b/hicn-light/src/core/forwarder.c
@@ -277,6 +277,11 @@ void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port,
configurationListeners_SetupAll(forwarder->config, port, localPath);
}
+void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port) {
+ parcAssertNotNull(forwarder, "Parameter must be non-null");
+ configurationListeners_SetutpLocalIPv4(forwarder->config, port);
+}
+
void forwarder_SetupFromConfigFile(Forwarder *forwarder, const char *filename) {
ConfigurationFile *configFile = configurationFile_Create(forwarder, filename);
if (configFile) {
diff --git a/hicn-light/src/core/forwarder.h b/hicn-light/src/core/forwarder.h
index 6bc823294..de736a1f8 100644
--- a/hicn-light/src/core/forwarder.h
+++ b/hicn-light/src/core/forwarder.h
@@ -90,6 +90,12 @@ void forwarder_Destroy(Forwarder **ptr);
*/
void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port,
const char *localPath);
+/**
+ * @function forwarder_SetupAllListeners
+ * @abstract Setup one tcp and one udp listener on address 127.0.0.1 and the
+ * given port
+ */
+void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port);
/**
* Configure hicn-light via a configuration file
diff --git a/hicn-light/src/io/hicnConnection.c b/hicn-light/src/io/hicnConnection.c
index 6def8ed43..991c0064e 100644
--- a/hicn-light/src/io/hicnConnection.c
+++ b/hicn-light/src/io/hicnConnection.c
@@ -75,6 +75,7 @@ typedef struct hicn_state {
// Prototypes
static bool _send(IoOperations *ops, const Address *nexthop, Message *message);
+static bool _sendCommandResponse(IoOperations *ops, struct iovec *message);
static const Address *_getRemoteAddress(const IoOperations *ops);
static const AddressPair *_getAddressPair(const IoOperations *ops);
static unsigned _getConnectionId(const IoOperations *ops);
@@ -100,6 +101,7 @@ static const void *_streamConnection_Class(const IoOperations *ops) {
static IoOperations _template = {.closure = NULL,
.send = &_send,
+ .sendCommandResponse = &_sendCommandResponse,
.getRemoteAddress = &_getRemoteAddress,
.getAddressPair = &_getAddressPair,
.getConnectionId = &_getConnectionId,
@@ -253,7 +255,6 @@ static unsigned _getConnectionId(const IoOperations *ops) {
* sends a message to the peer.
*
* @param dummy is ignored. .
- * @return <#return#>
*/
static bool _send(IoOperations *ops, const Address *dummy, Message *message) {
parcAssertNotNull(ops, "Parameter ops must be non-null");
@@ -328,6 +329,12 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) {
return true;
}
+static bool _sendCommandResponse(IoOperations *ops, struct iovec *message) {
+ //XXX this should be nerver called since we do not handle control messages
+ //with hicn connections, so nothing to do here!
+ return false;
+}
+
static list_connections_type _getConnectionType(const IoOperations *ops) {
return CONN_HICN;
}
diff --git a/hicn-light/src/io/ioOperations.c b/hicn-light/src/io/ioOperations.c
index bbc8cec91..b40b51d76 100644
--- a/hicn-light/src/io/ioOperations.c
+++ b/hicn-light/src/io/ioOperations.c
@@ -28,6 +28,12 @@ bool ioOperations_Send(IoOperations *ops, const Address *nexthop,
return ops->send(ops, nexthop, message);
}
+bool ioOperations_SendCommandResponse(IoOperations *ops,
+ struct iovec *message) {
+ return ops->sendCommandResponse(ops, message);
+}
+
+
const Address *ioOperations_GetRemoteAddress(const IoOperations *ops) {
return ops->getRemoteAddress(ops);
}
diff --git a/hicn-light/src/io/ioOperations.h b/hicn-light/src/io/ioOperations.h
index dee66030d..48319c259 100644
--- a/hicn-light/src/io/ioOperations.h
+++ b/hicn-light/src/io/ioOperations.h
@@ -15,38 +15,6 @@
/**
* Defines the interface all connections use to communicate with the forwarder.
- *
- * @code
- *
- * static IoOperations _template = {
- * .closure = NULL,
- * .send = &_etherConnection_Send,
- * .getRemoteAddress = &_etherConnection_GetRemoteAddress,
- * .getAddressPair = &_etherConnection_GetAddressPair,
- * .getConnectionId = &_etherConnection_GetConnectionId,
- * .isUp = &_etherConnection_IsUp,
- * .isLocal = &_etherConnection_IsLocal,
- * .destroy = &_etherConnection_DestroyOperations,
- * .class = &_etherConnection_Class,
- * .getConnectionType = &_etherConnection_getConnectionType
- * };
- *
- * IoOperations *
- * etherConnection_Create(Forwarder *forwarder, GenericEther *ether,
- * AddressPair *pair)
- * {
- * _EtherState *etherConnState = parcMemory_Allocate(sizeof(_EtherState));
- * // Fill in etherConnState with instance variables
- *
- * IoOperations *io_ops = parcMemory_Allocate(sizeof(IoOperations));
- * memcpy(io_ops, &_template, sizeof(IoOperations));
- * io_ops->closure = etherConnState;
- * // Add to connection table, send missives about connection state
- *
- * return op_ops;
- * }
- * @endcode
- *
*/
/**
@@ -95,6 +63,7 @@ typedef struct io_ops IoOperations;
struct io_ops {
void *closure;
bool (*send)(IoOperations *ops, const Address *nexthop, Message *message);
+ bool (*sendCommandResponse)(IoOperations *ops, struct iovec *message);
const Address *(*getRemoteAddress)(const IoOperations *ops);
const AddressPair *(*getAddressPair)(const IoOperations *ops);
bool (*isUp)(const IoOperations *ops);
@@ -201,6 +170,9 @@ void ioOperations_Release(IoOperations **opsPtr);
bool ioOperations_Send(IoOperations *ops, const Address *nexthop,
Message *message);
+bool ioOperations_SendCommandResponse(IoOperations *ops,
+ struct iovec *message);
+
/**
* A connection is made up of a local and a remote address. This function
* returns the remote address.
diff --git a/hicn-light/src/io/streamConnection.c b/hicn-light/src/io/streamConnection.c
index 948b6c01b..78c19fb18 100644
--- a/hicn-light/src/io/streamConnection.c
+++ b/hicn-light/src/io/streamConnection.c
@@ -65,6 +65,8 @@ typedef struct stream_state {
// Prototypes
static bool _streamConnection_Send(IoOperations *ops, const Address *nexthop,
Message *message);
+static bool _streamConnection_SendCommandResponse(IoOperations *ops,
+ struct iovec *msg);
static const Address *_streamConnection_GetRemoteAddress(
const IoOperations *ops);
static const AddressPair *_streamConnection_GetAddressPair(
@@ -80,31 +82,6 @@ static list_connections_type _streamConnection_GetConnectionType(
static Ticks _sendProbe(IoOperations *ops, unsigned probeType,
uint8_t *message);
-// REMINDER: when a new_command is added, the following array has to be updated
-// with the sizeof(new_command). It allows to allocate the buffer for receiving
-// the payload of the CONTROLLER REQUEST after the header has beed read. Each
-// command identifier (typedef enum command_id) corresponds to a position in the
-// following array.
-static int payloadLengthDaemon[LAST_COMMAND_VALUE] = {
- sizeof(add_listener_command),
- sizeof(add_connection_command),
- 0, // list connections: payload always 0 when get controller request
- sizeof(add_route_command),
- 0, // list routes: payload always 0 when get controller request
- sizeof(remove_connection_command),
- sizeof(remove_route_command),
- sizeof(cache_store_command),
- sizeof(cache_serve_command),
- 0, // cache clear
- sizeof(set_strategy_command),
- sizeof(set_wldr_command),
- sizeof(add_punting_command),
- 0, // list listeners: payload always 0 when get controller request
- sizeof(mapme_activator_command),
- sizeof(mapme_activator_command),
- sizeof(mapme_timing_command),
- sizeof(mapme_timing_command)};
-
/*
* This assigns a unique pointer to the void * which we use
* as a GUID for this class.
@@ -121,6 +98,7 @@ static const void *_streamConnection_Class(const IoOperations *ops) {
static IoOperations _template = {
.closure = NULL,
.send = &_streamConnection_Send,
+ .sendCommandResponse = &_streamConnection_SendCommandResponse,
.getRemoteAddress = &_streamConnection_GetRemoteAddress,
.getAddressPair = &_streamConnection_GetAddressPair,
.getConnectionId = &_streamConnection_GetConnectionId,
@@ -308,7 +286,7 @@ static unsigned _streamConnection_GetConnectionId(const IoOperations *ops) {
return stream->id;
}
-bool streamState_SendCommandResponse(IoOperations *ops,
+bool _streamConnection_SendCommandResponse(IoOperations *ops,
struct iovec *response) {
parcAssertNotNull(ops, "Parameter ops must be non-null");
parcAssertNotNull(response, "Parameter message must be non-null");
@@ -447,7 +425,7 @@ int _isACommand(PARCEventBuffer *input) {
// read first byte of the header
// first byte: must be a REQUEST_LIGHT
- if (msg[0] != 100) {
+ if (msg[0] != REQUEST_LIGHT) {
return LAST_COMMAND_VALUE;
}
@@ -468,7 +446,7 @@ PARCEventBuffer *_tryReadControlMessage(_StreamState *stream,
if (stream->nextMessageLength == 0) {
stream->nextMessageLength =
sizeof(header_control_message) +
- payloadLengthDaemon[command]; // consider the whole packet.
+ payloadLengthDaemon(command); // consider the whole packet.
}
if (bytesAvailable >= stream->nextMessageLength) {
@@ -487,13 +465,13 @@ PARCEventBuffer *_tryReadControlMessage(_StreamState *stream,
}
(*request)[0].iov_base = control; // header
(*request)[0].iov_len = sizeof(header_control_message);
- if (payloadLengthDaemon[command] > 0) {
+ if (payloadLengthDaemon(command) > 0) {
(*request)[1].iov_base =
control + sizeof(header_control_message); // payload
} else {
(*request)[1].iov_base = NULL;
}
- (*request)[1].iov_len = payloadLengthDaemon[command];
+ (*request)[1].iov_len = payloadLengthDaemon(command);
// now reset message length for next packet
stream->nextMessageLength = 0;
diff --git a/hicn-light/src/io/udpConnection.c b/hicn-light/src/io/udpConnection.c
index 6c2e35392..3faf2bfac 100644
--- a/hicn-light/src/io/udpConnection.c
+++ b/hicn-light/src/io/udpConnection.c
@@ -19,6 +19,7 @@
* NB The Send() function may overflow the output buffer
*
*/
+#include <sys/uio.h>
#include <errno.h>
#include <src/config.h>
@@ -58,6 +59,7 @@ typedef struct udp_state {
// Prototypes
static bool _send(IoOperations *ops, const Address *nexthop, Message *message);
+static bool _sendCommandResponse(IoOperations *ops, struct iovec *message);
static const Address *_getRemoteAddress(const IoOperations *ops);
static const AddressPair *_getAddressPair(const IoOperations *ops);
static unsigned _getConnectionId(const IoOperations *ops);
@@ -82,6 +84,7 @@ static const void *_streamConnection_Class(const IoOperations *ops) {
static IoOperations _template = {.closure = NULL,
.send = &_send,
+ .sendCommandResponse = &_sendCommandResponse,
.getRemoteAddress = &_getRemoteAddress,
.getAddressPair = &_getAddressPair,
.getConnectionId = &_getConnectionId,
@@ -239,38 +242,6 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) {
// in this particular connection we don't need natting beacause we send the
// packet to the next hop using upd connection
-#if 0
- if((hicnConnState->peerAddressLength == sizeof(struct sockaddr_in)) || (hicnConnState->localAddressLength == sizeof(struct sockaddr_in)))
- return false;
-
- if(message_GetType(message) = MessagePacketType_ContentObject){
- //this is a data packet. We need to put the remote address in the destination field
- messageHandler_SetDestination_IPv6((uint8_t *) message_FixedHeader(message),
- &((struct sockaddr_in6 *) hicnConnState->peerAddress)->sin6_addr);
-
- } else if (message_GetType(message) == MessagePacketType_Interest) {
- //this si an interest packet. We need to put the local address in the source field
- messageHandler_SetSource_IPv6((uint8_t *) message_FixedHeader(message),
- &((struct sockaddr_in6 *) hicnConnState->localAddress)->sin6_addr);
-
- //only in this case we may need to set the probeDestAddress
- if(hicnConnState->refreshProbeDestAddress){
- _refreshProbeDestAddress(hicnConnState, message_FixedHeader(message));
- }
-
- } else if (message_GetType(message) == MessagePacketType_WldrNotification) {
- //here we don't need to do anything for now
- }else{
- //unkown packet
- if (logger_IsLoggable(hicnConnState->logger, LoggerFacility_IO, PARCLogLevel_Debug)) {
- logger_Log(hicnConnState->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
- "connid %u can't parse the message",
- hicnConnState->id);
- }
- return false;
- }
-#endif
-
ssize_t writeLength =
sendto(udpConnState->udpListenerSocket, message_FixedHeader(message),
(int)message_Length(message), 0, udpConnState->peerAddress,
@@ -290,44 +261,39 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) {
return true;
}
+static bool _sendCommandResponse(IoOperations *ops, struct iovec *message){
+ parcAssertNotNull(ops, "Parameter ops must be non-null");
+ parcAssertNotNull(message, "Parameter message must be non-null");
+ _UdpState *udpConnState = (_UdpState *)ioOperations_GetClosure(ops);
+
+ // Perform connect before to establish association between this peer and
+ // the remote peer. This is required to use writev.
+ // Connection association can be changed at any time.
+ connect(udpConnState->udpListenerSocket,
+ udpConnState->peerAddress,
+ udpConnState->peerAddressLength);
+
+ ssize_t writeLength = writev(udpConnState->udpListenerSocket, message, 2);
+
+ struct sockaddr any_address = {0};
+ any_address.sa_family = AF_UNSPEC;
+ connect(udpConnState->udpListenerSocket,
+ &any_address, (socklen_t)sizeof(any_address));
+
+ if (writeLength < 0) {
+ return false;
+ }
+
+ return true;
+}
+
static list_connections_type _getConnectionType(const IoOperations *ops) {
return CONN_UDP;
}
static Ticks _sendProbe(IoOperations *ops, unsigned probeType,
uint8_t *message) {
-#if 0
- parcAssertNotNull(ops, "Parameter ops must be non-null");
- _MetisUdpState *udpConnState = (_MetisUdpState *) metisIoOperations_GetClosure(ops);
-
-
- uint8_t *pkt;
- size_t pkt_size = 8;
- pkt = (uint8_t *) malloc(sizeof(uint8_t) * pkt_size);
- for (unsigned i = 0; i < pkt_size; i++) {
- pkt[i] = 0;
- }
- pkt[0] = 1; //type
- pkt[1] = probeType; //packet type
- pkt[6] = 8; //header len (16bit, network order)
-
- ssize_t writeLen = sendto(udpConnState->udpListenerSocket, pkt, pkt_size, 0, udpConnState->peerAddress, udpConnState->peerAddressLength);
-
- if (writeLen < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- free(pkt);
- return 0;
- } else {
- //this print is for debugging
- printf("Incorrect write length %zd, expected %zd: (%d) %s\n", writeLen, pkt_size, errno, strerror(errno));
- free(pkt);
- return 0;
- }
- }
-
- free(pkt);
- return metisForwarder_GetTicks(udpConnState->metis);
-#endif
+ //TODO
return 0;
}
diff --git a/hicn-light/src/io/udpListener.c b/hicn-light/src/io/udpListener.c
index 6c2947c66..4411bc7f6 100644
--- a/hicn-light/src/io/udpListener.c
+++ b/hicn-light/src/io/udpListener.c
@@ -282,55 +282,9 @@ static int _getSocket(const ListenerOps *ops) {
return (int)udp->udp_socket;
}
-// void
-// udpListener_SetPacketType(ListenerOps *ops, MessagePacketType type)
-//{
-// return;
-//}
-
// =====================================================================
/**
- * @function peekMesageLength
- * @abstract Peek at the next packet to learn its length by reading the fixed
- * header
- * @discussion
- * <#Discussion#>
- *
- * @param <#param1#>
- * @return <#return#>
- */
-static size_t _peekMessageLength(UdpListener *udp, int fd,
- struct sockaddr *peerIpAddress,
- socklen_t *peerIpAddressLengthPtr) {
- // to be fast I try to use just ipv6, this needs to be validated for ipv4
-
- size_t packetLength = 0;
-
- uint8_t *fixedHeader = (uint8_t *)malloc(
- sizeof(uint8_t) * messageHandler_GetIPHeaderLength(IPv6));
-
- // peek at the UDP packet and read in the fixed header.
- // Also returns the socket information for the remote peer
-
- ssize_t res = recvfrom(
- fd, fixedHeader, (int)messageHandler_GetIPHeaderLength(IPv6), MSG_PEEK,
- (struct sockaddr *)peerIpAddress, peerIpAddressLengthPtr);
-
- if (res == messageHandler_GetIPHeaderLength(IPv6)) {
- packetLength = messageHandler_GetTotalPacketLength(fixedHeader);
- } else {
- if (res < 0) {
- printf("error while readin packet\n");
- }
- }
-
- free(fixedHeader);
-
- return packetLength;
-}
-
-/**
* @function _constructAddressPair
* @abstract Creates the address pair that uniquely identifies the connection
* @discussion
@@ -441,100 +395,108 @@ static void _handleWldrNotification(UdpListener *udp, unsigned connId,
message_Release(&message);
}
-static Message *_readMessage(UdpListener *udp, int fd, size_t packetLength,
- AddressPair *pair) {
- uint8_t *msgBuffer = parcMemory_AllocateAndClear(packetLength);
-
- ssize_t readLength = read(fd, msgBuffer, (unsigned int)packetLength);
+static Message *_readMessage(UdpListener *udp, int fd,
+ AddressPair *pair, uint8_t * packet, bool * processed) {
Message *message = NULL;
- if (readLength < 0) {
- printf("read failed %d: (%d) %s\n", fd, errno, strerror(errno));
- return message;
- }
-
unsigned connid = 0;
bool foundConnection = _lookupConnectionId(udp, pair, &connid);
- if (readLength == packetLength) {
- // we need to check if it is a valid packet
- if (messageHandler_IsTCP(msgBuffer)) {
- MessagePacketType pktType;
-
- if (messageHandler_IsData(msgBuffer)) {
- pktType = MessagePacketType_ContentObject;
- if (!foundConnection) {
- parcMemory_Deallocate((void **)&msgBuffer);
- return message;
- }
- } else if (messageHandler_IsInterest(msgBuffer)) {
- pktType = MessagePacketType_Interest;
- if (!foundConnection) {
- connid = _createNewConnection(udp, fd, pair);
- }
- } else {
- printf("Got a packet that is not a data nor an interest, drop it!\n");
- parcMemory_Deallocate((void **)&msgBuffer);
+ if (messageHandler_IsTCP(packet)) {
+ *processed = true;
+ MessagePacketType pktType;
+
+ if (messageHandler_IsData(packet)) {
+ pktType = MessagePacketType_ContentObject;
+ if (!foundConnection) {
+ parcMemory_Deallocate((void **)&packet);
return message;
}
+ } else if (messageHandler_IsInterest(packet)) {
+ pktType = MessagePacketType_Interest;
+ if (!foundConnection) {
+ connid = _createNewConnection(udp, fd, pair);
+ }
+ } else {
+ printf("Got a packet that is not a data nor an interest, drop it!\n");
+ parcMemory_Deallocate((void **)&packet);
+ return message;
+ }
- message = message_CreateFromByteArray(
- connid, msgBuffer, pktType, forwarder_GetTicks(udp->forwarder),
- forwarder_GetLogger(udp->forwarder));
+ message = message_CreateFromByteArray(
+ connid, packet, pktType, forwarder_GetTicks(udp->forwarder),
+ forwarder_GetLogger(udp->forwarder));
- if (message == NULL) {
- parcMemory_Deallocate((void **)&msgBuffer);
- }
- } else if (messageHandler_IsWldrNotification(msgBuffer)) {
- _handleWldrNotification(udp, connid, msgBuffer);
- } else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) {
- _handleProbeMessage(udp, msgBuffer);
+ if (message == NULL) {
+ parcMemory_Deallocate((void **)&packet);
}
+ } else if (messageHandler_IsWldrNotification(packet)) {
+ *processed = true;
+ _handleWldrNotification(udp, connid, packet);
+ } else if (messageHandler_IsLoadBalancerProbe(packet)) {
+ *processed = true;
+ _handleProbeMessage(udp, packet);
+ }
#ifdef WITH_MAPME
- else if (mapMe_isMapMe(msgBuffer)) {
- forwarder_ProcessMapMe(udp->forwarder, msgBuffer, connid);
- }
-#endif /* WITH_MAPME */
+ else if (mapMe_isMapMe(packet)) {
+ *processed = true;
+ forwarder_ProcessMapMe(udp->forwarder, packet, connid);
}
+#endif /* WITH_MAPME */
return message;
}
-static void _receivePacket(UdpListener *udp, int fd, size_t packetLength,
- struct sockaddr_storage *peerIpAddress,
- socklen_t peerIpAddressLength) {
- AddressPair *pair = _constructAddressPair(
- udp, (struct sockaddr *)peerIpAddress, peerIpAddressLength);
+static void _readCommand(UdpListener *udp, int fd,
+ AddressPair *pair,
+ uint8_t * command) {
- Message *message = _readMessage(udp, fd, packetLength, pair);
- addressPair_Release(&pair);
+ if (*command != REQUEST_LIGHT){
+ printf("the message received is not a command, drop\n");
+ return;
+ }
- if (message) {
- forwarder_Receive(udp->forwarder, message);
- } else {
+ command_id id = *(command + 1);
+
+ if ( id < 0 || id >= LAST_COMMAND_VALUE){
+ printf("the message received is not a valid command, drop\n");
return;
}
+
+ unsigned connid = 0;
+ bool foundConnection = _lookupConnectionId(udp, pair, &connid);
+ if(!foundConnection){
+ connid = _createNewConnection(udp, fd, pair);
+ }
+
+ struct iovec *request;
+ if (!(request = (struct iovec *) parcMemory_AllocateAndClear(
+ sizeof(struct iovec) * 2))) {
+ return;
+ }
+
+ request[0].iov_base = command;
+ request[0].iov_len = sizeof(header_control_message);
+ request[1].iov_base = command + sizeof(header_control_message);
+ request[1].iov_len = payloadLengthDaemon(id);
+
+ forwarder_ReceiveCommand(udp->forwarder, id, request, connid);
+ parcMemory_Deallocate((void **) &command);
+ parcMemory_Deallocate((void **) &request);
}
-static void _readFrameToDiscard(UdpListener *udp, int fd) {
- // we need to discard the frame. Read 1 byte. This will clear it off the
- // stack.
- uint8_t buffer;
- ssize_t nread = read(fd, &buffer, 1);
- if (nread == 1) {
- if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug)) {
- logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
- "Discarded frame from fd %d", fd);
- }
- } else if (nread < 0) {
- if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Error)) {
- logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Error, __func__,
- "Error trying to discard frame from fd %d: (%d) %s", fd, errno,
- strerror(errno));
- }
+static bool _receivePacket(UdpListener *udp, int fd,
+ AddressPair *pair,
+ uint8_t * packet) {
+ bool processed = false;
+ Message *message = _readMessage(udp, fd, pair,
+ packet, &processed);
+ if (message) {
+ forwarder_Receive(udp->forwarder, message);
}
+ return processed;
}
static void _readcb(int fd, PARCEventType what, void *udpVoid) {
@@ -553,14 +515,24 @@ static void _readcb(int fd, PARCEventType what, void *udpVoid) {
struct sockaddr_storage peerIpAddress;
socklen_t peerIpAddressLength = sizeof(peerIpAddress);
- size_t packetLength = _peekMessageLength(
- udp, fd, (struct sockaddr *)&peerIpAddress, &peerIpAddressLength);
+ //packet it deallocated by _receivePacket or _readCommand
+ uint8_t * packet = parcMemory_AllocateAndClear(1500); //max MTU
+ ssize_t readLength = recvfrom(fd, packet, 1500, 0,
+ (struct sockaddr *)&peerIpAddress, &peerIpAddressLength);
- if (packetLength > 0) {
- _receivePacket(udp, fd, packetLength, &peerIpAddress,
- peerIpAddressLength);
- } else {
- _readFrameToDiscard(udp, fd);
+ if(readLength < 0) {
+ printf("unable to read the message\n");
+ return;
}
+
+ AddressPair *pair = _constructAddressPair(
+ udp, (struct sockaddr *)&peerIpAddress, peerIpAddressLength);
+
+ bool done = _receivePacket(udp, fd, pair, packet);
+ if(!done){
+ _readCommand(udp, fd, pair, packet);
+ }
+
+ addressPair_Release(&pair);
}
}
diff --git a/hicn-light/src/utils/commands.h b/hicn-light/src/utils/commands.h
index e276d8dd2..d4147cbdd 100644
--- a/hicn-light/src/utils/commands.h
+++ b/hicn-light/src/utils/commands.h
@@ -41,7 +41,7 @@ union commandAddr {
};
typedef enum {
- REQUEST_LIGHT = 100,
+ REQUEST_LIGHT = 0xc0, //this is a command
RESPONSE_LIGHT,
ACK_LIGHT,
NACK_LIGHT,
@@ -49,7 +49,7 @@ typedef enum {
} message_type;
typedef enum {
- ADD_LISTENER,
+ ADD_LISTENER = 0,
ADD_CONNECTION,
LIST_CONNECTIONS,
ADD_ROUTE,
@@ -283,4 +283,51 @@ typedef struct {
// SIZE=1
+//===== size of commands ======
+// REMINDER: when a new_command is added, the following switch has to be
+// updated.
+static inline int payloadLengthDaemon(command_id id) {
+ switch (id){
+ case ADD_LISTENER:
+ return sizeof(add_listener_command);
+ case ADD_CONNECTION:
+ return sizeof(add_connection_command);
+ case LIST_CONNECTIONS:
+ return 0; // list connections: payload always 0
+ case ADD_ROUTE:
+ return sizeof(add_route_command);
+ case LIST_ROUTES:
+ return 0; // list routes: payload always 0
+ case REMOVE_CONNECTION:
+ return sizeof(remove_connection_command);
+ case REMOVE_ROUTE:
+ return sizeof(remove_route_command);
+ case CACHE_STORE:
+ return sizeof(cache_store_command);
+ case CACHE_SERVE:
+ return sizeof(cache_serve_command);
+ case CACHE_CLEAR:
+ return 0; // cache clear
+ case SET_STRATEGY:
+ return sizeof(set_strategy_command);
+ case SET_WLDR:
+ return sizeof(set_wldr_command);
+ case ADD_PUNTING:
+ return sizeof(add_punting_command);
+ case LIST_LISTENERS:
+ return 0; // list listeners: payload always 0
+ case MAPME_ENABLE:
+ return sizeof(mapme_activator_command);
+ case MAPME_DISCOVERY:
+ return sizeof(mapme_activator_command);
+ case MAPME_TIMESCALE:
+ return sizeof(mapme_timing_command);
+ case MAPME_RETX:
+ return sizeof(mapme_timing_command);
+ case LAST_COMMAND_VALUE:
+ return 0;
+ default:
+ return 0;
+ }
+}
#endif
diff --git a/libtransport/src/hicn/transport/core/CMakeLists.txt b/libtransport/src/hicn/transport/core/CMakeLists.txt
index dff93adeb..0e674fcac 100644
--- a/libtransport/src/hicn/transport/core/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/core/CMakeLists.txt
@@ -17,7 +17,6 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/content_object.h
${CMAKE_CURRENT_SOURCE_DIR}/facade.h
${CMAKE_CURRENT_SOURCE_DIR}/interest.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h
@@ -29,7 +28,8 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/portal.h
${CMAKE_CURRENT_SOURCE_DIR}/prefix.h
${CMAKE_CURRENT_SOURCE_DIR}/connector.h
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h
${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h
${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.h
${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h
@@ -39,12 +39,12 @@ list(APPEND HEADER_FILES
list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc
${CMAKE_CURRENT_SOURCE_DIR}/interest.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc
${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc
${CMAKE_CURRENT_SOURCE_DIR}/packet.cc
${CMAKE_CURRENT_SOURCE_DIR}/name.cc
${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc
${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.cc
${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.cc
${CMAKE_CURRENT_SOURCE_DIR}/connector.cc
diff --git a/libtransport/src/hicn/transport/core/connector.cc b/libtransport/src/hicn/transport/core/connector.cc
index ff567d78a..e89b98f8a 100644
--- a/libtransport/src/hicn/transport/core/connector.cc
+++ b/libtransport/src/hicn/transport/core/connector.cc
@@ -21,7 +21,13 @@ namespace core {
std::once_flag Connector::init_flag_;
-Connector::Connector() : packet_pool_() { init(); }
+Connector::Connector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback)
+ : packet_pool_(),
+ receive_callback_(std::move(receive_callback)),
+ on_reconnect_callback_(std::move(reconnect_callback)) {
+ init();
+}
void Connector::init() { increasePoolSize(); }
diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h
index ae82ee973..529e97bf9 100644
--- a/libtransport/src/hicn/transport/core/connector.h
+++ b/libtransport/src/hicn/transport/core/connector.h
@@ -33,19 +33,20 @@ enum class ConnectorType : uint8_t {
VPP_CONNECTOR,
};
-static constexpr std::size_t packet_size = 2048;
-static constexpr std::size_t queue_size = 4096;
-static constexpr std::size_t packet_pool_size = 4096;
-
-using PacketRing = utils::CircularFifo<Packet::MemBufPtr, queue_size>;
-using PacketQueue = std::deque<Packet::MemBufPtr>;
-using PacketReceivedCallback = std::function<void(Packet::MemBufPtr &&)>;
-using OnReconnect = std::function<void()>;
-using PacketSentCallback = std::function<void()>;
-
class Connector {
public:
- Connector();
+ static constexpr std::size_t packet_size = 2048;
+ static constexpr std::size_t queue_size = 4096;
+ static constexpr std::size_t packet_pool_size = 4096;
+
+ using PacketRing = utils::CircularFifo<Packet::MemBufPtr, queue_size>;
+ using PacketQueue = std::deque<Packet::MemBufPtr>;
+ using PacketReceivedCallback = std::function<void(Packet::MemBufPtr &&)>;
+ using OnReconnect = std::function<void()>;
+ using PacketSentCallback = std::function<void()>;
+
+ Connector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback);
virtual ~Connector() = default;
@@ -88,6 +89,10 @@ class Connector {
static std::once_flag init_flag_;
utils::ObjectPool<utils::MemBuf> packet_pool_;
PacketQueue output_buffer_;
+
+ // Connector events
+ PacketReceivedCallback receive_callback_;
+ OnReconnect on_reconnect_callback_;
};
} // end namespace core
diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h
index e7b6fb1a6..de9f3b568 100644
--- a/libtransport/src/hicn/transport/core/forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/forwarder_interface.h
@@ -16,7 +16,7 @@
#pragma once
#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/udp_socket_connector.h>
#include <hicn/transport/portability/portability.h>
#include <deque>
@@ -54,8 +54,6 @@ class ForwarderInterface {
}
public:
- static constexpr uint8_t ack_code = 102;
-
virtual ~ForwarderInterface() {}
TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
@@ -70,6 +68,20 @@ class ForwarderInterface {
return static_cast<Implementation &>(*this).getMtu();
}
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessage(const uint8_t *message) {
+ return Implementation::isControlMessageImpl(message);
+ }
+
+ template <typename R>
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReply(R &&packet_buffer) {
+ return static_cast<Implementation &>(*this).processControlMessageReplyImpl(
+ std::forward<R &&>(packet_buffer));
+ }
+
+ TRANSPORT_ALWAYS_INLINE void closeConnection() {
+ return static_cast<Implementation &>(*this).closeConnection();
+ }
+
template <
typename R,
typename = std::enable_if_t<
@@ -97,7 +109,7 @@ class ForwarderInterface {
counters_.tx_bytes += len;
// Perfect forwarding
- connector_.send(packet, len, std::forward<Handler>(packet_sent));
+ connector_.send(packet, len, std::forward<Handler &&>(packet_sent));
}
TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); }
diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
index 9dc3b63bb..1c8060906 100644
--- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
+++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc
@@ -15,11 +15,6 @@
#include <hicn/transport/core/hicn_forwarder_interface.h>
-#define ADDR_INET 1
-#define ADDR_INET6 2
-#define ADD_ROUTE 3
-#define REQUEST_LIGHT 100
-
union AddressLight {
uint32_t ipv4;
struct in6_addr ipv6;
@@ -30,6 +25,13 @@ typedef struct {
uint8_t command_id;
uint16_t length;
uint32_t seq_num;
+} CommandHeader;
+
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
char symbolic_or_connid[16];
union AddressLight address;
uint16_t cost;
@@ -37,51 +39,104 @@ typedef struct {
uint8_t len;
} RouteToSelfCommand;
-namespace transport {
-
-namespace core {
-
-HicnForwarderInterface::HicnForwarderInterface(SocketConnector &connector)
- : ForwarderInterface<HicnForwarderInterface, SocketConnector>(connector) {}
-
-HicnForwarderInterface::~HicnForwarderInterface() {}
+typedef struct {
+ uint8_t message_type;
+ uint8_t command_id;
+ uint16_t length;
+ uint32_t seq_num;
+ char symbolic_or_connid[16];
+} DeleteSelfConnectionCommand;
-void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); }
+namespace {
+static constexpr uint8_t addr_inet = 1;
+static constexpr uint8_t addr_inet6 = 2;
+static constexpr uint8_t add_route_command = 3;
+static constexpr uint8_t delete_connection_command = 5;
+static constexpr uint8_t request_light = 0xc0;
+static constexpr char identifier[] = "SELF";
-void HicnForwarderInterface::registerRoute(Prefix &prefix) {
- auto addr = prefix.toSockaddr();
- const char *identifier = {"SELF_ROUTE"};
+void fillCommandHeader(CommandHeader *header) {
+ // Allocate and fill the header
+ header->message_type = request_light;
+ header->length = 1;
+}
- // allocate command payload
- RouteToSelfCommand *route_to_self = new RouteToSelfCommand();
+std::unique_ptr<RouteToSelfCommand> createCommandRoute(
+ std::unique_ptr<sockaddr> &&addr, uint8_t prefix_length) {
+ auto command = std::make_unique<RouteToSelfCommand>();
// check and set IP address
if (addr->sa_family == AF_INET) {
- route_to_self->address_type = ADDR_INET;
- route_to_self->address.ipv4 = ((Sockaddr4 *)addr.get())->sin_addr.s_addr;
+ command->address_type = addr_inet;
+ command->address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr;
} else if (addr->sa_family == AF_INET6) {
- route_to_self->address_type = ADDR_INET6;
- route_to_self->address.ipv6 = ((Sockaddr6 *)addr.get())->sin6_addr;
+ command->address_type = addr_inet6;
+ command->address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr;
}
// Fill remaining payload fields
#ifndef _WIN32
- strcpy(route_to_self->symbolic_or_connid, identifier);
+ strcpy(command->symbolic_or_connid, identifier);
#else
- strcpy_s(route_to_self->symbolic_or_connid, strlen(route_to_self->symbolic_or_connid), identifier);
+ strcpy_s(route_to_self->symbolic_or_connid,
+ strlen(route_to_self->symbolic_or_connid), identifier);
#endif
- route_to_self->cost = 1;
- route_to_self->len = (uint8_t) prefix.getPrefixLength();
+ command->cost = 1;
+ command->len = (uint8_t)prefix_length;
// Allocate and fill the header
- route_to_self->command_id = ADD_ROUTE;
- route_to_self->message_type = REQUEST_LIGHT;
- route_to_self->length = 1;
+ command->command_id = add_route_command;
+ fillCommandHeader((CommandHeader *)command.get());
+
+ return command;
+}
+
+std::unique_ptr<DeleteSelfConnectionCommand> createCommandDeleteConnection() {
+ auto command = std::make_unique<DeleteSelfConnectionCommand>();
+ fillCommandHeader((CommandHeader *)command.get());
+ command->command_id = delete_connection_command;
+
+#ifndef _WIN32
+ strcpy(command->symbolic_or_connid, identifier);
+#else
+ strcpy_s(route_to_self->symbolic_or_connid,
+ strlen(route_to_self->symbolic_or_connid), identifier);
+#endif
+
+ return command;
+}
+
+} // namespace
+
+namespace transport {
+
+namespace core {
+
+HicnForwarderInterface::HicnForwarderInterface(UdpSocketConnector &connector)
+ : ForwarderInterface<HicnForwarderInterface, UdpSocketConnector>(
+ connector) {}
+
+HicnForwarderInterface::~HicnForwarderInterface() {}
+
+void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); }
+
+void HicnForwarderInterface::registerRoute(Prefix &prefix) {
+ auto command =
+ createCommandRoute(prefix.toSockaddr(), prefix.getPrefixLength())
+ .release();
+ send((uint8_t *)command, sizeof(RouteToSelfCommand),
+ [command]() { delete command; });
+}
- send((uint8_t *)route_to_self, sizeof(RouteToSelfCommand),
- [route_to_self]() { delete route_to_self; });
+void HicnForwarderInterface::closeConnection() {
+ auto command = createCommandDeleteConnection().release();
+ send((uint8_t *)command, sizeof(DeleteSelfConnectionCommand),
+ [this, command]() {
+ delete command;
+ connector_.close();
+ });
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h
index e57fae105..b11841b69 100644
--- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h
@@ -17,7 +17,7 @@
#include <hicn/transport/core/forwarder_interface.h>
#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/udp_socket_connector.h>
#include <deque>
@@ -26,7 +26,10 @@ namespace transport {
namespace core {
class HicnForwarderInterface
- : public ForwarderInterface<HicnForwarderInterface, SocketConnector> {
+ : public ForwarderInterface<HicnForwarderInterface, UdpSocketConnector> {
+ static constexpr uint8_t ack_code = 0xc2;
+ static constexpr uint8_t nack_code = 0xc3;
+
public:
union addressLight {
uint32_t ipv4;
@@ -46,9 +49,9 @@ class HicnForwarderInterface
};
using route_to_self_command = struct route_to_self_command;
- using ConnectorType = SocketConnector;
+ using ConnectorType = UdpSocketConnector;
- HicnForwarderInterface(SocketConnector &connector);
+ HicnForwarderInterface(UdpSocketConnector &connector);
~HicnForwarderInterface();
@@ -58,6 +61,21 @@ class HicnForwarderInterface
std::uint16_t getMtu() { return interface_mtu; }
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return message[0] == ack_code || message[0] == nack_code;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {
+ if (packet_buffer->data()[0] == nack_code) {
+ throw errors::RuntimeException(
+ "Received Nack message from hicn light forwarder.");
+ }
+ }
+
+ void closeConnection();
+
private:
static constexpr std::uint16_t interface_mtu = 1500;
};
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc
index 38b2a2a98..c69a87fb7 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.cc
+++ b/libtransport/src/hicn/transport/core/memif_connector.cc
@@ -57,7 +57,7 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
OnReconnect &&on_reconnect_callback,
asio::io_service &io_service,
std::string app_name)
- : Connector(),
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
memif_worker_(nullptr),
timer_set_(false),
send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
@@ -71,8 +71,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
enable_burst_(false),
closed_(false),
app_name_(app_name),
- receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback),
socket_filename_("") {
std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
}
@@ -372,7 +370,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
packet->append(packet_length);
if (!connector->input_buffer_.push(std::move(packet))) {
-
TRANSPORT_LOGI("Error pushing packet. Ring buffer full.");
// TODO Here we should consider the possibility to signal the congestion
diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h
index 3d2e8411d..06a8fd73e 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.h
+++ b/libtransport/src/hicn/transport/core/memif_connector.h
@@ -128,8 +128,6 @@ class MemifConnector : public Connector {
uint8_t memif_mode_;
std::string app_name_;
uint16_t transmission_index_;
- PacketReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
utils::SpinLock write_msgs_lock_;
std::string socket_filename_;
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 0932b56c6..7efbc2009 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -22,7 +22,7 @@
#include <hicn/transport/core/name.h>
#include <hicn/transport/core/pending_interest.h>
#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/udp_socket_connector.h>
#include <hicn/transport/errors/errors.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/utils/log.h>
@@ -222,22 +222,25 @@ class Portal {
}
TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) {
- for (auto &pend_interest : pending_interest_hash_table_) {
- pend_interest.second->cancelTimer();
- }
-
- clear();
-
if (kill_connection) {
- connector_.close();
+ forwarder_interface_.closeConnection();
}
- io_service_.post([this]() { io_service_.stop(); });
+ io_service_.post([this]() {
+ clear();
+ io_service_.stop();
+ });
}
TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); }
- TRANSPORT_ALWAYS_INLINE void clear() { pending_interest_hash_table_.clear(); }
+ TRANSPORT_ALWAYS_INLINE void clear() {
+ for (auto &pend_interest : pending_interest_hash_table_) {
+ pend_interest.second->cancelTimer();
+ }
+
+ pending_interest_hash_table_.clear();
+ }
TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
return io_service_;
@@ -260,17 +263,16 @@ class Portal {
return;
}
- if (packet_buffer->data()[0] == ForwarderInt::ack_code) {
- // Hicn forwarder message
+ if (TRANSPORT_EXPECT_FALSE(
+ ForwarderInt::isControlMessage(packet_buffer->data()))) {
processControlMessage(std::move(packet_buffer));
return;
}
- bool is_interest = Packet::isInterest(packet_buffer->data());
Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data());
if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
- if (!is_interest) {
+ if (!Packet::isInterest(packet_buffer->data())) {
processContentObject(
ContentObject::Ptr(new ContentObject(std::move(packet_buffer))));
} else {
@@ -329,8 +331,7 @@ class Portal {
TRANSPORT_ALWAYS_INLINE void processControlMessage(
Packet::MemBufPtr &&packet_buffer) {
- // Control message as response to the route set by a producer.
- // Do nothing
+ forwarder_interface_.processControlMessageReply(std::move(packet_buffer));
}
private:
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
index 5cfff39fb..fe16d2132 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc
@@ -39,15 +39,13 @@ RawSocketConnector::RawSocketConnector(
PacketReceivedCallback &&receive_callback,
OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
std::string app_name)
- : Connector(),
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
io_service_(io_service),
socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)),
// resolver_(io_service_),
timer_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
data_available_(false),
- receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback),
app_name_(app_name) {
memset(&link_layer_address_, 0, sizeof(link_layer_address_));
}
diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h
index 5e39efa0e..bb24d9d54 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_connector.h
+++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h
@@ -74,9 +74,6 @@ class RawSocketConnector : public Connector {
utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
bool data_available_;
-
- PacketReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
std::string app_name_;
};
diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.h b/libtransport/src/hicn/transport/core/raw_socket_interface.h
index c030af662..ac48e5874 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_interface.h
+++ b/libtransport/src/hicn/transport/core/raw_socket_interface.h
@@ -41,6 +41,16 @@ class RawSocketInterface
std::uint16_t getMtu() { return interface_mtu; }
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return false;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {}
+
+ TRANSPORT_ALWAYS_INLINE void closeConnection(){};
+
private:
static constexpr std::uint16_t interface_mtu = 1500;
std::string remote_mac_address_;
diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
index 7bf0570ad..ade0f2611 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc
@@ -16,7 +16,7 @@
#ifdef _WIN32
#include <hicn/transport/portability/win_portability.h>
#endif
-#include <hicn/transport/core/socket_connector.h>
+#include <hicn/transport/core/tcp_socket_connector.h>
#include <hicn/transport/errors/errors.h>
#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/object_pool.h>
@@ -52,11 +52,11 @@ class NetworkMessage {
};
} // namespace
-SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback,
- OnReconnect &&on_reconnect_callback,
- asio::io_service &io_service,
- std::string app_name)
- : Connector(),
+TcpSocketConnector::TcpSocketConnector(
+ PacketReceivedCallback &&receive_callback,
+ OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
io_service_(io_service),
socket_(io_service_),
resolver_(io_service_),
@@ -66,30 +66,28 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback,
is_reconnection_(false),
data_available_(false),
is_closed_(false),
- receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback),
app_name_(app_name) {}
-SocketConnector::~SocketConnector() {}
+TcpSocketConnector::~TcpSocketConnector() {}
-void SocketConnector::connect(std::string ip_address, std::string port) {
+void TcpSocketConnector::connect(std::string ip_address, std::string port) {
endpoint_iterator_ = resolver_.resolve(
{ip_address, port, asio::ip::resolver_query_base::numeric_service});
doConnect();
}
-void SocketConnector::state() { return; }
+void TcpSocketConnector::state() { return; }
-void SocketConnector::send(const uint8_t *packet, std::size_t len,
- const PacketSentCallback &packet_sent) {
+void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent) {
asio::async_write(socket_, asio::buffer(packet, len),
[packet_sent](std::error_code ec, std::size_t /*length*/) {
packet_sent();
});
}
-void SocketConnector::send(const Packet::MemBufPtr &packet) {
+void TcpSocketConnector::send(const Packet::MemBufPtr &packet) {
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
@@ -104,7 +102,7 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) {
});
}
-void SocketConnector::close() {
+void TcpSocketConnector::close() {
io_service_.dispatch([this]() {
is_closed_ = true;
socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
@@ -112,7 +110,7 @@ void SocketConnector::close() {
});
}
-void SocketConnector::doWrite() {
+void TcpSocketConnector::doWrite() {
// TODO improve this piece of code for sending many buffers togethers
// if list contains more than one packet
auto packet = output_buffer_.front().get();
@@ -143,7 +141,7 @@ void SocketConnector::doWrite() {
});
}
-void SocketConnector::doReadBody(std::size_t body_length) {
+void TcpSocketConnector::doReadBody(std::size_t body_length) {
asio::async_read(
socket_, asio::buffer(read_msg_->writableTail(), body_length),
asio::transfer_exactly(body_length),
@@ -163,7 +161,7 @@ void SocketConnector::doReadBody(std::size_t body_length) {
});
}
-void SocketConnector::doReadHeader() {
+void TcpSocketConnector::doReadHeader() {
read_msg_ = getPacket();
asio::async_read(
socket_,
@@ -191,7 +189,7 @@ void SocketConnector::doReadHeader() {
});
}
-void SocketConnector::tryReconnect() {
+void TcpSocketConnector::tryReconnect() {
if (!is_connecting_ && !is_closed_) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
is_connecting_ = true;
@@ -205,7 +203,7 @@ void SocketConnector::tryReconnect() {
}
}
-void SocketConnector::doConnect() {
+void TcpSocketConnector::doConnect() {
asio::async_connect(socket_, endpoint_iterator_,
[this](std::error_code ec, tcp::resolver::iterator) {
if (!ec) {
@@ -232,17 +230,17 @@ void SocketConnector::doConnect() {
});
}
-bool SocketConnector::checkConnected() { return !is_connecting_; }
+bool TcpSocketConnector::checkConnected() { return !is_connecting_; }
-void SocketConnector::enableBurst() { return; }
+void TcpSocketConnector::enableBurst() { return; }
-void SocketConnector::startConnectionTimer() {
+void TcpSocketConnector::startConnectionTimer() {
timer_.expires_from_now(std::chrono::seconds(60));
- timer_.async_wait(
- std::bind(&SocketConnector::handleDeadline, this, std::placeholders::_1));
+ timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this,
+ std::placeholders::_1));
}
-void SocketConnector::handleDeadline(const std::error_code &ec) {
+void TcpSocketConnector::handleDeadline(const std::error_code &ec) {
if (!ec) {
io_service_.post([this]() {
socket_.close();
diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
index 6eff1aff5..8dfda4eb8 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.h
+++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h
@@ -28,14 +28,14 @@ namespace core {
using asio::ip::tcp;
-class SocketConnector : public Connector {
+class TcpSocketConnector : public Connector {
public:
- SocketConnector(PacketReceivedCallback &&receive_callback,
- OnReconnect &&reconnect_callback,
- asio::io_service &io_service,
- std::string app_name = "Libtransport");
+ TcpSocketConnector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
- ~SocketConnector() override;
+ ~TcpSocketConnector() override;
void send(const Packet::MemBufPtr &packet) override;
@@ -81,8 +81,6 @@ class SocketConnector : public Connector {
bool data_available_;
bool is_closed_;
- PacketReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
std::string app_name_;
};
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
new file mode 100644
index 000000000..f38891e71
--- /dev/null
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc
@@ -0,0 +1,201 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <hicn/transport/portability/win_portability.h>
+#endif
+#include <hicn/transport/core/udp_socket_connector.h>
+#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/object_pool.h>
+
+#include <vector>
+
+namespace transport {
+
+namespace core {
+
+UdpSocketConnector::UdpSocketConnector(
+ PacketReceivedCallback &&receive_callback,
+ OnReconnect &&on_reconnect_callback, asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
+ io_service_(io_service),
+ socket_(io_service_),
+ resolver_(io_service_),
+ connection_timer_(io_service_),
+ connection_timeout_(io_service_),
+ read_msg_(packet_pool_.makePtr(nullptr)),
+ is_connecting_(false),
+ is_reconnection_(false),
+ data_available_(false),
+ is_closed_(false),
+ app_name_(app_name) {}
+
+UdpSocketConnector::~UdpSocketConnector() {}
+
+void UdpSocketConnector::connect(std::string ip_address, std::string port) {
+ endpoint_iterator_ = resolver_.resolve(
+ {ip_address, port, asio::ip::resolver_query_base::numeric_service});
+
+ doConnect();
+}
+
+void UdpSocketConnector::state() { return; }
+
+void UdpSocketConnector::send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent) {
+ socket_.async_send(asio::buffer(packet, len),
+ [packet_sent](std::error_code ec, std::size_t /*length*/) {
+ packet_sent();
+ });
+}
+
+void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
+ io_service_.post([this, packet]() {
+ bool write_in_progress = !output_buffer_.empty();
+ output_buffer_.push_back(std::move(packet));
+ if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+ if (!write_in_progress) {
+ doWrite();
+ }
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
+ });
+}
+
+void UdpSocketConnector::close() {
+ io_service_.dispatch([this]() {
+ is_closed_ = true;
+ socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ });
+}
+
+void UdpSocketConnector::doWrite() {
+ // TODO improve this piece of code for sending many buffers togethers
+ // if list contains more than one packet
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec,
+ std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doWrite();
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::doRead() {
+ read_msg_ = getPacket();
+ socket_.async_receive(
+ asio::buffer(read_msg_->writableData(), Connector::packet_size),
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ read_msg_->append(length);
+ receive_callback_(std::move(read_msg_));
+ doRead();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
+ tryReconnect();
+ }
+ });
+}
+
+void UdpSocketConnector::tryReconnect() {
+ if (!is_connecting_ && !is_closed_) {
+ TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
+ is_connecting_ = true;
+ is_reconnection_ = true;
+ connection_timer_.expires_from_now(std::chrono::seconds(1));
+ connection_timer_.async_wait([this](const std::error_code &ec) {
+ if (!ec) {
+ socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ startConnectionTimer();
+ doConnect();
+ }
+ });
+ }
+}
+
+void UdpSocketConnector::doConnect() {
+ asio::async_connect(socket_, endpoint_iterator_,
+ [this](std::error_code ec, udp::resolver::iterator) {
+ if (!ec) {
+ connection_timeout_.cancel();
+ is_connecting_ = false;
+ doRead();
+
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ on_reconnect_callback_();
+ }
+ } else {
+ sleep(1);
+ doConnect();
+ }
+ });
+}
+
+bool UdpSocketConnector::checkConnected() { return !is_connecting_; }
+
+void UdpSocketConnector::enableBurst() { return; }
+
+void UdpSocketConnector::startConnectionTimer() {
+ connection_timeout_.expires_from_now(std::chrono::seconds(60));
+ connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline,
+ this, std::placeholders::_1));
+}
+
+void UdpSocketConnector::handleDeadline(const std::error_code &ec) {
+ if (!ec) {
+ io_service_.post([this]() {
+ socket_.close();
+ TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n");
+ io_service_.stop();
+ });
+ }
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h
new file mode 100644
index 000000000..4704fa50b
--- /dev/null
+++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/core/name.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <deque>
+
+namespace transport {
+namespace core {
+
+using asio::ip::udp;
+
+class UdpSocketConnector : public Connector {
+ public:
+ UdpSocketConnector(PacketReceivedCallback &&receive_callback,
+ OnReconnect &&reconnect_callback,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~UdpSocketConnector() override;
+
+ void send(const Packet::MemBufPtr &packet) override;
+
+ void send(const uint8_t *packet, std::size_t len,
+ const PacketSentCallback &packet_sent = 0) override;
+
+ void close() override;
+
+ void enableBurst() override;
+
+ void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
+
+ void state() override;
+
+ private:
+ void doConnect();
+
+ void doRead();
+
+ void doWrite();
+
+ bool checkConnected();
+
+ private:
+ void handleDeadline(const std::error_code &ec);
+
+ void startConnectionTimer();
+
+ void tryReconnect();
+
+ asio::io_service &io_service_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::steady_timer connection_timer_;
+ asio::steady_timer connection_timeout_;
+
+ utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
+
+ bool is_connecting_;
+ bool is_reconnection_;
+ bool data_available_;
+ bool is_closed_;
+
+ std::string app_name_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
index 69b18c0d9..8dc607295 100644
--- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
+++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
@@ -47,20 +47,7 @@ VPPForwarderInterface::VPPForwarderInterface(MemifConnector &connector)
sw_if_index_(~0),
face_id_(~0) {}
-VPPForwarderInterface::~VPPForwarderInterface() {
- if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) {
- int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_,
- sw_if_index_);
-
- if (ret < 0) {
- TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_);
- }
- }
-
- if (VPPForwarderInterface::api_) {
- vpp_binary_api_destroy(VPPForwarderInterface::api_);
- }
-}
+VPPForwarderInterface::~VPPForwarderInterface() {}
/**
* @brief Create a memif interface in the local VPP forwarder.
@@ -220,6 +207,23 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) {
}
}
+void VPPForwarderInterface::closeConnection() {
+ if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) {
+ int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_,
+ sw_if_index_);
+
+ if (ret < 0) {
+ TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_);
+ }
+ }
+
+ if (VPPForwarderInterface::api_) {
+ vpp_binary_api_destroy(VPPForwarderInterface::api_);
+ }
+
+ connector_.close();
+}
+
} // namespace core
} // namespace transport
diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h
index 322cd1f8b..62af8bc3b 100644
--- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h
@@ -31,6 +31,8 @@ namespace core {
class VPPForwarderInterface
: public ForwarderInterface<VPPForwarderInterface, MemifConnector> {
+ static constexpr std::uint16_t interface_mtu = 1500;
+
public:
VPPForwarderInterface(MemifConnector &connector);
@@ -44,6 +46,16 @@ class VPPForwarderInterface
TRANSPORT_ALWAYS_INLINE std::uint16_t getMtu() { return interface_mtu; }
+ TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl(
+ const uint8_t *message) {
+ return false;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl(
+ Packet::MemBufPtr &&packet_buffer) {}
+
+ void closeConnection();
+
private:
uint32_t getMemifConfiguration();
@@ -58,7 +70,6 @@ class VPPForwarderInterface
uint32_t sw_if_index_;
uint32_t face_id_;
static std::mutex global_lock_;
- static constexpr std::uint16_t interface_mtu = 1500;
};
} // namespace core
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index dbf1c1bd1..1356ad566 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -614,10 +614,10 @@ void RTCTransportProtocol::processRtcpHeader(uint8_t *offset) {
uint8_t pkt_type = (*(offset + 1));
switch (pkt_type) {
case HICN_RTCP_RR: // Receiver report
- TRANSPORT_LOGI("got RR packet\n");
+ TRANSPORT_LOGD("got RR packet\n");
break;
case HICN_RTCP_SR: // Sender report
- TRANSPORT_LOGI("got SR packet\n");
+ TRANSPORT_LOGD("got SR packet\n");
break;
case HICN_RTCP_SDES: // Description
processSDES(offset);
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 8db4d9f0d..bbef2d27e 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -108,7 +108,8 @@ class HIperfClient {
HIperfClient(const ClientConfiguration &conf)
: configuration_(conf),
total_duration_milliseconds_(0),
- old_bytes_value_(0) {}
+ old_bytes_value_(0),
+ signals_(io_service_, SIGINT) {}
void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
const std::error_code &ec) {
@@ -122,6 +123,8 @@ class HIperfClient {
std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
<< (bytes_transferred * 8) * 1.0 / usec * 1.0 << " [Mbps]"
<< std::endl;
+
+ io_service_.stop();
}
bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) {
@@ -302,11 +305,15 @@ class HIperfClient {
int run() {
std::cout << "Starting download of " << configuration_.name << std::endl;
- do {
- t1_ = std::chrono::steady_clock::now();
- consumer_socket_->consume(configuration_.name,
- *configuration_.receive_buffer);
- } while (configuration_.virtual_download);
+ signals_.async_wait([this](const std::error_code &, const int &) {
+ consumer_socket_->stop();
+ io_service_.stop();
+ });
+
+ t1_ = std::chrono::steady_clock::now();
+ consumer_socket_->asyncConsume(configuration_.name,
+ configuration_.receive_buffer);
+ io_service_.run();
return ERROR_SUCCESS;
}
@@ -317,7 +324,8 @@ class HIperfClient {
Time t1_;
uint32_t total_duration_milliseconds_;
uint64_t old_bytes_value_;
- // std::unique_ptr<asio::signal_set> signals_;
+ asio::io_service io_service_;
+ asio::signal_set signals_;
};
class HIperfServer {
@@ -326,13 +334,10 @@ class HIperfServer {
public:
HIperfServer(ServerConfiguration &conf)
: configuration_(conf),
- // signals_(io_service_, SIGINT, SIGQUIT),
+ signals_(io_service_, SIGINT),
content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
content_objects_index_(0),
mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) {
- // signals_.async_wait([this] (const std::error_code&, const int&)
- // {std::cout << "STOPPING!!" << std::endl; io_service_.stop();});
-
std::string buffer(1440, 'X');
std::cout << "Producing contents under name " << conf.name.getName()
@@ -480,7 +485,14 @@ class HIperfServer {
int run() {
std::cerr << "Starting to serve consumers" << std::endl;
- producer_socket_->serveForever();
+
+ signals_.async_wait([this](const std::error_code &, const int &) {
+ std::cout << "STOPPING!!" << std::endl;
+ producer_socket_->stop();
+ io_service_.stop();
+ });
+
+ io_service_.run();
return ERROR_SUCCESS;
}
@@ -488,7 +500,8 @@ class HIperfServer {
private:
ServerConfiguration configuration_;
std::unique_ptr<ProducerSocket> producer_socket_;
- // asio::signal_set signals_;
+ asio::io_service io_service_;
+ asio::signal_set signals_;
std::vector<std::shared_ptr<ContentObject>> content_objects_;
std::uint16_t content_objects_index_;
std::uint16_t mask_;
diff --git a/utils/src/ping_client.cc b/utils/src/ping_client.cc
index 598012143..d31147c70 100644
--- a/utils/src/ping_client.cc
+++ b/utils/src/ping_client.cc
@@ -117,9 +117,9 @@ class Client : interface::BasePortal::ConsumerCallback {
auto dt =
std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0);
std::cout << "Verification time: " << dt.count() << std::endl;
- std::cout << "<<<<<< Signature OK!!!" << std::endl;
+ std::cout << "<<< Signature Ok." << std::endl;
} else {
- std::cout << "<<<<<< Signature verification failed!" << std::endl;
+ std::cout << "<<< Signature verification failed!" << std::endl;
}
}
diff --git a/utils/src/ping_server.cc b/utils/src/ping_server.cc
index e3500a4ba..3a20dffd1 100644
--- a/utils/src/ping_server.cc
+++ b/utils/src/ping_server.cc
@@ -288,9 +288,7 @@ int main(int argc, char **argv) {
reset, ttl, identity, sign);
}
- asio::io_service io_service;
-
- ProducerSocket p(io_service); // , setProducerIdentity());
+ ProducerSocket p;
p.registerPrefix(producer_namespace);
p.setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
@@ -301,7 +299,17 @@ int main(int argc, char **argv) {
p.connect();
- p.serveForever();
+ asio::io_service io_service;
+ asio::signal_set signal_set(io_service, SIGINT);
+ signal_set.async_wait(
+ [&p, &io_service](const std::error_code &, const int &) {
+ std::cout << "STOPPING!!" << std::endl;
+ p.stop();
+ io_service.stop();
+ });
+
+ io_service.run();
+
#ifdef _WIN32
WSACleanup();
#endif