From 32dccec98e4c7d7e4ce902e19ba8d1b29b823758 Mon Sep 17 00:00:00 2001 From: Jordan Augé Date: Wed, 23 Sep 2020 17:50:52 +0200 Subject: [HICN-570] Message buffer (incl. CS and PIT changes) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I4c508e4b04dee3acbfc3da1d26e1770cb826f22b Signed-off-by: Jordan Augé --- hicn-light/src/hicn/io/udp.c | 113 ++++++++++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 44 deletions(-) (limited to 'hicn-light/src/hicn/io/udp.c') diff --git a/hicn-light/src/hicn/io/udp.c b/hicn-light/src/hicn/io/udp.c index f0eaf8fd0..38d643838 100644 --- a/hicn-light/src/hicn/io/udp.c +++ b/hicn-light/src/hicn/io/udp.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #ifndef _WIN32 #include @@ -46,19 +47,20 @@ #define UDP_GRO 104 #endif /* WITH_GSO */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include +#include "base.h" +#include "../base/loop.h" +#include "../core/address_pair.h" +#include "../core/connection.h" +#include "../core/connection_vft.h" +#include "../core/forwarder.h" +#include "../core/listener.h" +#include "../core/listener_vft.h" +#include "../core/messageHandler.h" +#include "../core/msgbuf.h" +//#include "../hicn-light/config.h" + // Batching based on recvmmsg is also generic // the difference is the handling of packet as in tcp we need to go through the // ring buffer first to do the framing, while in UDP this is already done @@ -75,8 +77,6 @@ typedef struct { uint16_t port; // in address ? - - batch_buffer_t bb; } listener_udp_data_t; #ifdef __ANDROID__ @@ -211,10 +211,10 @@ listener_udp_initialize(listener_t * listener) { assert(listener); +#if 0 listener_udp_data_t * data = listener->data; assert(data); - - init_batch_buffers(&data->bb); +#endif // XXX Socket creation should be a function per-se and not be called in // initialize ! @@ -287,18 +287,13 @@ ERR_SOCKET: return -1; } -static -void -listener_udp_read_callback(listener_t * listener, int fd, void * user_data) -{ - assert(listener); - assert(!user_data); - - listener_udp_data_t * data = listener->data; - assert(data); +#define listener_udp_read_single io_read_single_socket - listener_batch_read_callback(listener->forwarder, listener, fd, &listener->address, &data->bb); -} +#ifdef __linux__ +#define listener_udp_read_batch io_read_batch_socket +#else +#define listener_udp_read_batch NULL +#endif /* __linux__ */ DECLARE_LISTENER(udp); @@ -307,7 +302,8 @@ DECLARE_LISTENER(udp); ******************************************************************************/ typedef struct { - batch_buffer_t bb; + // XXX queue storage : msfbuf id + off_t queue[MAX_MSG]; // sized according to the max batch int queue_len; } connection_udp_data_t; @@ -323,8 +319,6 @@ connection_udp_initialize(connection_t * connection) connection_udp_data_t * data = connection->data; assert(data); - init_batch_buffers(&data->bb); - data->queue_len = 0; return 0; @@ -358,6 +352,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que connection_udp_data_t * data = connection->data; assert(data); + forwarder_t * forwarder = connection->forwarder; + msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + + /* Flush if required or if queue is full */ if ((!msgbuf) || (queue && (data->queue_len > MAX_MSG))) { /* Flush operation */ @@ -366,7 +364,38 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que #else int flags = 0; #endif /* WITH_ZEROCOPY */ - int n = sendmmsg(connection->fd, data->bb.msghdr, data->queue_len, flags); + + + // BEGIN : send batch + + /* Preparing the struct mmsghdr for batch sending */ + struct mmsghdr msghdr[MAX_MSG]; + struct iovec iovecs[MAX_MSG]; + + /* Prepare the mmghdr struct for recvmmsg */ + for (unsigned i = 0; i < data->queue_len; i++) { + struct mmsghdr *msg = &msghdr[i]; + *msg = (struct mmsghdr) { + .msg_hdr = { + .msg_iov = &iovecs[i], + .msg_iovlen = 1, + .msg_name = NULL, + .msg_namelen = 0, + .msg_control = NULL, + .msg_controllen = 0, + }, + }; + + msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, data->queue[i]); + iovecs[i] = (struct iovec) { + .iov_base = msgbuf_get_packet(msgbuf), + .iov_len = msgbuf_get_len(msgbuf), + }; + } + + // XXX build mmsghdr from the msgbuf queue + + int n = sendmmsg(connection->fd, msghdr, data->queue_len, flags); if (n == -1) { perror("sendmmsg()"); data->queue_len = 0; @@ -384,11 +413,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que } if (queue) { - struct iovec *iovec = &data->bb.iovecs[data->queue_len++]; - iovec->iov_base = msgbuf_get_packet(msgbuf); - iovec->iov_len = msgbuf_get_len(msgbuf); - + /* Queue packet */ + data->queue[data->queue_len++] = msgbuf_pool_get_id(msgbuf_pool, msgbuf); } else { + /* Send one */ ssize_t writeLength = write(connection->fd, msgbuf_get_packet(msgbuf), msgbuf_get_len(msgbuf)); @@ -467,16 +495,13 @@ connection_udp_send_packet(const connection_t * connection, const uint8_t * pack return 0; } -static -void -connection_udp_read_callback(connection_t * connection, int fd, void * user_data) -{ - assert(connection); - assert(!user_data); +#define connection_udp_read_single listener_read_batch_socket listener_single_socket + +#ifdef __linux__ +#define connection_udp_read_batch listener_read_batch_socket +#else +#define connection_udp_read_batch NULL +#endif /* __linux__ */ - connection_udp_data_t * data = connection->data; - assert(data); - listener_batch_read_callback(connection->forwarder, NULL, fd, address_pair_get_local(&connection->pair), &data->bb); -} DECLARE_CONNECTION(udp); -- cgit 1.2.3-korg