From 79e0d4f89c4d532189aae06cc5dfbc14e3269703 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Mon, 11 Feb 2019 10:44:29 +0100 Subject: [HICN-50] Added udp application connector. Change-Id: I0c5afad4b404ec485f50b1342b81e70ef85a5163 Signed-off-by: Mauro Sardara Signed-off-by: michele papalini --- .../src/command_line/daemon/hicnLightDaemon_main.c | 5 +- hicn-light/src/config/configuration.c | 24 +- hicn-light/src/config/configurationListeners.c | 21 +- hicn-light/src/config/configurationListeners.h | 3 + hicn-light/src/core/connection.c | 7 + hicn-light/src/core/connection.h | 6 + hicn-light/src/core/forwarder.c | 5 + hicn-light/src/core/forwarder.h | 6 + hicn-light/src/io/hicnConnection.c | 9 +- hicn-light/src/io/ioOperations.c | 6 + hicn-light/src/io/ioOperations.h | 36 +-- hicn-light/src/io/streamConnection.c | 38 +-- hicn-light/src/io/udpConnection.c | 94 +++----- hicn-light/src/io/udpListener.c | 216 ++++++++--------- hicn-light/src/utils/commands.h | 51 +++- .../src/hicn/transport/core/CMakeLists.txt | 8 +- libtransport/src/hicn/transport/core/connector.cc | 8 +- libtransport/src/hicn/transport/core/connector.h | 27 ++- .../src/hicn/transport/core/forwarder_interface.h | 20 +- .../transport/core/hicn_forwarder_interface.cc | 121 +++++++--- .../hicn/transport/core/hicn_forwarder_interface.h | 26 ++- .../src/hicn/transport/core/memif_connector.cc | 5 +- .../src/hicn/transport/core/memif_connector.h | 2 - libtransport/src/hicn/transport/core/portal.h | 33 +-- .../hicn/transport/core/raw_socket_connector.cc | 4 +- .../src/hicn/transport/core/raw_socket_connector.h | 3 - .../src/hicn/transport/core/raw_socket_interface.h | 10 + .../src/hicn/transport/core/socket_connector.cc | 257 --------------------- .../src/hicn/transport/core/socket_connector.h | 91 -------- .../hicn/transport/core/tcp_socket_connector.cc | 255 ++++++++++++++++++++ .../src/hicn/transport/core/tcp_socket_connector.h | 89 +++++++ .../hicn/transport/core/udp_socket_connector.cc | 201 ++++++++++++++++ .../src/hicn/transport/core/udp_socket_connector.h | 88 +++++++ .../hicn/transport/core/vpp_forwarder_interface.cc | 32 +-- .../hicn/transport/core/vpp_forwarder_interface.h | 13 +- libtransport/src/hicn/transport/protocols/rtc.cc | 4 +- utils/src/hiperf.cc | 39 ++-- utils/src/ping_client.cc | 4 +- utils/src/ping_server.cc | 16 +- 39 files changed, 1143 insertions(+), 740 deletions(-) delete mode 100644 libtransport/src/hicn/transport/core/socket_connector.cc delete mode 100644 libtransport/src/hicn/transport/core/socket_connector.h create mode 100644 libtransport/src/hicn/transport/core/tcp_socket_connector.cc create mode 100644 libtransport/src/hicn/transport/core/tcp_socket_connector.h create mode 100644 libtransport/src/hicn/transport/core/udp_socket_connector.cc create mode 100644 libtransport/src/hicn/transport/core/udp_socket_connector.h diff --git a/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c b/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c index dac8ee89f..ce8c373ca 100644 --- a/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c +++ b/hicn-light/src/command_line/daemon/hicnLightDaemon_main.c @@ -371,12 +371,9 @@ int main(int argc, const char *argv[]) { configuration_SetObjectStoreSize(configuration, capacity); } + forwarder_SetupLocalListeners(forwarder, port); if (configFileName) { - forwarder_SetupAllListeners(forwarder, port, NULL); forwarder_SetupFromConfigFile(forwarder, configFileName); - } else { - // NULL to not setup AF_UNIX - forwarder_SetupAllListeners(forwarder, port, NULL); } Dispatcher *dispatcher = forwarder_GetDispatcher(forwarder); diff --git a/hicn-light/src/config/configuration.c b/hicn-light/src/config/configuration.c index 1a41a9642..865dbca4d 100644 --- a/hicn-light/src/config/configuration.c +++ b/hicn-light/src/config/configuration.c @@ -115,7 +115,7 @@ struct iovec *configuration_ProcessRegisterHicnPrefix(Configuration *config, const char *symbolicOrConnid = control->symbolicOrConnid; - if (strcmp(symbolicOrConnid, "SELF_ROUTE") == 0) { + if (strcmp(symbolicOrConnid, "SELF") == 0) { success = forwarder_AddOrUpdateRoute(config->forwarder, control, ingressId); } else if (utils_IsNumber(symbolicOrConnid)) { // case for connid as input @@ -304,11 +304,14 @@ static void configuration_SendResponse(Configuration *config, struct iovec *msg, ConnectionTable *connectionTable = forwarder_GetConnectionTable(config->forwarder); const Connection *conn = connectionTable_FindById(connectionTable, egressId); - parcAssertNotNull(conn, - "Got null connection for control message we just received"); - IoOperations *ops = connection_GetIoOperations(conn); - streamState_SendCommandResponse(ops, msg); + if (conn == NULL) { + return; + } + + connection_SendCommandResponse(conn, msg); + //IoOperations *ops = connection_GetIoOperations(conn); + //streamState_SendCommandResponse(ops, msg); } struct iovec *configuration_ProcessCreateTunnel(Configuration *config, @@ -425,7 +428,8 @@ struct iovec *configuration_ProcessCreateTunnel(Configuration *config, */ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config, - struct iovec *request) { + struct iovec *request, + unsigned ingressId) { header_control_message *header = request[0].iov_base; remove_connection_command *control = request[1].iov_base; @@ -434,7 +438,11 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config, const char *symbolicOrConnid = control->symbolicOrConnid; ConnectionTable *table = forwarder_GetConnectionTable(config->forwarder); - if (utils_IsNumber(symbolicOrConnid)) { + if (strcmp(symbolicOrConnid, "SELF") == 0) { + forwarder_RemoveConnectionIdFromRoutes(config->forwarder, ingressId); + connectionTable_RemoveById(table, ingressId); + success = true; + } else if (utils_IsNumber(symbolicOrConnid)) { // case for connid as input unsigned connid = (unsigned)strtold(symbolicOrConnid, NULL); @@ -997,7 +1005,7 @@ struct iovec *configuration_DispatchCommand(Configuration *config, break; case REMOVE_CONNECTION: - response = configuration_ProcessRemoveTunnel(config, control); + response = configuration_ProcessRemoveTunnel(config, control, ingressId); break; case REMOVE_ROUTE: diff --git a/hicn-light/src/config/configurationListeners.c b/hicn-light/src/config/configurationListeners.c index 01ab9a3e7..11276e2dd 100644 --- a/hicn-light/src/config/configurationListeners.c +++ b/hicn-light/src/config/configurationListeners.c @@ -217,7 +217,7 @@ static bool _setupTcpListenerOnInet(Forwarder *forwarder, ipv4_addr_t *addr4, struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_port = *port; + addr.sin_port = htons(*port); addr.sin_addr.s_addr = *addr4; ListenerOps *ops = tcpListener_CreateInet(forwarder, addr); @@ -231,12 +231,13 @@ static bool _setupTcpListenerOnInet(Forwarder *forwarder, ipv4_addr_t *addr4, static bool _setupUdpListenerOnInet(Forwarder *forwarder, ipv4_addr_t *addr4, uint16_t *port) { + bool success = false; struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_port = *port; + addr.sin_port = htons(*port); addr.sin_addr.s_addr = *addr4; ListenerOps *ops = udpListener_CreateInet(forwarder, addr); @@ -256,7 +257,7 @@ static bool _setupTcpListenerOnInet6Light(Forwarder *forwarder, struct sockaddr_in6 addr; memset(&addr, 0, sizeof(addr)); addr.sin6_family = AF_INET6; - addr.sin6_port = *port; + addr.sin6_port = htons(*port); addr.sin6_addr = *addr6; addr.sin6_scope_id = scopeId; @@ -276,7 +277,7 @@ static bool _setupUdpListenerOnInet6Light(Forwarder *forwarder, struct sockaddr_in6 addr; memset(&addr, 0, sizeof(addr)); addr.sin6_family = AF_INET6; - addr.sin6_port = *port; + addr.sin6_port = htons(*port); addr.sin6_addr = *addr6; addr.sin6_scope_id = 0; @@ -529,9 +530,13 @@ void configurationListeners_SetupAll(const Configuration *config, uint16_t port, } } - // if (localPath != NULL) { - // _setupLocalListener(forwarder, localPath); - //} - interfaceSetDestroy(&set); } + +void configurationListeners_SetutpLocalIPv4(const Configuration *config, + uint16_t port) { + Forwarder *forwarder = configuration_GetForwarder(config); + in_addr_t addr = inet_addr("127.0.0.1"); + _setupUdpListenerOnInet(forwarder, (ipv4_addr_t *) &(addr), &port); + _setupTcpListenerOnInet(forwarder, (ipv4_addr_t *) &(addr), &port); +} diff --git a/hicn-light/src/config/configurationListeners.h b/hicn-light/src/config/configurationListeners.h index 7332b0c64..c97617d81 100644 --- a/hicn-light/src/config/configurationListeners.h +++ b/hicn-light/src/config/configurationListeners.h @@ -47,6 +47,9 @@ void configurationListeners_SetupAll(const Configuration *config, uint16_t port, const char *localPath); +void configurationListeners_SetutpLocalIPv4(const Configuration *config, + uint16_t port); + bool configurationListeners_Remove(const Configuration *config); // light functions diff --git a/hicn-light/src/core/connection.c b/hicn-light/src/core/connection.c index 505bba081..525fe29c4 100644 --- a/hicn-light/src/core/connection.c +++ b/hicn-light/src/core/connection.c @@ -112,6 +112,13 @@ bool connection_Send(const Connection *conn, Message *message) { return false; } +bool connection_SendCommandResponse(const Connection *conn, struct iovec *msg){ + parcAssertNotNull(conn, "Parameter conn must be non-null"); + parcAssertNotNull(msg, "Parameter message must be non-null"); + + return ioOperations_SendCommandResponse(conn->ops, msg); +} + static void _sendProbe(Connection *conn, unsigned probeType, uint8_t *message) { parcAssertNotNull(conn, "Parameter conn must be non-null"); diff --git a/hicn-light/src/core/connection.h b/hicn-light/src/core/connection.h index b5c703527..56fa131b5 100644 --- a/hicn-light/src/core/connection.h +++ b/hicn-light/src/core/connection.h @@ -64,6 +64,12 @@ Connection *connection_Acquire(Connection *connection); */ bool connection_Send(const Connection *conn, Message *message); +/** + * @function connection_SendCommandResponse + * @abstract Sends a response (ack/nack) for a command + */ +bool connection_SendCommandResponse(const Connection *conn, struct iovec *msg); + /** * Return the `IoOperations` instance associated with the specified `Connection` * instance. diff --git a/hicn-light/src/core/forwarder.c b/hicn-light/src/core/forwarder.c index bceb206a3..2b00a35e5 100644 --- a/hicn-light/src/core/forwarder.c +++ b/hicn-light/src/core/forwarder.c @@ -277,6 +277,11 @@ void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port, configurationListeners_SetupAll(forwarder->config, port, localPath); } +void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port) { + parcAssertNotNull(forwarder, "Parameter must be non-null"); + configurationListeners_SetutpLocalIPv4(forwarder->config, port); +} + void forwarder_SetupFromConfigFile(Forwarder *forwarder, const char *filename) { ConfigurationFile *configFile = configurationFile_Create(forwarder, filename); if (configFile) { diff --git a/hicn-light/src/core/forwarder.h b/hicn-light/src/core/forwarder.h index 6bc823294..de736a1f8 100644 --- a/hicn-light/src/core/forwarder.h +++ b/hicn-light/src/core/forwarder.h @@ -90,6 +90,12 @@ void forwarder_Destroy(Forwarder **ptr); */ void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port, const char *localPath); +/** + * @function forwarder_SetupAllListeners + * @abstract Setup one tcp and one udp listener on address 127.0.0.1 and the + * given port + */ +void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port); /** * Configure hicn-light via a configuration file diff --git a/hicn-light/src/io/hicnConnection.c b/hicn-light/src/io/hicnConnection.c index 6def8ed43..991c0064e 100644 --- a/hicn-light/src/io/hicnConnection.c +++ b/hicn-light/src/io/hicnConnection.c @@ -75,6 +75,7 @@ typedef struct hicn_state { // Prototypes static bool _send(IoOperations *ops, const Address *nexthop, Message *message); +static bool _sendCommandResponse(IoOperations *ops, struct iovec *message); static const Address *_getRemoteAddress(const IoOperations *ops); static const AddressPair *_getAddressPair(const IoOperations *ops); static unsigned _getConnectionId(const IoOperations *ops); @@ -100,6 +101,7 @@ static const void *_streamConnection_Class(const IoOperations *ops) { static IoOperations _template = {.closure = NULL, .send = &_send, + .sendCommandResponse = &_sendCommandResponse, .getRemoteAddress = &_getRemoteAddress, .getAddressPair = &_getAddressPair, .getConnectionId = &_getConnectionId, @@ -253,7 +255,6 @@ static unsigned _getConnectionId(const IoOperations *ops) { * sends a message to the peer. * * @param dummy is ignored. . - * @return <#return#> */ static bool _send(IoOperations *ops, const Address *dummy, Message *message) { parcAssertNotNull(ops, "Parameter ops must be non-null"); @@ -328,6 +329,12 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) { return true; } +static bool _sendCommandResponse(IoOperations *ops, struct iovec *message) { + //XXX this should be nerver called since we do not handle control messages + //with hicn connections, so nothing to do here! + return false; +} + static list_connections_type _getConnectionType(const IoOperations *ops) { return CONN_HICN; } diff --git a/hicn-light/src/io/ioOperations.c b/hicn-light/src/io/ioOperations.c index bbc8cec91..b40b51d76 100644 --- a/hicn-light/src/io/ioOperations.c +++ b/hicn-light/src/io/ioOperations.c @@ -28,6 +28,12 @@ bool ioOperations_Send(IoOperations *ops, const Address *nexthop, return ops->send(ops, nexthop, message); } +bool ioOperations_SendCommandResponse(IoOperations *ops, + struct iovec *message) { + return ops->sendCommandResponse(ops, message); +} + + const Address *ioOperations_GetRemoteAddress(const IoOperations *ops) { return ops->getRemoteAddress(ops); } diff --git a/hicn-light/src/io/ioOperations.h b/hicn-light/src/io/ioOperations.h index dee66030d..48319c259 100644 --- a/hicn-light/src/io/ioOperations.h +++ b/hicn-light/src/io/ioOperations.h @@ -15,38 +15,6 @@ /** * Defines the interface all connections use to communicate with the forwarder. - * - * @code - * - * static IoOperations _template = { - * .closure = NULL, - * .send = &_etherConnection_Send, - * .getRemoteAddress = &_etherConnection_GetRemoteAddress, - * .getAddressPair = &_etherConnection_GetAddressPair, - * .getConnectionId = &_etherConnection_GetConnectionId, - * .isUp = &_etherConnection_IsUp, - * .isLocal = &_etherConnection_IsLocal, - * .destroy = &_etherConnection_DestroyOperations, - * .class = &_etherConnection_Class, - * .getConnectionType = &_etherConnection_getConnectionType - * }; - * - * IoOperations * - * etherConnection_Create(Forwarder *forwarder, GenericEther *ether, - * AddressPair *pair) - * { - * _EtherState *etherConnState = parcMemory_Allocate(sizeof(_EtherState)); - * // Fill in etherConnState with instance variables - * - * IoOperations *io_ops = parcMemory_Allocate(sizeof(IoOperations)); - * memcpy(io_ops, &_template, sizeof(IoOperations)); - * io_ops->closure = etherConnState; - * // Add to connection table, send missives about connection state - * - * return op_ops; - * } - * @endcode - * */ /** @@ -95,6 +63,7 @@ typedef struct io_ops IoOperations; struct io_ops { void *closure; bool (*send)(IoOperations *ops, const Address *nexthop, Message *message); + bool (*sendCommandResponse)(IoOperations *ops, struct iovec *message); const Address *(*getRemoteAddress)(const IoOperations *ops); const AddressPair *(*getAddressPair)(const IoOperations *ops); bool (*isUp)(const IoOperations *ops); @@ -201,6 +170,9 @@ void ioOperations_Release(IoOperations **opsPtr); bool ioOperations_Send(IoOperations *ops, const Address *nexthop, Message *message); +bool ioOperations_SendCommandResponse(IoOperations *ops, + struct iovec *message); + /** * A connection is made up of a local and a remote address. This function * returns the remote address. diff --git a/hicn-light/src/io/streamConnection.c b/hicn-light/src/io/streamConnection.c index 948b6c01b..78c19fb18 100644 --- a/hicn-light/src/io/streamConnection.c +++ b/hicn-light/src/io/streamConnection.c @@ -65,6 +65,8 @@ typedef struct stream_state { // Prototypes static bool _streamConnection_Send(IoOperations *ops, const Address *nexthop, Message *message); +static bool _streamConnection_SendCommandResponse(IoOperations *ops, + struct iovec *msg); static const Address *_streamConnection_GetRemoteAddress( const IoOperations *ops); static const AddressPair *_streamConnection_GetAddressPair( @@ -80,31 +82,6 @@ static list_connections_type _streamConnection_GetConnectionType( static Ticks _sendProbe(IoOperations *ops, unsigned probeType, uint8_t *message); -// REMINDER: when a new_command is added, the following array has to be updated -// with the sizeof(new_command). It allows to allocate the buffer for receiving -// the payload of the CONTROLLER REQUEST after the header has beed read. Each -// command identifier (typedef enum command_id) corresponds to a position in the -// following array. -static int payloadLengthDaemon[LAST_COMMAND_VALUE] = { - sizeof(add_listener_command), - sizeof(add_connection_command), - 0, // list connections: payload always 0 when get controller request - sizeof(add_route_command), - 0, // list routes: payload always 0 when get controller request - sizeof(remove_connection_command), - sizeof(remove_route_command), - sizeof(cache_store_command), - sizeof(cache_serve_command), - 0, // cache clear - sizeof(set_strategy_command), - sizeof(set_wldr_command), - sizeof(add_punting_command), - 0, // list listeners: payload always 0 when get controller request - sizeof(mapme_activator_command), - sizeof(mapme_activator_command), - sizeof(mapme_timing_command), - sizeof(mapme_timing_command)}; - /* * This assigns a unique pointer to the void * which we use * as a GUID for this class. @@ -121,6 +98,7 @@ static const void *_streamConnection_Class(const IoOperations *ops) { static IoOperations _template = { .closure = NULL, .send = &_streamConnection_Send, + .sendCommandResponse = &_streamConnection_SendCommandResponse, .getRemoteAddress = &_streamConnection_GetRemoteAddress, .getAddressPair = &_streamConnection_GetAddressPair, .getConnectionId = &_streamConnection_GetConnectionId, @@ -308,7 +286,7 @@ static unsigned _streamConnection_GetConnectionId(const IoOperations *ops) { return stream->id; } -bool streamState_SendCommandResponse(IoOperations *ops, +bool _streamConnection_SendCommandResponse(IoOperations *ops, struct iovec *response) { parcAssertNotNull(ops, "Parameter ops must be non-null"); parcAssertNotNull(response, "Parameter message must be non-null"); @@ -447,7 +425,7 @@ int _isACommand(PARCEventBuffer *input) { // read first byte of the header // first byte: must be a REQUEST_LIGHT - if (msg[0] != 100) { + if (msg[0] != REQUEST_LIGHT) { return LAST_COMMAND_VALUE; } @@ -468,7 +446,7 @@ PARCEventBuffer *_tryReadControlMessage(_StreamState *stream, if (stream->nextMessageLength == 0) { stream->nextMessageLength = sizeof(header_control_message) + - payloadLengthDaemon[command]; // consider the whole packet. + payloadLengthDaemon(command); // consider the whole packet. } if (bytesAvailable >= stream->nextMessageLength) { @@ -487,13 +465,13 @@ PARCEventBuffer *_tryReadControlMessage(_StreamState *stream, } (*request)[0].iov_base = control; // header (*request)[0].iov_len = sizeof(header_control_message); - if (payloadLengthDaemon[command] > 0) { + if (payloadLengthDaemon(command) > 0) { (*request)[1].iov_base = control + sizeof(header_control_message); // payload } else { (*request)[1].iov_base = NULL; } - (*request)[1].iov_len = payloadLengthDaemon[command]; + (*request)[1].iov_len = payloadLengthDaemon(command); // now reset message length for next packet stream->nextMessageLength = 0; diff --git a/hicn-light/src/io/udpConnection.c b/hicn-light/src/io/udpConnection.c index 6c2e35392..3faf2bfac 100644 --- a/hicn-light/src/io/udpConnection.c +++ b/hicn-light/src/io/udpConnection.c @@ -19,6 +19,7 @@ * NB The Send() function may overflow the output buffer * */ +#include #include #include @@ -58,6 +59,7 @@ typedef struct udp_state { // Prototypes static bool _send(IoOperations *ops, const Address *nexthop, Message *message); +static bool _sendCommandResponse(IoOperations *ops, struct iovec *message); static const Address *_getRemoteAddress(const IoOperations *ops); static const AddressPair *_getAddressPair(const IoOperations *ops); static unsigned _getConnectionId(const IoOperations *ops); @@ -82,6 +84,7 @@ static const void *_streamConnection_Class(const IoOperations *ops) { static IoOperations _template = {.closure = NULL, .send = &_send, + .sendCommandResponse = &_sendCommandResponse, .getRemoteAddress = &_getRemoteAddress, .getAddressPair = &_getAddressPair, .getConnectionId = &_getConnectionId, @@ -239,38 +242,6 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) { // in this particular connection we don't need natting beacause we send the // packet to the next hop using upd connection -#if 0 - if((hicnConnState->peerAddressLength == sizeof(struct sockaddr_in)) || (hicnConnState->localAddressLength == sizeof(struct sockaddr_in))) - return false; - - if(message_GetType(message) = MessagePacketType_ContentObject){ - //this is a data packet. We need to put the remote address in the destination field - messageHandler_SetDestination_IPv6((uint8_t *) message_FixedHeader(message), - &((struct sockaddr_in6 *) hicnConnState->peerAddress)->sin6_addr); - - } else if (message_GetType(message) == MessagePacketType_Interest) { - //this si an interest packet. We need to put the local address in the source field - messageHandler_SetSource_IPv6((uint8_t *) message_FixedHeader(message), - &((struct sockaddr_in6 *) hicnConnState->localAddress)->sin6_addr); - - //only in this case we may need to set the probeDestAddress - if(hicnConnState->refreshProbeDestAddress){ - _refreshProbeDestAddress(hicnConnState, message_FixedHeader(message)); - } - - } else if (message_GetType(message) == MessagePacketType_WldrNotification) { - //here we don't need to do anything for now - }else{ - //unkown packet - if (logger_IsLoggable(hicnConnState->logger, LoggerFacility_IO, PARCLogLevel_Debug)) { - logger_Log(hicnConnState->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__, - "connid %u can't parse the message", - hicnConnState->id); - } - return false; - } -#endif - ssize_t writeLength = sendto(udpConnState->udpListenerSocket, message_FixedHeader(message), (int)message_Length(message), 0, udpConnState->peerAddress, @@ -290,44 +261,39 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) { return true; } +static bool _sendCommandResponse(IoOperations *ops, struct iovec *message){ + parcAssertNotNull(ops, "Parameter ops must be non-null"); + parcAssertNotNull(message, "Parameter message must be non-null"); + _UdpState *udpConnState = (_UdpState *)ioOperations_GetClosure(ops); + + // Perform connect before to establish association between this peer and + // the remote peer. This is required to use writev. + // Connection association can be changed at any time. + connect(udpConnState->udpListenerSocket, + udpConnState->peerAddress, + udpConnState->peerAddressLength); + + ssize_t writeLength = writev(udpConnState->udpListenerSocket, message, 2); + + struct sockaddr any_address = {0}; + any_address.sa_family = AF_UNSPEC; + connect(udpConnState->udpListenerSocket, + &any_address, (socklen_t)sizeof(any_address)); + + if (writeLength < 0) { + return false; + } + + return true; +} + static list_connections_type _getConnectionType(const IoOperations *ops) { return CONN_UDP; } static Ticks _sendProbe(IoOperations *ops, unsigned probeType, uint8_t *message) { -#if 0 - parcAssertNotNull(ops, "Parameter ops must be non-null"); - _MetisUdpState *udpConnState = (_MetisUdpState *) metisIoOperations_GetClosure(ops); - - - uint8_t *pkt; - size_t pkt_size = 8; - pkt = (uint8_t *) malloc(sizeof(uint8_t) * pkt_size); - for (unsigned i = 0; i < pkt_size; i++) { - pkt[i] = 0; - } - pkt[0] = 1; //type - pkt[1] = probeType; //packet type - pkt[6] = 8; //header len (16bit, network order) - - ssize_t writeLen = sendto(udpConnState->udpListenerSocket, pkt, pkt_size, 0, udpConnState->peerAddress, udpConnState->peerAddressLength); - - if (writeLen < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - free(pkt); - return 0; - } else { - //this print is for debugging - printf("Incorrect write length %zd, expected %zd: (%d) %s\n", writeLen, pkt_size, errno, strerror(errno)); - free(pkt); - return 0; - } - } - - free(pkt); - return metisForwarder_GetTicks(udpConnState->metis); -#endif + //TODO return 0; } diff --git a/hicn-light/src/io/udpListener.c b/hicn-light/src/io/udpListener.c index 6c2947c66..4411bc7f6 100644 --- a/hicn-light/src/io/udpListener.c +++ b/hicn-light/src/io/udpListener.c @@ -282,54 +282,8 @@ static int _getSocket(const ListenerOps *ops) { return (int)udp->udp_socket; } -// void -// udpListener_SetPacketType(ListenerOps *ops, MessagePacketType type) -//{ -// return; -//} - // ===================================================================== -/** - * @function peekMesageLength - * @abstract Peek at the next packet to learn its length by reading the fixed - * header - * @discussion - * <#Discussion#> - * - * @param <#param1#> - * @return <#return#> - */ -static size_t _peekMessageLength(UdpListener *udp, int fd, - struct sockaddr *peerIpAddress, - socklen_t *peerIpAddressLengthPtr) { - // to be fast I try to use just ipv6, this needs to be validated for ipv4 - - size_t packetLength = 0; - - uint8_t *fixedHeader = (uint8_t *)malloc( - sizeof(uint8_t) * messageHandler_GetIPHeaderLength(IPv6)); - - // peek at the UDP packet and read in the fixed header. - // Also returns the socket information for the remote peer - - ssize_t res = recvfrom( - fd, fixedHeader, (int)messageHandler_GetIPHeaderLength(IPv6), MSG_PEEK, - (struct sockaddr *)peerIpAddress, peerIpAddressLengthPtr); - - if (res == messageHandler_GetIPHeaderLength(IPv6)) { - packetLength = messageHandler_GetTotalPacketLength(fixedHeader); - } else { - if (res < 0) { - printf("error while readin packet\n"); - } - } - - free(fixedHeader); - - return packetLength; -} - /** * @function _constructAddressPair * @abstract Creates the address pair that uniquely identifies the connection @@ -441,100 +395,108 @@ static void _handleWldrNotification(UdpListener *udp, unsigned connId, message_Release(&message); } -static Message *_readMessage(UdpListener *udp, int fd, size_t packetLength, - AddressPair *pair) { - uint8_t *msgBuffer = parcMemory_AllocateAndClear(packetLength); - - ssize_t readLength = read(fd, msgBuffer, (unsigned int)packetLength); +static Message *_readMessage(UdpListener *udp, int fd, + AddressPair *pair, uint8_t * packet, bool * processed) { Message *message = NULL; - if (readLength < 0) { - printf("read failed %d: (%d) %s\n", fd, errno, strerror(errno)); - return message; - } - unsigned connid = 0; bool foundConnection = _lookupConnectionId(udp, pair, &connid); - if (readLength == packetLength) { - // we need to check if it is a valid packet - if (messageHandler_IsTCP(msgBuffer)) { - MessagePacketType pktType; - - if (messageHandler_IsData(msgBuffer)) { - pktType = MessagePacketType_ContentObject; - if (!foundConnection) { - parcMemory_Deallocate((void **)&msgBuffer); - return message; - } - } else if (messageHandler_IsInterest(msgBuffer)) { - pktType = MessagePacketType_Interest; - if (!foundConnection) { - connid = _createNewConnection(udp, fd, pair); - } - } else { - printf("Got a packet that is not a data nor an interest, drop it!\n"); - parcMemory_Deallocate((void **)&msgBuffer); + if (messageHandler_IsTCP(packet)) { + *processed = true; + MessagePacketType pktType; + + if (messageHandler_IsData(packet)) { + pktType = MessagePacketType_ContentObject; + if (!foundConnection) { + parcMemory_Deallocate((void **)&packet); return message; } + } else if (messageHandler_IsInterest(packet)) { + pktType = MessagePacketType_Interest; + if (!foundConnection) { + connid = _createNewConnection(udp, fd, pair); + } + } else { + printf("Got a packet that is not a data nor an interest, drop it!\n"); + parcMemory_Deallocate((void **)&packet); + return message; + } - message = message_CreateFromByteArray( - connid, msgBuffer, pktType, forwarder_GetTicks(udp->forwarder), - forwarder_GetLogger(udp->forwarder)); + message = message_CreateFromByteArray( + connid, packet, pktType, forwarder_GetTicks(udp->forwarder), + forwarder_GetLogger(udp->forwarder)); - if (message == NULL) { - parcMemory_Deallocate((void **)&msgBuffer); - } - } else if (messageHandler_IsWldrNotification(msgBuffer)) { - _handleWldrNotification(udp, connid, msgBuffer); - } else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) { - _handleProbeMessage(udp, msgBuffer); + if (message == NULL) { + parcMemory_Deallocate((void **)&packet); } + } else if (messageHandler_IsWldrNotification(packet)) { + *processed = true; + _handleWldrNotification(udp, connid, packet); + } else if (messageHandler_IsLoadBalancerProbe(packet)) { + *processed = true; + _handleProbeMessage(udp, packet); + } #ifdef WITH_MAPME - else if (mapMe_isMapMe(msgBuffer)) { - forwarder_ProcessMapMe(udp->forwarder, msgBuffer, connid); - } -#endif /* WITH_MAPME */ + else if (mapMe_isMapMe(packet)) { + *processed = true; + forwarder_ProcessMapMe(udp->forwarder, packet, connid); } +#endif /* WITH_MAPME */ return message; } -static void _receivePacket(UdpListener *udp, int fd, size_t packetLength, - struct sockaddr_storage *peerIpAddress, - socklen_t peerIpAddressLength) { - AddressPair *pair = _constructAddressPair( - udp, (struct sockaddr *)peerIpAddress, peerIpAddressLength); +static void _readCommand(UdpListener *udp, int fd, + AddressPair *pair, + uint8_t * command) { - Message *message = _readMessage(udp, fd, packetLength, pair); - addressPair_Release(&pair); + if (*command != REQUEST_LIGHT){ + printf("the message received is not a command, drop\n"); + return; + } - if (message) { - forwarder_Receive(udp->forwarder, message); - } else { + command_id id = *(command + 1); + + if ( id < 0 || id >= LAST_COMMAND_VALUE){ + printf("the message received is not a valid command, drop\n"); return; } + + unsigned connid = 0; + bool foundConnection = _lookupConnectionId(udp, pair, &connid); + if(!foundConnection){ + connid = _createNewConnection(udp, fd, pair); + } + + struct iovec *request; + if (!(request = (struct iovec *) parcMemory_AllocateAndClear( + sizeof(struct iovec) * 2))) { + return; + } + + request[0].iov_base = command; + request[0].iov_len = sizeof(header_control_message); + request[1].iov_base = command + sizeof(header_control_message); + request[1].iov_len = payloadLengthDaemon(id); + + forwarder_ReceiveCommand(udp->forwarder, id, request, connid); + parcMemory_Deallocate((void **) &command); + parcMemory_Deallocate((void **) &request); } -static void _readFrameToDiscard(UdpListener *udp, int fd) { - // we need to discard the frame. Read 1 byte. This will clear it off the - // stack. - uint8_t buffer; - ssize_t nread = read(fd, &buffer, 1); - if (nread == 1) { - if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug)) { - logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__, - "Discarded frame from fd %d", fd); - } - } else if (nread < 0) { - if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Error)) { - logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Error, __func__, - "Error trying to discard frame from fd %d: (%d) %s", fd, errno, - strerror(errno)); - } +static bool _receivePacket(UdpListener *udp, int fd, + AddressPair *pair, + uint8_t * packet) { + bool processed = false; + Message *message = _readMessage(udp, fd, pair, + packet, &processed); + if (message) { + forwarder_Receive(udp->forwarder, message); } + return processed; } static void _readcb(int fd, PARCEventType what, void *udpVoid) { @@ -553,14 +515,24 @@ static void _readcb(int fd, PARCEventType what, void *udpVoid) { struct sockaddr_storage peerIpAddress; socklen_t peerIpAddressLength = sizeof(peerIpAddress); - size_t packetLength = _peekMessageLength( - udp, fd, (struct sockaddr *)&peerIpAddress, &peerIpAddressLength); + //packet it deallocated by _receivePacket or _readCommand + uint8_t * packet = parcMemory_AllocateAndClear(1500); //max MTU + ssize_t readLength = recvfrom(fd, packet, 1500, 0, + (struct sockaddr *)&peerIpAddress, &peerIpAddressLength); - if (packetLength > 0) { - _receivePacket(udp, fd, packetLength, &peerIpAddress, - peerIpAddressLength); - } else { - _readFrameToDiscard(udp, fd); + if(readLength < 0) { + printf("unable to read the message\n"); + return; } + + AddressPair *pair = _constructAddressPair( + udp, (struct sockaddr *)&peerIpAddress, peerIpAddressLength); + + bool done = _receivePacket(udp, fd, pair, packet); + if(!done){ + _readCommand(udp, fd, pair, packet); + } + + addressPair_Release(&pair); } } diff --git a/hicn-light/src/utils/commands.h b/hicn-light/src/utils/commands.h index e276d8dd2..d4147cbdd 100644 --- a/hicn-light/src/utils/commands.h +++ b/hicn-light/src/utils/commands.h @@ -41,7 +41,7 @@ union commandAddr { }; typedef enum { - REQUEST_LIGHT = 100, + REQUEST_LIGHT = 0xc0, //this is a command RESPONSE_LIGHT, ACK_LIGHT, NACK_LIGHT, @@ -49,7 +49,7 @@ typedef enum { } message_type; typedef enum { - ADD_LISTENER, + ADD_LISTENER = 0, ADD_CONNECTION, LIST_CONNECTIONS, ADD_ROUTE, @@ -283,4 +283,51 @@ typedef struct { // SIZE=1 +//===== size of commands ====== +// REMINDER: when a new_command is added, the following switch has to be +// updated. +static inline int payloadLengthDaemon(command_id id) { + switch (id){ + case ADD_LISTENER: + return sizeof(add_listener_command); + case ADD_CONNECTION: + return sizeof(add_connection_command); + case LIST_CONNECTIONS: + return 0; // list connections: payload always 0 + case ADD_ROUTE: + return sizeof(add_route_command); + case LIST_ROUTES: + return 0; // list routes: payload always 0 + case REMOVE_CONNECTION: + return sizeof(remove_connection_command); + case REMOVE_ROUTE: + return sizeof(remove_route_command); + case CACHE_STORE: + return sizeof(cache_store_command); + case CACHE_SERVE: + return sizeof(cache_serve_command); + case CACHE_CLEAR: + return 0; // cache clear + case SET_STRATEGY: + return sizeof(set_strategy_command); + case SET_WLDR: + return sizeof(set_wldr_command); + case ADD_PUNTING: + return sizeof(add_punting_command); + case LIST_LISTENERS: + return 0; // list listeners: payload always 0 + case MAPME_ENABLE: + return sizeof(mapme_activator_command); + case MAPME_DISCOVERY: + return sizeof(mapme_activator_command); + case MAPME_TIMESCALE: + return sizeof(mapme_timing_command); + case MAPME_RETX: + return sizeof(mapme_timing_command); + case LAST_COMMAND_VALUE: + return 0; + default: + return 0; + } +} #endif diff --git a/libtransport/src/hicn/transport/core/CMakeLists.txt b/libtransport/src/hicn/transport/core/CMakeLists.txt index dff93adeb..0e674fcac 100644 --- a/libtransport/src/hicn/transport/core/CMakeLists.txt +++ b/libtransport/src/hicn/transport/core/CMakeLists.txt @@ -17,7 +17,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.h ${CMAKE_CURRENT_SOURCE_DIR}/facade.h ${CMAKE_CURRENT_SOURCE_DIR}/interest.h - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h @@ -29,7 +28,8 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/portal.h ${CMAKE_CURRENT_SOURCE_DIR}/prefix.h ${CMAKE_CURRENT_SOURCE_DIR}/connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h @@ -39,12 +39,12 @@ list(APPEND HEADER_FILES list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc ${CMAKE_CURRENT_SOURCE_DIR}/interest.cc - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc ${CMAKE_CURRENT_SOURCE_DIR}/packet.cc ${CMAKE_CURRENT_SOURCE_DIR}/name.cc ${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc - ${CMAKE_CURRENT_SOURCE_DIR}/socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.cc ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.cc ${CMAKE_CURRENT_SOURCE_DIR}/connector.cc diff --git a/libtransport/src/hicn/transport/core/connector.cc b/libtransport/src/hicn/transport/core/connector.cc index ff567d78a..e89b98f8a 100644 --- a/libtransport/src/hicn/transport/core/connector.cc +++ b/libtransport/src/hicn/transport/core/connector.cc @@ -21,7 +21,13 @@ namespace core { std::once_flag Connector::init_flag_; -Connector::Connector() : packet_pool_() { init(); } +Connector::Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback) + : packet_pool_(), + receive_callback_(std::move(receive_callback)), + on_reconnect_callback_(std::move(reconnect_callback)) { + init(); +} void Connector::init() { increasePoolSize(); } diff --git a/libtransport/src/hicn/transport/core/connector.h b/libtransport/src/hicn/transport/core/connector.h index ae82ee973..529e97bf9 100644 --- a/libtransport/src/hicn/transport/core/connector.h +++ b/libtransport/src/hicn/transport/core/connector.h @@ -33,19 +33,20 @@ enum class ConnectorType : uint8_t { VPP_CONNECTOR, }; -static constexpr std::size_t packet_size = 2048; -static constexpr std::size_t queue_size = 4096; -static constexpr std::size_t packet_pool_size = 4096; - -using PacketRing = utils::CircularFifo; -using PacketQueue = std::deque; -using PacketReceivedCallback = std::function; -using OnReconnect = std::function; -using PacketSentCallback = std::function; - class Connector { public: - Connector(); + static constexpr std::size_t packet_size = 2048; + static constexpr std::size_t queue_size = 4096; + static constexpr std::size_t packet_pool_size = 4096; + + using PacketRing = utils::CircularFifo; + using PacketQueue = std::deque; + using PacketReceivedCallback = std::function; + using OnReconnect = std::function; + using PacketSentCallback = std::function; + + Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback); virtual ~Connector() = default; @@ -88,6 +89,10 @@ class Connector { static std::once_flag init_flag_; utils::ObjectPool packet_pool_; PacketQueue output_buffer_; + + // Connector events + PacketReceivedCallback receive_callback_; + OnReconnect on_reconnect_callback_; }; } // end namespace core diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h index e7b6fb1a6..de9f3b568 100644 --- a/libtransport/src/hicn/transport/core/forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/forwarder_interface.h @@ -16,7 +16,7 @@ #pragma once #include -#include +#include #include #include @@ -54,8 +54,6 @@ class ForwarderInterface { } public: - static constexpr uint8_t ack_code = 102; - virtual ~ForwarderInterface() {} TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { @@ -70,6 +68,20 @@ class ForwarderInterface { return static_cast(*this).getMtu(); } + TRANSPORT_ALWAYS_INLINE static bool isControlMessage(const uint8_t *message) { + return Implementation::isControlMessageImpl(message); + } + + template + TRANSPORT_ALWAYS_INLINE void processControlMessageReply(R &&packet_buffer) { + return static_cast(*this).processControlMessageReplyImpl( + std::forward(packet_buffer)); + } + + TRANSPORT_ALWAYS_INLINE void closeConnection() { + return static_cast(*this).closeConnection(); + } + template < typename R, typename = std::enable_if_t< @@ -97,7 +109,7 @@ class ForwarderInterface { counters_.tx_bytes += len; // Perfect forwarding - connector_.send(packet, len, std::forward(packet_sent)); + connector_.send(packet, len, std::forward(packet_sent)); } TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); } diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc index 9dc3b63bb..1c8060906 100644 --- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.cc @@ -15,16 +15,18 @@ #include -#define ADDR_INET 1 -#define ADDR_INET6 2 -#define ADD_ROUTE 3 -#define REQUEST_LIGHT 100 - union AddressLight { uint32_t ipv4; struct in6_addr ipv6; }; +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; +} CommandHeader; + typedef struct { uint8_t message_type; uint8_t command_id; @@ -37,51 +39,104 @@ typedef struct { uint8_t len; } RouteToSelfCommand; -namespace transport { - -namespace core { - -HicnForwarderInterface::HicnForwarderInterface(SocketConnector &connector) - : ForwarderInterface(connector) {} - -HicnForwarderInterface::~HicnForwarderInterface() {} +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; + char symbolic_or_connid[16]; +} DeleteSelfConnectionCommand; -void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } +namespace { +static constexpr uint8_t addr_inet = 1; +static constexpr uint8_t addr_inet6 = 2; +static constexpr uint8_t add_route_command = 3; +static constexpr uint8_t delete_connection_command = 5; +static constexpr uint8_t request_light = 0xc0; +static constexpr char identifier[] = "SELF"; -void HicnForwarderInterface::registerRoute(Prefix &prefix) { - auto addr = prefix.toSockaddr(); - const char *identifier = {"SELF_ROUTE"}; +void fillCommandHeader(CommandHeader *header) { + // Allocate and fill the header + header->message_type = request_light; + header->length = 1; +} - // allocate command payload - RouteToSelfCommand *route_to_self = new RouteToSelfCommand(); +std::unique_ptr createCommandRoute( + std::unique_ptr &&addr, uint8_t prefix_length) { + auto command = std::make_unique(); // check and set IP address if (addr->sa_family == AF_INET) { - route_to_self->address_type = ADDR_INET; - route_to_self->address.ipv4 = ((Sockaddr4 *)addr.get())->sin_addr.s_addr; + command->address_type = addr_inet; + command->address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; } else if (addr->sa_family == AF_INET6) { - route_to_self->address_type = ADDR_INET6; - route_to_self->address.ipv6 = ((Sockaddr6 *)addr.get())->sin6_addr; + command->address_type = addr_inet6; + command->address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; } // Fill remaining payload fields #ifndef _WIN32 - strcpy(route_to_self->symbolic_or_connid, identifier); + strcpy(command->symbolic_or_connid, identifier); #else - strcpy_s(route_to_self->symbolic_or_connid, strlen(route_to_self->symbolic_or_connid), identifier); + strcpy_s(route_to_self->symbolic_or_connid, + strlen(route_to_self->symbolic_or_connid), identifier); #endif - route_to_self->cost = 1; - route_to_self->len = (uint8_t) prefix.getPrefixLength(); + command->cost = 1; + command->len = (uint8_t)prefix_length; // Allocate and fill the header - route_to_self->command_id = ADD_ROUTE; - route_to_self->message_type = REQUEST_LIGHT; - route_to_self->length = 1; + command->command_id = add_route_command; + fillCommandHeader((CommandHeader *)command.get()); + + return command; +} + +std::unique_ptr createCommandDeleteConnection() { + auto command = std::make_unique(); + fillCommandHeader((CommandHeader *)command.get()); + command->command_id = delete_connection_command; + +#ifndef _WIN32 + strcpy(command->symbolic_or_connid, identifier); +#else + strcpy_s(route_to_self->symbolic_or_connid, + strlen(route_to_self->symbolic_or_connid), identifier); +#endif + + return command; +} + +} // namespace + +namespace transport { + +namespace core { + +HicnForwarderInterface::HicnForwarderInterface(UdpSocketConnector &connector) + : ForwarderInterface( + connector) {} + +HicnForwarderInterface::~HicnForwarderInterface() {} + +void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } + +void HicnForwarderInterface::registerRoute(Prefix &prefix) { + auto command = + createCommandRoute(prefix.toSockaddr(), prefix.getPrefixLength()) + .release(); + send((uint8_t *)command, sizeof(RouteToSelfCommand), + [command]() { delete command; }); +} - send((uint8_t *)route_to_self, sizeof(RouteToSelfCommand), - [route_to_self]() { delete route_to_self; }); +void HicnForwarderInterface::closeConnection() { + auto command = createCommandDeleteConnection().release(); + send((uint8_t *)command, sizeof(DeleteSelfConnectionCommand), + [this, command]() { + delete command; + connector_.close(); + }); } } // namespace core -} // namespace transport \ No newline at end of file +} // namespace transport diff --git a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h index e57fae105..b11841b69 100644 --- a/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/hicn_forwarder_interface.h @@ -17,7 +17,7 @@ #include #include -#include +#include #include @@ -26,7 +26,10 @@ namespace transport { namespace core { class HicnForwarderInterface - : public ForwarderInterface { + : public ForwarderInterface { + static constexpr uint8_t ack_code = 0xc2; + static constexpr uint8_t nack_code = 0xc3; + public: union addressLight { uint32_t ipv4; @@ -46,9 +49,9 @@ class HicnForwarderInterface }; using route_to_self_command = struct route_to_self_command; - using ConnectorType = SocketConnector; + using ConnectorType = UdpSocketConnector; - HicnForwarderInterface(SocketConnector &connector); + HicnForwarderInterface(UdpSocketConnector &connector); ~HicnForwarderInterface(); @@ -58,6 +61,21 @@ class HicnForwarderInterface std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return message[0] == ack_code || message[0] == nack_code; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) { + if (packet_buffer->data()[0] == nack_code) { + throw errors::RuntimeException( + "Received Nack message from hicn light forwarder."); + } + } + + void closeConnection(); + private: static constexpr std::uint16_t interface_mtu = 1500; }; diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc index 38b2a2a98..c69a87fb7 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.cc +++ b/libtransport/src/hicn/transport/core/memif_connector.cc @@ -57,7 +57,7 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, OnReconnect &&on_reconnect_callback, asio::io_service &io_service, std::string app_name) - : Connector(), + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), memif_worker_(nullptr), timer_set_(false), send_timer_(std::make_unique(event_reactor_)), @@ -71,8 +71,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, enable_burst_(false), closed_(false), app_name_(app_name), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), socket_filename_("") { std::call_once(MemifConnector::flag_, &MemifConnector::init, this); } @@ -372,7 +370,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, packet->append(packet_length); if (!connector->input_buffer_.push(std::move(packet))) { - TRANSPORT_LOGI("Error pushing packet. Ring buffer full."); // TODO Here we should consider the possibility to signal the congestion diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h index 3d2e8411d..06a8fd73e 100644 --- a/libtransport/src/hicn/transport/core/memif_connector.h +++ b/libtransport/src/hicn/transport/core/memif_connector.h @@ -128,8 +128,6 @@ class MemifConnector : public Connector { uint8_t memif_mode_; std::string app_name_; uint16_t transmission_index_; - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; utils::SpinLock write_msgs_lock_; std::string socket_filename_; diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 0932b56c6..7efbc2009 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include #include @@ -222,22 +222,25 @@ class Portal { } TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) { - for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); - } - - clear(); - if (kill_connection) { - connector_.close(); + forwarder_interface_.closeConnection(); } - io_service_.post([this]() { io_service_.stop(); }); + io_service_.post([this]() { + clear(); + io_service_.stop(); + }); } TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); } - TRANSPORT_ALWAYS_INLINE void clear() { pending_interest_hash_table_.clear(); } + TRANSPORT_ALWAYS_INLINE void clear() { + for (auto &pend_interest : pending_interest_hash_table_) { + pend_interest.second->cancelTimer(); + } + + pending_interest_hash_table_.clear(); + } TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { return io_service_; @@ -260,17 +263,16 @@ class Portal { return; } - if (packet_buffer->data()[0] == ForwarderInt::ack_code) { - // Hicn forwarder message + if (TRANSPORT_EXPECT_FALSE( + ForwarderInt::isControlMessage(packet_buffer->data()))) { processControlMessage(std::move(packet_buffer)); return; } - bool is_interest = Packet::isInterest(packet_buffer->data()); Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data()); if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (!is_interest) { + if (!Packet::isInterest(packet_buffer->data())) { processContentObject( ContentObject::Ptr(new ContentObject(std::move(packet_buffer)))); } else { @@ -329,8 +331,7 @@ class Portal { TRANSPORT_ALWAYS_INLINE void processControlMessage( Packet::MemBufPtr &&packet_buffer) { - // Control message as response to the route set by a producer. - // Do nothing + forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); } private: diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.cc b/libtransport/src/hicn/transport/core/raw_socket_connector.cc index 5cfff39fb..fe16d2132 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.cc +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.cc @@ -39,15 +39,13 @@ RawSocketConnector::RawSocketConnector( PacketReceivedCallback &&receive_callback, OnReconnect &&on_reconnect_callback, asio::io_service &io_service, std::string app_name) - : Connector(), + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), io_service_(io_service), socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)), // resolver_(io_service_), timer_(io_service_), read_msg_(packet_pool_.makePtr(nullptr)), data_available_(false), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), app_name_(app_name) { memset(&link_layer_address_, 0, sizeof(link_layer_address_)); } diff --git a/libtransport/src/hicn/transport/core/raw_socket_connector.h b/libtransport/src/hicn/transport/core/raw_socket_connector.h index 5e39efa0e..bb24d9d54 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_connector.h +++ b/libtransport/src/hicn/transport/core/raw_socket_connector.h @@ -74,9 +74,6 @@ class RawSocketConnector : public Connector { utils::ObjectPool::Ptr read_msg_; bool data_available_; - - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; std::string app_name_; }; diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.h b/libtransport/src/hicn/transport/core/raw_socket_interface.h index c030af662..ac48e5874 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_interface.h +++ b/libtransport/src/hicn/transport/core/raw_socket_interface.h @@ -41,6 +41,16 @@ class RawSocketInterface std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + TRANSPORT_ALWAYS_INLINE void closeConnection(){}; + private: static constexpr std::uint16_t interface_mtu = 1500; std::string remote_mac_address_; diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/socket_connector.cc deleted file mode 100644 index 7bf0570ad..000000000 --- a/libtransport/src/hicn/transport/core/socket_connector.cc +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifdef _WIN32 -#include -#endif -#include -#include -#include -#include - -#include - -namespace transport { - -namespace core { - -namespace { -class NetworkMessage { - public: - static constexpr std::size_t fixed_header_length = 10; - - static std::size_t decodeHeader(const uint8_t *packet) { - // General checks - // CCNX Control packet format - uint8_t first_byte = packet[0]; - uint8_t ip_format = (packet[0] & 0xf0) >> 4; - - if (TRANSPORT_EXPECT_FALSE(first_byte == 102)) { - // Get packet length - return 44; - } else if (TRANSPORT_EXPECT_TRUE(ip_format == 6 || ip_format == 4)) { - Packet::Format format = Packet::getFormatFromBuffer(packet); - return Packet::getHeaderSizeFromBuffer(format, packet) + - Packet::getPayloadSizeFromBuffer(format, packet); - } - - return 0; - } -}; -} // namespace - -SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&on_reconnect_callback, - asio::io_service &io_service, - std::string app_name) - : Connector(), - io_service_(io_service), - socket_(io_service_), - resolver_(io_service_), - timer_(io_service_), - read_msg_(packet_pool_.makePtr(nullptr)), - is_connecting_(false), - is_reconnection_(false), - data_available_(false), - is_closed_(false), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback), - app_name_(app_name) {} - -SocketConnector::~SocketConnector() {} - -void SocketConnector::connect(std::string ip_address, std::string port) { - endpoint_iterator_ = resolver_.resolve( - {ip_address, port, asio::ip::resolver_query_base::numeric_service}); - - doConnect(); -} - -void SocketConnector::state() { return; } - -void SocketConnector::send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent) { - asio::async_write(socket_, asio::buffer(packet, len), - [packet_sent](std::error_code ec, std::size_t /*length*/) { - packet_sent(); - }); -} - -void SocketConnector::send(const Packet::MemBufPtr &packet) { - io_service_.post([this, packet]() { - bool write_in_progress = !output_buffer_.empty(); - output_buffer_.push_back(std::move(packet)); - if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { - if (!write_in_progress) { - doWrite(); - } - } else { - // Tell the handle connect it has data to write - data_available_ = true; - } - }); -} - -void SocketConnector::close() { - io_service_.dispatch([this]() { - is_closed_ = true; - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - }); -} - -void SocketConnector::doWrite() { - // TODO improve this piece of code for sending many buffers togethers - // if list contains more than one packet - auto packet = output_buffer_.front().get(); - auto array = std::vector(); - - const utils::MemBuf *current = packet; - do { - array.push_back(asio::const_buffer(current->data(), current->length())); - current = current->next(); - } while (current != packet); - - asio::async_write( - socket_, std::move(array), - [this /*, packet*/](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - output_buffer_.pop_front(); - if (!output_buffer_.empty()) { - doWrite(); - } - } else if (ec.value() == - static_cast(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); - tryReconnect(); - } - }); -} - -void SocketConnector::doReadBody(std::size_t body_length) { - asio::async_read( - socket_, asio::buffer(read_msg_->writableTail(), body_length), - asio::transfer_exactly(body_length), - [this](std::error_code ec, std::size_t length) { - read_msg_->append(length); - if (TRANSPORT_EXPECT_TRUE(!ec)) { - receive_callback_(std::move(read_msg_)); - doReadHeader(); - } else if (ec.value() == - static_cast(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); - tryReconnect(); - } - }); -} - -void SocketConnector::doReadHeader() { - read_msg_ = getPacket(); - asio::async_read( - socket_, - asio::buffer(read_msg_->writableData(), - NetworkMessage::fixed_header_length), - asio::transfer_exactly(NetworkMessage::fixed_header_length), - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - read_msg_->append(NetworkMessage::fixed_header_length); - std::size_t body_length = 0; - if ((body_length = NetworkMessage::decodeHeader(read_msg_->data())) > - 0) { - doReadBody(body_length - length); - } else { - TRANSPORT_LOGE("Decoding error. Ignoring packet."); - } - } else if (ec.value() == - static_cast(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); - tryReconnect(); - } - }); -} - -void SocketConnector::tryReconnect() { - if (!is_connecting_ && !is_closed_) { - TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); - is_connecting_ = true; - is_reconnection_ = true; - io_service_.post([this]() { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - startConnectionTimer(); - doConnect(); - }); - } -} - -void SocketConnector::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { - if (!ec) { - timer_.cancel(); - is_connecting_ = false; - asio::ip::tcp::no_delay noDelayOption(true); - socket_.set_option(noDelayOption); - doReadHeader(); - - if (data_available_) { - data_available_ = false; - doWrite(); - } - - if (is_reconnection_) { - is_reconnection_ = false; - TRANSPORT_LOGI("Connection recovered!\n"); - on_reconnect_callback_(); - } - } else { - sleep(1); - doConnect(); - } - }); -} - -bool SocketConnector::checkConnected() { return !is_connecting_; } - -void SocketConnector::enableBurst() { return; } - -void SocketConnector::startConnectionTimer() { - timer_.expires_from_now(std::chrono::seconds(60)); - timer_.async_wait( - std::bind(&SocketConnector::handleDeadline, this, std::placeholders::_1)); -} - -void SocketConnector::handleDeadline(const std::error_code &ec) { - if (!ec) { - io_service_.post([this]() { - socket_.close(); - TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); - io_service_.stop(); - }); - } -} - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/socket_connector.h deleted file mode 100644 index 6eff1aff5..000000000 --- a/libtransport/src/hicn/transport/core/socket_connector.h +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include -#include -#include - -namespace transport { -namespace core { - -using asio::ip::tcp; - -class SocketConnector : public Connector { - public: - SocketConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&reconnect_callback, - asio::io_service &io_service, - std::string app_name = "Libtransport"); - - ~SocketConnector() override; - - void send(const Packet::MemBufPtr &packet) override; - - void send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent = 0) override; - - void close() override; - - void enableBurst() override; - - void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); - - void state() override; - - private: - void doConnect(); - - void doReadHeader(); - - void doReadBody(std::size_t body_length); - - void doWrite(); - - bool checkConnected(); - - private: - void handleDeadline(const std::error_code &ec); - - void startConnectionTimer(); - - void tryReconnect(); - - asio::io_service &io_service_; - asio::ip::tcp::socket socket_; - asio::ip::tcp::resolver resolver_; - asio::ip::tcp::resolver::iterator endpoint_iterator_; - asio::steady_timer timer_; - - utils::ObjectPool::Ptr read_msg_; - - bool is_connecting_; - bool is_reconnection_; - bool data_available_; - bool is_closed_; - - PacketReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; - std::string app_name_; -}; - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.cc b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc new file mode 100644 index 000000000..ade0f2611 --- /dev/null +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.cc @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include +#endif +#include +#include +#include +#include + +#include + +namespace transport { + +namespace core { + +namespace { +class NetworkMessage { + public: + static constexpr std::size_t fixed_header_length = 10; + + static std::size_t decodeHeader(const uint8_t *packet) { + // General checks + // CCNX Control packet format + uint8_t first_byte = packet[0]; + uint8_t ip_format = (packet[0] & 0xf0) >> 4; + + if (TRANSPORT_EXPECT_FALSE(first_byte == 102)) { + // Get packet length + return 44; + } else if (TRANSPORT_EXPECT_TRUE(ip_format == 6 || ip_format == 4)) { + Packet::Format format = Packet::getFormatFromBuffer(packet); + return Packet::getHeaderSizeFromBuffer(format, packet) + + Packet::getPayloadSizeFromBuffer(format, packet); + } + + return 0; + } +}; +} // namespace + +TcpSocketConnector::TcpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + timer_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + is_connecting_(false), + is_reconnection_(false), + data_available_(false), + is_closed_(false), + app_name_(app_name) {} + +TcpSocketConnector::~TcpSocketConnector() {} + +void TcpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + doConnect(); +} + +void TcpSocketConnector::state() { return; } + +void TcpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + asio::async_write(socket_, asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); +} + +void TcpSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void TcpSocketConnector::close() { + io_service_.dispatch([this]() { + is_closed_ = true; + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + }); +} + +void TcpSocketConnector::doWrite() { + // TODO improve this piece of code for sending many buffers togethers + // if list contains more than one packet + auto packet = output_buffer_.front().get(); + auto array = std::vector(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + asio::async_write( + socket_, std::move(array), + [this /*, packet*/](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::doReadBody(std::size_t body_length) { + asio::async_read( + socket_, asio::buffer(read_msg_->writableTail(), body_length), + asio::transfer_exactly(body_length), + [this](std::error_code ec, std::size_t length) { + read_msg_->append(length); + if (TRANSPORT_EXPECT_TRUE(!ec)) { + receive_callback_(std::move(read_msg_)); + doReadHeader(); + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::doReadHeader() { + read_msg_ = getPacket(); + asio::async_read( + socket_, + asio::buffer(read_msg_->writableData(), + NetworkMessage::fixed_header_length), + asio::transfer_exactly(NetworkMessage::fixed_header_length), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + read_msg_->append(NetworkMessage::fixed_header_length); + std::size_t body_length = 0; + if ((body_length = NetworkMessage::decodeHeader(read_msg_->data())) > + 0) { + doReadBody(body_length - length); + } else { + TRANSPORT_LOGE("Decoding error. Ignoring packet."); + } + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::tryReconnect() { + if (!is_connecting_ && !is_closed_) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + is_connecting_ = true; + is_reconnection_ = true; + io_service_.post([this]() { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + startConnectionTimer(); + doConnect(); + }); + } +} + +void TcpSocketConnector::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + is_connecting_ = false; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + doReadHeader(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOGI("Connection recovered!\n"); + on_reconnect_callback_(); + } + } else { + sleep(1); + doConnect(); + } + }); +} + +bool TcpSocketConnector::checkConnected() { return !is_connecting_; } + +void TcpSocketConnector::enableBurst() { return; } + +void TcpSocketConnector::startConnectionTimer() { + timer_.expires_from_now(std::chrono::seconds(60)); + timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this, + std::placeholders::_1)); +} + +void TcpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + io_service_.stop(); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/tcp_socket_connector.h b/libtransport/src/hicn/transport/core/tcp_socket_connector.h new file mode 100644 index 000000000..8dfda4eb8 --- /dev/null +++ b/libtransport/src/hicn/transport/core/tcp_socket_connector.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace transport { +namespace core { + +using asio::ip::tcp; + +class TcpSocketConnector : public Connector { + public: + TcpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~TcpSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void enableBurst() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + void state() override; + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(std::size_t body_length); + + void doWrite(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::tcp::socket socket_; + asio::ip::tcp::resolver resolver_; + asio::ip::tcp::resolver::iterator endpoint_iterator_; + asio::steady_timer timer_; + + utils::ObjectPool::Ptr read_msg_; + + bool is_connecting_; + bool is_reconnection_; + bool data_available_; + bool is_closed_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.cc b/libtransport/src/hicn/transport/core/udp_socket_connector.cc new file mode 100644 index 000000000..f38891e71 --- /dev/null +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include +#endif +#include +#include +#include +#include + +#include + +namespace transport { + +namespace core { + +UdpSocketConnector::UdpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + connection_timer_(io_service_), + connection_timeout_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + is_connecting_(false), + is_reconnection_(false), + data_available_(false), + is_closed_(false), + app_name_(app_name) {} + +UdpSocketConnector::~UdpSocketConnector() {} + +void UdpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + doConnect(); +} + +void UdpSocketConnector::state() { return; } + +void UdpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + socket_.async_send(asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); +} + +void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void UdpSocketConnector::close() { + io_service_.dispatch([this]() { + is_closed_ = true; + socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); + socket_.close(); + }); +} + +void UdpSocketConnector::doWrite() { + // TODO improve this piece of code for sending many buffers togethers + // if list contains more than one packet + auto packet = output_buffer_.front().get(); + auto array = std::vector(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_.async_send(std::move(array), [this /*, packet*/](std::error_code ec, + std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::doRead() { + read_msg_ = getPacket(); + socket_.async_receive( + asio::buffer(read_msg_->writableData(), Connector::packet_size), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + read_msg_->append(length); + receive_callback_(std::move(read_msg_)); + doRead(); + } else if (ec.value() == + static_cast(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::tryReconnect() { + if (!is_connecting_ && !is_closed_) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + is_connecting_ = true; + is_reconnection_ = true; + connection_timer_.expires_from_now(std::chrono::seconds(1)); + connection_timer_.async_wait([this](const std::error_code &ec) { + if (!ec) { + socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both); + socket_.close(); + startConnectionTimer(); + doConnect(); + } + }); + } +} + +void UdpSocketConnector::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, udp::resolver::iterator) { + if (!ec) { + connection_timeout_.cancel(); + is_connecting_ = false; + doRead(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + on_reconnect_callback_(); + } + } else { + sleep(1); + doConnect(); + } + }); +} + +bool UdpSocketConnector::checkConnected() { return !is_connecting_; } + +void UdpSocketConnector::enableBurst() { return; } + +void UdpSocketConnector::startConnectionTimer() { + connection_timeout_.expires_from_now(std::chrono::seconds(60)); + connection_timeout_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, + this, std::placeholders::_1)); +} + +void UdpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + io_service_.stop(); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/udp_socket_connector.h b/libtransport/src/hicn/transport/core/udp_socket_connector.h new file mode 100644 index 000000000..4704fa50b --- /dev/null +++ b/libtransport/src/hicn/transport/core/udp_socket_connector.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace transport { +namespace core { + +using asio::ip::udp; + +class UdpSocketConnector : public Connector { + public: + UdpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~UdpSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void enableBurst() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + void state() override; + + private: + void doConnect(); + + void doRead(); + + void doWrite(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::udp::socket socket_; + asio::ip::udp::resolver resolver_; + asio::ip::udp::resolver::iterator endpoint_iterator_; + asio::steady_timer connection_timer_; + asio::steady_timer connection_timeout_; + + utils::ObjectPool::Ptr read_msg_; + + bool is_connecting_; + bool is_reconnection_; + bool data_available_; + bool is_closed_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc index 69b18c0d9..8dc607295 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc @@ -47,20 +47,7 @@ VPPForwarderInterface::VPPForwarderInterface(MemifConnector &connector) sw_if_index_(~0), face_id_(~0) {} -VPPForwarderInterface::~VPPForwarderInterface() { - if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) { - int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, - sw_if_index_); - - if (ret < 0) { - TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); - } - } - - if (VPPForwarderInterface::api_) { - vpp_binary_api_destroy(VPPForwarderInterface::api_); - } -} +VPPForwarderInterface::~VPPForwarderInterface() {} /** * @brief Create a memif interface in the local VPP forwarder. @@ -220,6 +207,23 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { } } +void VPPForwarderInterface::closeConnection() { + if (sw_if_index_ != uint32_t(~0) && VPPForwarderInterface::memif_api_) { + int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, + sw_if_index_); + + if (ret < 0) { + TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); + } + } + + if (VPPForwarderInterface::api_) { + vpp_binary_api_destroy(VPPForwarderInterface::api_); + } + + connector_.close(); +} + } // namespace core } // namespace transport diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h index 322cd1f8b..62af8bc3b 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.h @@ -31,6 +31,8 @@ namespace core { class VPPForwarderInterface : public ForwarderInterface { + static constexpr std::uint16_t interface_mtu = 1500; + public: VPPForwarderInterface(MemifConnector &connector); @@ -44,6 +46,16 @@ class VPPForwarderInterface TRANSPORT_ALWAYS_INLINE std::uint16_t getMtu() { return interface_mtu; } + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + void closeConnection(); + private: uint32_t getMemifConfiguration(); @@ -58,7 +70,6 @@ class VPPForwarderInterface uint32_t sw_if_index_; uint32_t face_id_; static std::mutex global_lock_; - static constexpr std::uint16_t interface_mtu = 1500; }; } // namespace core diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index dbf1c1bd1..1356ad566 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -614,10 +614,10 @@ void RTCTransportProtocol::processRtcpHeader(uint8_t *offset) { uint8_t pkt_type = (*(offset + 1)); switch (pkt_type) { case HICN_RTCP_RR: // Receiver report - TRANSPORT_LOGI("got RR packet\n"); + TRANSPORT_LOGD("got RR packet\n"); break; case HICN_RTCP_SR: // Sender report - TRANSPORT_LOGI("got SR packet\n"); + TRANSPORT_LOGD("got SR packet\n"); break; case HICN_RTCP_SDES: // Description processSDES(offset); diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 8db4d9f0d..bbef2d27e 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -108,7 +108,8 @@ class HIperfClient { HIperfClient(const ClientConfiguration &conf) : configuration_(conf), total_duration_milliseconds_(0), - old_bytes_value_(0) {} + old_bytes_value_(0), + signals_(io_service_, SIGINT) {} void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, const std::error_code &ec) { @@ -122,6 +123,8 @@ class HIperfClient { std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " << (bytes_transferred * 8) * 1.0 / usec * 1.0 << " [Mbps]" << std::endl; + + io_service_.stop(); } bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) { @@ -302,11 +305,15 @@ class HIperfClient { int run() { std::cout << "Starting download of " << configuration_.name << std::endl; - do { - t1_ = std::chrono::steady_clock::now(); - consumer_socket_->consume(configuration_.name, - *configuration_.receive_buffer); - } while (configuration_.virtual_download); + signals_.async_wait([this](const std::error_code &, const int &) { + consumer_socket_->stop(); + io_service_.stop(); + }); + + t1_ = std::chrono::steady_clock::now(); + consumer_socket_->asyncConsume(configuration_.name, + configuration_.receive_buffer); + io_service_.run(); return ERROR_SUCCESS; } @@ -317,7 +324,8 @@ class HIperfClient { Time t1_; uint32_t total_duration_milliseconds_; uint64_t old_bytes_value_; - // std::unique_ptr signals_; + asio::io_service io_service_; + asio::signal_set signals_; }; class HIperfServer { @@ -326,13 +334,10 @@ class HIperfServer { public: HIperfServer(ServerConfiguration &conf) : configuration_(conf), - // signals_(io_service_, SIGINT, SIGQUIT), + signals_(io_service_, SIGINT), content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) { - // signals_.async_wait([this] (const std::error_code&, const int&) - // {std::cout << "STOPPING!!" << std::endl; io_service_.stop();}); - std::string buffer(1440, 'X'); std::cout << "Producing contents under name " << conf.name.getName() @@ -480,7 +485,14 @@ class HIperfServer { int run() { std::cerr << "Starting to serve consumers" << std::endl; - producer_socket_->serveForever(); + + signals_.async_wait([this](const std::error_code &, const int &) { + std::cout << "STOPPING!!" << std::endl; + producer_socket_->stop(); + io_service_.stop(); + }); + + io_service_.run(); return ERROR_SUCCESS; } @@ -488,7 +500,8 @@ class HIperfServer { private: ServerConfiguration configuration_; std::unique_ptr producer_socket_; - // asio::signal_set signals_; + asio::io_service io_service_; + asio::signal_set signals_; std::vector> content_objects_; std::uint16_t content_objects_index_; std::uint16_t mask_; diff --git a/utils/src/ping_client.cc b/utils/src/ping_client.cc index 598012143..d31147c70 100644 --- a/utils/src/ping_client.cc +++ b/utils/src/ping_client.cc @@ -117,9 +117,9 @@ class Client : interface::BasePortal::ConsumerCallback { auto dt = std::chrono::duration_cast(t1 - t0); std::cout << "Verification time: " << dt.count() << std::endl; - std::cout << "<<<<<< Signature OK!!!" << std::endl; + std::cout << "<<< Signature Ok." << std::endl; } else { - std::cout << "<<<<<< Signature verification failed!" << std::endl; + std::cout << "<<< Signature verification failed!" << std::endl; } } diff --git a/utils/src/ping_server.cc b/utils/src/ping_server.cc index e3500a4ba..3a20dffd1 100644 --- a/utils/src/ping_server.cc +++ b/utils/src/ping_server.cc @@ -288,9 +288,7 @@ int main(int argc, char **argv) { reset, ttl, identity, sign); } - asio::io_service io_service; - - ProducerSocket p(io_service); // , setProducerIdentity()); + ProducerSocket p; p.registerPrefix(producer_namespace); p.setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); @@ -301,7 +299,17 @@ int main(int argc, char **argv) { p.connect(); - p.serveForever(); + asio::io_service io_service; + asio::signal_set signal_set(io_service, SIGINT); + signal_set.async_wait( + [&p, &io_service](const std::error_code &, const int &) { + std::cout << "STOPPING!!" << std::endl; + p.stop(); + io_service.stop(); + }); + + io_service.run(); + #ifdef _WIN32 WSACleanup(); #endif -- cgit 1.2.3-korg