diff options
Diffstat (limited to 'hicn-light/src/hicn/io')
-rw-r--r-- | hicn-light/src/hicn/io/CMakeLists.txt | 1 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/base.c | 133 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/base.h | 38 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/hicn.c | 95 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/tcp.c | 69 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/udp.c | 133 |
6 files changed, 297 insertions, 172 deletions
diff --git a/hicn-light/src/hicn/io/CMakeLists.txt b/hicn-light/src/hicn/io/CMakeLists.txt index d7e6977d6..80a1ba867 100644 --- a/hicn-light/src/hicn/io/CMakeLists.txt +++ b/hicn-light/src/hicn/io/CMakeLists.txt @@ -17,6 +17,7 @@ list(APPEND HEADER_FILES ) list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/base.c ${CMAKE_CURRENT_SOURCE_DIR}/hicn.c ${CMAKE_CURRENT_SOURCE_DIR}/tcp.c ${CMAKE_CURRENT_SOURCE_DIR}/udp.c diff --git a/hicn-light/src/hicn/io/base.c b/hicn-light/src/hicn/io/base.c new file mode 100644 index 000000000..bd5904ccb --- /dev/null +++ b/hicn-light/src/hicn/io/base.c @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file base.c + * #brief Implementation of base IO functions. + */ + +#include <hicn/util/log.h> + +#include "base.h" + +/** + * @brief Helper function for listener to read a single packet on a socket + */ +ssize_t io_read_single_fd(int fd, msgbuf_t * msgbuf, address_t * address) +{ + uint8_t * packet = msgbuf_get_packet(msgbuf); + size_t size = msgbuf_get_len(msgbuf); + + for(;;) { + ssize_t n = read(fd, packet, size); + if (n == 0) + return n; + if (n < 0) { + if (errno == EINTR) + continue; // XXX was break; + ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno)); + return -1; + } + + msgbuf->length = n; + *address = ADDRESS_ANY(AF_UNSPEC, 0); // XXX placeholder, see hicn.c + } + + return 1; +} + +ssize_t io_read_single_socket(int fd, msgbuf_t * msgbuf, address_t * address) +{ + + struct sockaddr_storage * sa = (struct sockaddr_storage *)address; + socklen_t sa_len = sizeof(sa); + + uint8_t * packet = msgbuf_get_packet(msgbuf); + + for (;;) { + ssize_t n = recvfrom(fd, packet, MTU, 0, (struct sockaddr *)&sa, &sa_len); + if (n == 0) + return n; + if (n < 0) { + if (errno == EINTR) + continue; // XXX was break; + ERROR("recvfrom failed %d: (%d) %s", fd, errno, strerror(errno)); + return -1; + } + + msgbuf->length = n; + } + + return 1; +} + +#ifdef __linux__ +ssize_t io_read_batch_socket(int fd, msgbuf_t ** msgbuf, + address_t ** address, size_t batch_size) +{ + struct mmsghdr msghdr[batch_size]; + struct iovec iovecs[batch_size]; + struct sockaddr_storage addrs[batch_size]; + + /* Prepare the mmghdr struct for recvmmsg */ + for (unsigned i = 0; i < MAX_MSG; i++) { + struct mmsghdr *msg = &msghdr[i]; + *msg = (struct mmsghdr) { + .msg_hdr = { + .msg_iov = &iovecs[i], + .msg_iovlen = 1, + .msg_name = &addrs[i], + .msg_namelen = sizeof(struct sockaddr_storage), + .msg_control = NULL, + .msg_controllen = 0, + }, + }; + + iovecs[i] = (struct iovec) { + .iov_base = msgbuf_get_packet(msgbuf[i]), + .iov_len = MTU, + }; + } + + int n; + for (;;) { + n = recvmmsg(fd, msghdr, batch_size, /* flags */ 0, + /* timeout */ NULL); + if (n == 0) + return 0; + if (n < 0) { + if (errno == EINTR) + continue; // XXX was break; + ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno)); + return (ssize_t) n; + } + + /* Assign size to msgbuf, and build address pair */ + // TODO: local (in pair) is not initialized + for (int i = 0; i < n; i++) { + struct mmsghdr *msg = &msghdr[i]; + msgbuf[i]->length = msg->msg_hdr.msg_iovlen; + **address = *(address_t*)msg->msg_hdr.msg_name; + + struct sockaddr_in *src = msghdr[i].msg_hdr.msg_name; + DEBUG("msg received from port %u", ntohs(src->sin_port)); + } + break; + } + + return n; +} +#endif /* __linux__ */ + diff --git a/hicn-light/src/hicn/io/base.h b/hicn-light/src/hicn/io/base.h new file mode 100644 index 000000000..639334072 --- /dev/null +++ b/hicn-light/src/hicn/io/base.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file base.h + * #brief Base IO functions. + */ + +#ifndef HICNLIGHT_IO_BASE +#define HICNLIGHT_IO_BASE + +#include "../core/address_pair.h" +#include "../core/msgbuf.h" + +#define MAX_MSG 64 //16 //32 + +ssize_t io_read_single_fd(int fd, msgbuf_t * msgbuf, + address_t * address); + +ssize_t io_read_single_socket(int fd, msgbuf_t * msgbuf, + address_t * address); + +ssize_t io_read_batch_socket(int fd, msgbuf_t ** msgbuf, + address_t ** address, size_t n); + +#endif /* HICNLIGHT_IO_BASE */ diff --git a/hicn-light/src/hicn/io/hicn.c b/hicn-light/src/hicn/io/hicn.c index a239dc781..02b6728a9 100644 --- a/hicn-light/src/hicn/io/hicn.c +++ b/hicn-light/src/hicn/io/hicn.c @@ -26,17 +26,18 @@ #include <string.h> #include <unistd.h> -#include <hicn/core/listener.h> -#include <hicn/core/listener_vft.h> -#include <hicn/core/connection.h> -#include <hicn/core/connection_vft.h> -#include <hicn/core/connection_table.h> -#include <hicn/core/forwarder.h> -#include <hicn/core/mapme.h> -#include <hicn/core/messagePacketType.h> -#include <hicn/socket/api.h> #include <hicn/util/log.h> +#include "base.h" +#include "../core/listener.h" +#include "../core/listener_vft.h" +#include "../core/connection.h" +#include "../core/connection_vft.h" +#include "../core/connection_table.h" +#include "../core/forwarder.h" +#include "../core/mapme.h" +#include "../socket/api.h" + #define IPv6 6 #define IPv4 4 #define MTU_SIZE 1500 // bytes @@ -242,65 +243,7 @@ typedef struct { } listener_hicn_data_t; -static -void -listener_hicn_read_callback(listener_t * listener, int fd, void * data) -{ - assert(listener); - assert(!data); /* No user data */ - uint8_t packet[MTU_SIZE]; - - int family = address_family(&listener->address); - if ((family != AF_INET) && (family != AF_INET6)) { - /* - * We need to discard the frame. Read 1 byte. This will clear it off - * the stack. - */ - int nread = read(fd, packet, 1); - - if (nread > 0) { - DEBUG("Discarded frame from fd %d", fd); - } else if (nread < 0) { - ERROR("Error trying to discard frame from fd %d: (%d) %s", fd, errno, - strerror(errno)); - } - return; - } - -#if 0 - if (!(what & PARCEventType_Read)) - return; -#endif - - ssize_t n = read(fd, packet, MTU_SIZE); - if (n < 0) { - ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno)); - return; - } - -#if 0 - address_t packet_addr; - if (_createAddressFromPacket(packet, &packet_addr) < 0) - return; - - address_pair_t pair_find = { - .local = packet_addr, - .remote = /* dummy */ hicn->localAddress, - }; - const Connection *conn = _lookupConnection(listener, &pair_find); - if (!conn) { - address_pair_t pair = { - .local = hicn->localAddress, - .remote = packet_addr, - }; - connid = _createNewConnection(listener, fd, &pair); - } else { - connid = connection_GetConnectionId(conn); - } -#endif - - listener_read_callback(listener->forwarder, listener, fd, &listener->address, packet, n); -} +#define listener_hicn_read_callback listener_read_callback bool listener_hicn_bind(listener_t * listener, const address_t * address) @@ -426,7 +369,8 @@ listener_hicn_get_socket(const listener_t * listener, const address_t * local, { assert(listener); assert(listener_get_type(listener) == FACE_TYPE_HICN); - assert(pair); + assert(local); + assert(remote); /* ... */ @@ -434,6 +378,9 @@ listener_hicn_get_socket(const listener_t * listener, const address_t * local, } +#define listener_hicn_read_single io_read_single_fd +#define listener_hicn_read_batch NULL + DECLARE_LISTENER(hicn); /****************************************************************************** @@ -503,7 +450,7 @@ static int connection_hicn_send_packet(const connection_t * connection, const uint8_t * packet, size_t size) { - assert(ops); + assert(connection); assert(packet); /* ... */ @@ -511,12 +458,4 @@ connection_hicn_send_packet(const connection_t * connection, const uint8_t * pac return 0; } -static -void -connection_hicn_read_callback(connection_t * connection, int fd, void * data) -{ - ERROR("Unexpected read callback for hicn connection"); - return; -} - DECLARE_CONNECTION(hicn); diff --git a/hicn-light/src/hicn/io/tcp.c b/hicn-light/src/hicn/io/tcp.c index 03911e556..1e36f78ea 100644 --- a/hicn-light/src/hicn/io/tcp.c +++ b/hicn-light/src/hicn/io/tcp.c @@ -27,19 +27,18 @@ #include <stdio.h> #include <string.h> -#include <hicn/core/connection.h> -#include <hicn/core/connection_vft.h> -#include <hicn/core/listener.h> -#include <hicn/core/listener_vft.h> -#include <hicn/core/msgbuf.h> -#include <hicn/core/forwarder.h> - -#include <hicn/core/messageHandler.h> - -#include <hicn/utils/commands.h> +#include <hicn/hicn.h> #include <hicn/util/log.h> -#include <hicn/hicn.h> +#include "base.h" +#include "../core/connection.h" +#include "../core/connection_vft.h" +#include "../core/listener.h" +#include "../core/listener_vft.h" +#include "../core/msgbuf.h" +#include "../core/forwarder.h" +#include "../core/messageHandler.h" + // 128 KB output queue #define OUTPUT_QUEUE_BYTES (128 * 1024) @@ -93,13 +92,8 @@ listener_tcp_get_socket(const listener_t * listener, const address_t * local, } -static -void -listener_tcp_read_callback(listener_t * listener, int fd, void * data) -{ - ERROR("[listener_tcp_read_callback] Not implemented"); - -} +#define listener_tcp_read_single io_read_single_socket +#define listener_tcp_read_batch NULL DECLARE_LISTENER(tcp); @@ -149,12 +143,14 @@ connection_tcp_accept(connection_t * connection, forwarder_t *forwarder, int fd, .closed = false, }; - // XXX this new connection needs to be registered - //char *str = pair_ToString(udp->pair); + char addr_str[INET6_ADDRSTRLEN]; + if (local) + address_to_string(&(pair->local), addr_str); + else + address_to_string(&(pair->remote), addr_str); INFO("%s connection %p created for address %s (local=%s)", - face_type_str(connection->type), connection, "N/A", + face_type_str(connection->type), connection, addr_str, connection_is_local(connection) ? "true" : "false"); - //free(str); return 0; } @@ -224,7 +220,7 @@ int connection_tcp_initialize(connection_t * connection) { assert(connection); - assert(connection->type = FACE_TYPE_TCP); + assert(connection->type == FACE_TYPE_TCP); connection_tcp_data_t * data = connection->data; assert(data); @@ -238,10 +234,12 @@ connection_tcp_initialize(connection_t * connection) return -1; } - //char *pair_str = address_pair_ToString(pair); - INFO("%s connection %p connect for address pair %s", - face_type_str(connection->type), connection, "N/A"); - //free(pair_str); + char local_addr_str[INET6_ADDRSTRLEN]; + address_to_string(&(connection->pair.local), local_addr_str); + char remote_addr_str[INET6_ADDRSTRLEN]; + address_to_string(&(connection->pair.remote), remote_addr_str); + INFO("%s connection %p connect for address pair %s - %s", + face_type_str(connection->type), connection,local_addr_str, remote_addr_str); return 0; } @@ -324,11 +322,10 @@ connection_tcp_sendv(connnection_t * connection, struct iovec * iov, // XXX too much repeated code with sendv here static int -connection_tcp_send(const connection_t * connection, //const address_t * address, - msgbuf_t * msgbuf, bool queue) +connection_tcp_send(const connection_t * connection, msgbuf_t * msgbuf, + bool queue) { assert(connection); - assert(address); /* msgbuf can be NULL */ /* No need to flush */ @@ -397,14 +394,14 @@ connection_tcp_read_message(connection_t * connection, msgbuf_t * msgbuf) size_t n = evbuffer_get_length(data->evbuffer); // XXX this check was wrong - // parcAssertTrue(n >= sizeof(header_control_message), + // parcAssertTrue(n >= sizeof(cmd_header_t), "Called with too short an input: %zu", n); // XXX WTF if (stream->next_len == 0) { // this linearizes the first messageHandler_GetIPv6HeaderLength() bytes of the // input buffer's iovecs and returns a pointer to it. - uint8_t *fh = parcEventBuffer_Pullup(data->evbuffer, sizeof(header_control_message)); + uint8_t *fh = parcEventBuffer_Pullup(data->evbuffer, sizeof(cmd_header_t)); // Calculate the total message size based on the fixed header stream->next_len = messageHandler_GetTotalPacketLength(fh); @@ -459,12 +456,13 @@ ERR: * @param <#param1#> * @return <#return#> */ +#if 0 static void connection_tcp_read_callback(connection_t * connection, int fd, void * user_data) { - assert(!!(what & PARCEventType_Read)); - assert(connection_void); + assert(connection); + /* user_data can be NULL */ connection_tcp_data_t * data = connection->data; assert(RECV_BUFLEN - data->woff > MTU); @@ -487,7 +485,7 @@ connection_tcp_read_callback(connection_t * connection, int fd, void * user_data uint8_t * packet = data->buf + data->roff; size_t size = data->woff - data->roff; /* > 0 */ - ssize_t used = listener_read_callback(connection->forwarder, NULL, fd, + ssize_t used = listener_read_callback(NULL, fd, address_pair_get_local(&connection->pair), packet, size); if (used < 0) return; // XXX close connection ? @@ -517,6 +515,7 @@ connection_tcp_read_callback(connection_t * connection, int fd, void * user_data return; } +#endif #if 0 static diff --git a/hicn-light/src/hicn/io/udp.c b/hicn-light/src/hicn/io/udp.c index 2baae1e5b..edff902de 100644 --- a/hicn-light/src/hicn/io/udp.c +++ b/hicn-light/src/hicn/io/udp.c @@ -31,6 +31,7 @@ #include <fcntl.h> #include <stdbool.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> #ifndef _WIN32 #include <sys/uio.h> @@ -46,19 +47,20 @@ #define UDP_GRO 104 #endif /* WITH_GSO */ -#include <hicn/core/address_pair.h> -#include <hicn/core/connection.h> -#include <hicn/core/connection_vft.h> -#include <hicn/core/listener.h> -#include <hicn/core/listener_vft.h> -#include <hicn/base/loop.h> -#include <hicn/core/msgbuf.h> -#include <hicn/core/forwarder.h> -#include <hicn/core/messageHandler.h> -#include <hicn/core/messagePacketType.h> -#include <hicn/hicn-light/config.h> #include <hicn/util/log.h> +#include "base.h" +#include "../base/loop.h" +#include "../core/address_pair.h" +#include "../core/connection.h" +#include "../core/connection_vft.h" +#include "../core/forwarder.h" +#include "../core/listener.h" +#include "../core/listener_vft.h" +#include "../core/messageHandler.h" +#include "../core/msgbuf.h" +//#include "../hicn-light/config.h" + // Batching based on recvmmsg is also generic // the difference is the handling of packet as in tcp we need to go through the // ring buffer first to do the framing, while in UDP this is already done @@ -75,8 +77,6 @@ typedef struct { uint16_t port; // in address ? - - batch_buffer_t bb; } listener_udp_data_t; #ifdef __ANDROID__ @@ -211,19 +211,10 @@ listener_udp_initialize(listener_t * listener) { assert(listener); +#if 0 listener_udp_data_t * data = listener->data; assert(data); - - init_batch_buffers(&data->bb); - - // XXX Socket creation should be a function per-se and not be called in - // initialize ! - listener->fd = listener_get_socket(listener, &listener->address, NULL, - listener->interface_name); - if (listener->fd < 0) { - ERROR("Error creating UDP socket: (%d) %s", errno, strerror(errno)); - return -1; - } +#endif return 0; } @@ -249,7 +240,7 @@ int listener_udp_get_socket(const listener_t * listener, const address_t * local, const address_t * remote, const char * interface_name) { - int fd = socket(address_family(remote), SOCK_DGRAM, 0); + int fd = socket(address_family(local), SOCK_DGRAM, 0); if (fd < 0) goto ERR_SOCKET; @@ -257,7 +248,7 @@ listener_udp_get_socket(const listener_t * listener, const address_t * local, goto ERR; } - if (bind(fd, address_sa(local), address_socklen(remote)) < 0) { + if (bind(fd, address_sa(local), address_socklen(local)) < 0) { perror("bind"); goto ERR; } @@ -287,18 +278,13 @@ ERR_SOCKET: return -1; } -static -void -listener_udp_read_callback(listener_t * listener, int fd, void * user_data) -{ - assert(listener); - assert(!user_data); - - listener_udp_data_t * data = listener->data; - assert(data); +#define listener_udp_read_single io_read_single_socket - listener_batch_read_callback(listener->forwarder, listener, fd, &listener->address, &data->bb); -} +#ifdef __linux__ +#define listener_udp_read_batch io_read_batch_socket +#else +#define listener_udp_read_batch NULL +#endif /* __linux__ */ DECLARE_LISTENER(udp); @@ -307,7 +293,8 @@ DECLARE_LISTENER(udp); ******************************************************************************/ typedef struct { - batch_buffer_t bb; + // XXX queue storage : msfbuf id + off_t queue[MAX_MSG]; // sized according to the max batch int queue_len; } connection_udp_data_t; @@ -317,15 +304,12 @@ connection_udp_initialize(connection_t * connection) { assert(connection); - assert(connection->type == FACE_TYPE_UDP); - assert(interface_name); - assert(address_pair); + assert(connection->type == FACE_TYPE_UDP || connection->type == FACE_TYPE_UDP_LISTENER); + assert(connection->interface_name); connection_udp_data_t * data = connection->data; assert(data); - init_batch_buffers(&data->bb); - data->queue_len = 0; return 0; @@ -359,6 +343,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que connection_udp_data_t * data = connection->data; assert(data); + forwarder_t * forwarder = connection->forwarder; + msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + + /* Flush if required or if queue is full */ if ((!msgbuf) || (queue && (data->queue_len > MAX_MSG))) { /* Flush operation */ @@ -367,7 +355,38 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que #else int flags = 0; #endif /* WITH_ZEROCOPY */ - int n = sendmmsg(connection->fd, data->bb.msghdr, data->queue_len, flags); + + + // BEGIN : send batch + + /* Preparing the struct mmsghdr for batch sending */ + struct mmsghdr msghdr[MAX_MSG]; + struct iovec iovecs[MAX_MSG]; + + /* Prepare the mmghdr struct for recvmmsg */ + for (unsigned i = 0; i < data->queue_len; i++) { + struct mmsghdr *msg = &msghdr[i]; + *msg = (struct mmsghdr) { + .msg_hdr = { + .msg_iov = &iovecs[i], + .msg_iovlen = 1, + .msg_name = NULL, + .msg_namelen = 0, + .msg_control = NULL, + .msg_controllen = 0, + }, + }; + + msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, data->queue[i]); + iovecs[i] = (struct iovec) { + .iov_base = msgbuf_get_packet(msgbuf), + .iov_len = msgbuf_get_len(msgbuf), + }; + } + + // XXX build mmsghdr from the msgbuf queue + + int n = sendmmsg(connection->fd, msghdr, data->queue_len, flags); if (n == -1) { perror("sendmmsg()"); data->queue_len = 0; @@ -385,11 +404,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que } if (queue) { - struct iovec *iovec = &data->bb.iovecs[data->queue_len++]; - iovec->iov_base = msgbuf_get_packet(msgbuf); - iovec->iov_len = msgbuf_get_len(msgbuf); - + /* Queue packet */ + data->queue[data->queue_len++] = msgbuf_pool_get_id(msgbuf_pool, msgbuf); } else { + /* Send one */ ssize_t writeLength = write(connection->fd, msgbuf_get_packet(msgbuf), msgbuf_get_len(msgbuf)); @@ -454,7 +472,7 @@ static int connection_udp_send_packet(const connection_t * connection, const uint8_t * packet, size_t size) { - assert(ops); + assert(connection); assert(packet); if(connection_is_local(connection)) @@ -468,16 +486,13 @@ connection_udp_send_packet(const connection_t * connection, const uint8_t * pack return 0; } -static -void -connection_udp_read_callback(connection_t * connection, int fd, void * user_data) -{ - assert(connection); - assert(!user_data); +#define connection_udp_read_single listener_read_batch_socket listener_single_socket + +#ifdef __linux__ +#define connection_udp_read_batch listener_read_batch_socket +#else +#define connection_udp_read_batch NULL +#endif /* __linux__ */ - connection_udp_data_t * data = connection->data; - assert(data); - listener_batch_read_callback(connection->forwarder, NULL, fd, address_pair_get_local(&connection->pair), &data->bb); -} DECLARE_CONNECTION(udp); |