diff options
author | Jordan Augé <jordan.auge+fdio@cisco.com> | 2020-09-23 17:50:52 +0200 |
---|---|---|
committer | Jordan Augé <jordan.auge+fdio@cisco.com> | 2020-09-28 18:25:30 +0200 |
commit | 32dccec98e4c7d7e4ce902e19ba8d1b29b823758 (patch) | |
tree | 95c5dec2083a3774c13bd5f896743cd6c5c42a7a /hicn-light/src/hicn/io | |
parent | 7356408ca1554468c9d7b9840aaaee28b4341c8d (diff) |
[HICN-570] Message buffer (incl. CS and PIT changes)
Change-Id: I4c508e4b04dee3acbfc3da1d26e1770cb826f22b
Signed-off-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Diffstat (limited to 'hicn-light/src/hicn/io')
-rw-r--r-- | hicn-light/src/hicn/io/CMakeLists.txt | 1 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/base.c | 128 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/base.h | 38 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/hicn.c | 95 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/tcp.c | 42 | ||||
-rw-r--r-- | hicn-light/src/hicn/io/udp.c | 113 |
6 files changed, 272 insertions, 145 deletions
diff --git a/hicn-light/src/hicn/io/CMakeLists.txt b/hicn-light/src/hicn/io/CMakeLists.txt index d7e6977d6..80a1ba867 100644 --- a/hicn-light/src/hicn/io/CMakeLists.txt +++ b/hicn-light/src/hicn/io/CMakeLists.txt @@ -17,6 +17,7 @@ list(APPEND HEADER_FILES ) list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/base.c ${CMAKE_CURRENT_SOURCE_DIR}/hicn.c ${CMAKE_CURRENT_SOURCE_DIR}/tcp.c ${CMAKE_CURRENT_SOURCE_DIR}/udp.c diff --git a/hicn-light/src/hicn/io/base.c b/hicn-light/src/hicn/io/base.c new file mode 100644 index 000000000..35d8915a8 --- /dev/null +++ b/hicn-light/src/hicn/io/base.c @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file base.c + * #brief Implementation of base IO functions. + */ + +#include <hicn/util/log.h> + +#include "base.h" + +/** + * @brief Helper function for listener to read a single packet on a socket + */ +ssize_t io_read_single_fd(int fd, msgbuf_t * msgbuf, address_t * address) +{ + uint8_t * packet = msgbuf_get_packet(msgbuf); + size_t size = msgbuf_get_len(msgbuf); + + for(;;) { + ssize_t n = read(fd, packet, size); + if (n == 0) + return n; + if (n < 0) { + if (errno == EINTR) + continue; // XXX was break; + ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno)); + return -1; + } + + msgbuf->length = n; + *address = ADDRESS_ANY(AF_UNSPEC, 0); // XXX placeholder, see hicn.c + } + + return 1; +} + +ssize_t io_read_single_socket(int fd, msgbuf_t * msgbuf, address_t * address) +{ + + struct sockaddr_storage * sa = (struct sockaddr_storage *)address; + socklen_t sa_len = sizeof(sa); + + uint8_t * packet = msgbuf_get_packet(msgbuf); + + for (;;) { + ssize_t n = recvfrom(fd, packet, MTU, 0, (struct sockaddr *)&sa, &sa_len); + if (n == 0) + return n; + if (n < 0) { + if (errno == EINTR) + continue; // XXX was break; + ERROR("recvfrom failed %d: (%d) %s", fd, errno, strerror(errno)); + return -1; + } + + msgbuf->length = n; + } + + return 1; +} + +#ifdef __linux__ +ssize_t io_read_batch_socket(int fd, msgbuf_t ** msgbuf, + address_t ** address, size_t batch_size) +{ + struct mmsghdr msghdr[batch_size]; + struct iovec iovecs[batch_size]; + struct sockaddr_storage addrs[batch_size]; + + /* Prepare the mmghdr struct for recvmmsg */ + for (unsigned i = 0; i < MAX_MSG; i++) { + struct mmsghdr *msg = &msghdr[i]; + *msg = (struct mmsghdr) { + .msg_hdr = { + .msg_iov = &iovecs[i], + .msg_iovlen = 1, + .msg_name = &addrs[i], + .msg_namelen = sizeof(struct sockaddr_storage), + .msg_control = NULL, + .msg_controllen = 0, + }, + }; + + iovecs[i] = (struct iovec) { + .iov_base = msgbuf_get_packet(msgbuf[i]), + .iov_len = MTU, + }; + } + + int n; + for (;;) { + n = recvmmsg(fd, msghdr, batch_size, /* flags */ 0, + /* timeout */ NULL); + if (n == 0) + return 0; + if (n < 0) { + if (errno == EINTR) + continue; // XXX was break; + ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno)); + return (ssize_t) n; + } + + /* Assign size to msgbuf, and build address pair */ + for (int i = 0; i < n; i++) { + struct mmsghdr *msg = &msghdr[i]; + msgbuf[i]->length = msg->msg_hdr.msg_iovlen; + **address = *(address_t*)msg->msg_hdr.msg_name; + } + } + + return n; +} +#endif /* __linux__ */ + diff --git a/hicn-light/src/hicn/io/base.h b/hicn-light/src/hicn/io/base.h new file mode 100644 index 000000000..639334072 --- /dev/null +++ b/hicn-light/src/hicn/io/base.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file base.h + * #brief Base IO functions. + */ + +#ifndef HICNLIGHT_IO_BASE +#define HICNLIGHT_IO_BASE + +#include "../core/address_pair.h" +#include "../core/msgbuf.h" + +#define MAX_MSG 64 //16 //32 + +ssize_t io_read_single_fd(int fd, msgbuf_t * msgbuf, + address_t * address); + +ssize_t io_read_single_socket(int fd, msgbuf_t * msgbuf, + address_t * address); + +ssize_t io_read_batch_socket(int fd, msgbuf_t ** msgbuf, + address_t ** address, size_t n); + +#endif /* HICNLIGHT_IO_BASE */ diff --git a/hicn-light/src/hicn/io/hicn.c b/hicn-light/src/hicn/io/hicn.c index 26c641e51..02b6728a9 100644 --- a/hicn-light/src/hicn/io/hicn.c +++ b/hicn-light/src/hicn/io/hicn.c @@ -26,17 +26,18 @@ #include <string.h> #include <unistd.h> -#include <hicn/core/listener.h> -#include <hicn/core/listener_vft.h> -#include <hicn/core/connection.h> -#include <hicn/core/connection_vft.h> -#include <hicn/core/connection_table.h> -#include <hicn/core/forwarder.h> -#include <hicn/core/mapme.h> -#include <hicn/core/messagePacketType.h> -#include <hicn/socket/api.h> #include <hicn/util/log.h> +#include "base.h" +#include "../core/listener.h" +#include "../core/listener_vft.h" +#include "../core/connection.h" +#include "../core/connection_vft.h" +#include "../core/connection_table.h" +#include "../core/forwarder.h" +#include "../core/mapme.h" +#include "../socket/api.h" + #define IPv6 6 #define IPv4 4 #define MTU_SIZE 1500 // bytes @@ -242,65 +243,7 @@ typedef struct { } listener_hicn_data_t; -static -void -listener_hicn_read_callback(listener_t * listener, int fd, void * data) -{ - assert(listener); - assert(!data); /* No user data */ - uint8_t packet[MTU_SIZE]; - - int family = address_family(&listener->address); - if ((family != AF_INET) && (family != AF_INET6)) { - /* - * We need to discard the frame. Read 1 byte. This will clear it off - * the stack. - */ - int nread = read(fd, packet, 1); - - if (nread > 0) { - DEBUG("Discarded frame from fd %d", fd); - } else if (nread < 0) { - ERROR("Error trying to discard frame from fd %d: (%d) %s", fd, errno, - strerror(errno)); - } - return; - } - -#if 0 - if (!(what & PARCEventType_Read)) - return; -#endif - - ssize_t n = read(fd, packet, MTU_SIZE); - if (n < 0) { - ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno)); - return; - } - -#if 0 - address_t packet_addr; - if (_createAddressFromPacket(packet, &packet_addr) < 0) - return; - - address_pair_t pair_find = { - .local = packet_addr, - .remote = /* dummy */ hicn->localAddress, - }; - const Connection *conn = _lookupConnection(listener, &pair_find); - if (!conn) { - address_pair_t pair = { - .local = hicn->localAddress, - .remote = packet_addr, - }; - connid = _createNewConnection(listener, fd, &pair); - } else { - connid = connection_GetConnectionId(conn); - } -#endif - - listener_read_callback(listener->forwarder, listener, fd, &listener->address, packet, n); -} +#define listener_hicn_read_callback listener_read_callback bool listener_hicn_bind(listener_t * listener, const address_t * address) @@ -426,7 +369,8 @@ listener_hicn_get_socket(const listener_t * listener, const address_t * local, { assert(listener); assert(listener_get_type(listener) == FACE_TYPE_HICN); - // assert(pair); + assert(local); + assert(remote); /* ... */ @@ -434,6 +378,9 @@ listener_hicn_get_socket(const listener_t * listener, const address_t * local, } +#define listener_hicn_read_single io_read_single_fd +#define listener_hicn_read_batch NULL + DECLARE_LISTENER(hicn); /****************************************************************************** @@ -503,7 +450,7 @@ static int connection_hicn_send_packet(const connection_t * connection, const uint8_t * packet, size_t size) { - // assert(ops); + assert(connection); assert(packet); /* ... */ @@ -511,12 +458,4 @@ connection_hicn_send_packet(const connection_t * connection, const uint8_t * pac return 0; } -static -void -connection_hicn_read_callback(connection_t * connection, int fd, void * data) -{ - ERROR("Unexpected read callback for hicn connection"); - return; -} - DECLARE_CONNECTION(hicn); diff --git a/hicn-light/src/hicn/io/tcp.c b/hicn-light/src/hicn/io/tcp.c index 0ed9b4650..e4e2b06af 100644 --- a/hicn-light/src/hicn/io/tcp.c +++ b/hicn-light/src/hicn/io/tcp.c @@ -27,19 +27,19 @@ #include <stdio.h> #include <string.h> -#include <hicn/core/connection.h> -#include <hicn/core/connection_vft.h> -#include <hicn/core/listener.h> -#include <hicn/core/listener_vft.h> -#include <hicn/core/msgbuf.h> -#include <hicn/core/forwarder.h> - -#include <hicn/core/messageHandler.h> - -#include <hicn/utils/commands.h> +#include <hicn/hicn.h> #include <hicn/util/log.h> -#include <hicn/hicn.h> +#include "base.h" +#include "../core/connection.h" +#include "../core/connection_vft.h" +#include "../core/listener.h" +#include "../core/listener_vft.h" +#include "../core/msgbuf.h" +#include "../core/forwarder.h" +#include "../core/messageHandler.h" +#include "../utils/commands.h" + // 128 KB output queue #define OUTPUT_QUEUE_BYTES (128 * 1024) @@ -93,13 +93,8 @@ listener_tcp_get_socket(const listener_t * listener, const address_t * local, } -static -void -listener_tcp_read_callback(listener_t * listener, int fd, void * data) -{ - ERROR("[listener_tcp_read_callback] Not implemented"); - -} +#define listener_tcp_read_single io_read_single_socket +#define listener_tcp_read_batch NULL DECLARE_LISTENER(tcp); @@ -324,11 +319,10 @@ connection_tcp_sendv(connnection_t * connection, struct iovec * iov, // XXX too much repeated code with sendv here static int -connection_tcp_send(const connection_t * connection, //const address_t * address, - msgbuf_t * msgbuf, bool queue) +connection_tcp_send(const connection_t * connection, msgbuf_t * msgbuf, + bool queue) { assert(connection); - // assert(address); /* msgbuf can be NULL */ /* No need to flush */ @@ -459,12 +453,13 @@ ERR: * @param <#param1#> * @return <#return#> */ +#if 0 static void connection_tcp_read_callback(connection_t * connection, int fd, void * user_data) { - // assert(!!(what & PARCEventType_Read)); assert(connection); + /* user_data can be NULL */ connection_tcp_data_t * data = connection->data; assert(RECV_BUFLEN - data->woff > MTU); @@ -487,7 +482,7 @@ connection_tcp_read_callback(connection_t * connection, int fd, void * user_data uint8_t * packet = data->buf + data->roff; size_t size = data->woff - data->roff; /* > 0 */ - ssize_t used = listener_read_callback(connection->forwarder, NULL, fd, + ssize_t used = listener_read_callback(NULL, fd, address_pair_get_local(&connection->pair), packet, size); if (used < 0) return; // XXX close connection ? @@ -517,6 +512,7 @@ connection_tcp_read_callback(connection_t * connection, int fd, void * user_data return; } +#endif #if 0 static diff --git a/hicn-light/src/hicn/io/udp.c b/hicn-light/src/hicn/io/udp.c index f0eaf8fd0..38d643838 100644 --- a/hicn-light/src/hicn/io/udp.c +++ b/hicn-light/src/hicn/io/udp.c @@ -31,6 +31,7 @@ #include <fcntl.h> #include <stdbool.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> #ifndef _WIN32 #include <sys/uio.h> @@ -46,19 +47,20 @@ #define UDP_GRO 104 #endif /* WITH_GSO */ -#include <hicn/core/address_pair.h> -#include <hicn/core/connection.h> -#include <hicn/core/connection_vft.h> -#include <hicn/core/listener.h> -#include <hicn/core/listener_vft.h> -#include <hicn/base/loop.h> -#include <hicn/core/msgbuf.h> -#include <hicn/core/forwarder.h> -#include <hicn/core/messageHandler.h> -#include <hicn/core/messagePacketType.h> -#include <hicn/hicn-light/config.h> #include <hicn/util/log.h> +#include "base.h" +#include "../base/loop.h" +#include "../core/address_pair.h" +#include "../core/connection.h" +#include "../core/connection_vft.h" +#include "../core/forwarder.h" +#include "../core/listener.h" +#include "../core/listener_vft.h" +#include "../core/messageHandler.h" +#include "../core/msgbuf.h" +//#include "../hicn-light/config.h" + // Batching based on recvmmsg is also generic // the difference is the handling of packet as in tcp we need to go through the // ring buffer first to do the framing, while in UDP this is already done @@ -75,8 +77,6 @@ typedef struct { uint16_t port; // in address ? - - batch_buffer_t bb; } listener_udp_data_t; #ifdef __ANDROID__ @@ -211,10 +211,10 @@ listener_udp_initialize(listener_t * listener) { assert(listener); +#if 0 listener_udp_data_t * data = listener->data; assert(data); - - init_batch_buffers(&data->bb); +#endif // XXX Socket creation should be a function per-se and not be called in // initialize ! @@ -287,18 +287,13 @@ ERR_SOCKET: return -1; } -static -void -listener_udp_read_callback(listener_t * listener, int fd, void * user_data) -{ - assert(listener); - assert(!user_data); - - listener_udp_data_t * data = listener->data; - assert(data); +#define listener_udp_read_single io_read_single_socket - listener_batch_read_callback(listener->forwarder, listener, fd, &listener->address, &data->bb); -} +#ifdef __linux__ +#define listener_udp_read_batch io_read_batch_socket +#else +#define listener_udp_read_batch NULL +#endif /* __linux__ */ DECLARE_LISTENER(udp); @@ -307,7 +302,8 @@ DECLARE_LISTENER(udp); ******************************************************************************/ typedef struct { - batch_buffer_t bb; + // XXX queue storage : msfbuf id + off_t queue[MAX_MSG]; // sized according to the max batch int queue_len; } connection_udp_data_t; @@ -323,8 +319,6 @@ connection_udp_initialize(connection_t * connection) connection_udp_data_t * data = connection->data; assert(data); - init_batch_buffers(&data->bb); - data->queue_len = 0; return 0; @@ -358,6 +352,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que connection_udp_data_t * data = connection->data; assert(data); + forwarder_t * forwarder = connection->forwarder; + msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + + /* Flush if required or if queue is full */ if ((!msgbuf) || (queue && (data->queue_len > MAX_MSG))) { /* Flush operation */ @@ -366,7 +364,38 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que #else int flags = 0; #endif /* WITH_ZEROCOPY */ - int n = sendmmsg(connection->fd, data->bb.msghdr, data->queue_len, flags); + + + // BEGIN : send batch + + /* Preparing the struct mmsghdr for batch sending */ + struct mmsghdr msghdr[MAX_MSG]; + struct iovec iovecs[MAX_MSG]; + + /* Prepare the mmghdr struct for recvmmsg */ + for (unsigned i = 0; i < data->queue_len; i++) { + struct mmsghdr *msg = &msghdr[i]; + *msg = (struct mmsghdr) { + .msg_hdr = { + .msg_iov = &iovecs[i], + .msg_iovlen = 1, + .msg_name = NULL, + .msg_namelen = 0, + .msg_control = NULL, + .msg_controllen = 0, + }, + }; + + msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, data->queue[i]); + iovecs[i] = (struct iovec) { + .iov_base = msgbuf_get_packet(msgbuf), + .iov_len = msgbuf_get_len(msgbuf), + }; + } + + // XXX build mmsghdr from the msgbuf queue + + int n = sendmmsg(connection->fd, msghdr, data->queue_len, flags); if (n == -1) { perror("sendmmsg()"); data->queue_len = 0; @@ -384,11 +413,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que } if (queue) { - struct iovec *iovec = &data->bb.iovecs[data->queue_len++]; - iovec->iov_base = msgbuf_get_packet(msgbuf); - iovec->iov_len = msgbuf_get_len(msgbuf); - + /* Queue packet */ + data->queue[data->queue_len++] = msgbuf_pool_get_id(msgbuf_pool, msgbuf); } else { + /* Send one */ ssize_t writeLength = write(connection->fd, msgbuf_get_packet(msgbuf), msgbuf_get_len(msgbuf)); @@ -467,16 +495,13 @@ connection_udp_send_packet(const connection_t * connection, const uint8_t * pack return 0; } -static -void -connection_udp_read_callback(connection_t * connection, int fd, void * user_data) -{ - assert(connection); - assert(!user_data); +#define connection_udp_read_single listener_read_batch_socket listener_single_socket + +#ifdef __linux__ +#define connection_udp_read_batch listener_read_batch_socket +#else +#define connection_udp_read_batch NULL +#endif /* __linux__ */ - connection_udp_data_t * data = connection->data; - assert(data); - listener_batch_read_callback(connection->forwarder, NULL, fd, address_pair_get_local(&connection->pair), &data->bb); -} DECLARE_CONNECTION(udp); |