From 32dccec98e4c7d7e4ce902e19ba8d1b29b823758 Mon Sep 17 00:00:00 2001 From: Jordan Augé Date: Wed, 23 Sep 2020 17:50:52 +0200 Subject: [HICN-570] Message buffer (incl. CS and PIT changes) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I4c508e4b04dee3acbfc3da1d26e1770cb826f22b Signed-off-by: Jordan Augé --- hicn-light/src/hicn/core/listener.c | 149 ++++++++++++++++++++++++------------ 1 file changed, 101 insertions(+), 48 deletions(-) (limited to 'hicn-light/src/hicn/core/listener.c') diff --git a/hicn-light/src/hicn/core/listener.c b/hicn-light/src/hicn/core/listener.c index 5857c0c88..d24fafba0 100644 --- a/hicn-light/src/hicn/core/listener.c +++ b/hicn-light/src/hicn/core/listener.c @@ -19,14 +19,14 @@ */ - #include // strdup -#include -#include -#include #include -#include "listener.h" + +#include "forwarder.h" +#include "listener_vft.h" +#include "../base/loop.h" +#include "../io/base.h" listener_t * listener_create(face_type_t type, const address_t * address, @@ -89,7 +89,7 @@ listener_initialize(listener_t * listener, face_type_t type, const char * name, // XXX data should be pre-allocated here loop_fd_event_create(&listener->event_data, MAIN_LOOP, listener->fd, listener, - (fd_callback_t)listener_vft[listener->type]->read_callback, NULL); + (fd_callback_t)listener_read_callback, NULL); if (!listener->event_data) { goto ERR_REGISTER_FD; @@ -151,7 +151,8 @@ int listener_get_socket(const listener_t * listener, const address_t * local, { assert(listener); assert(listener_has_valid_type(listener)); - // assert(pair); + assert(local); + assert(remote); return listener_vft[listener->type]->get_socket(listener, local, remote, interface_name); @@ -201,63 +202,115 @@ listener_punt(const listener_t * listener, const char * prefix_s) return listener_vft[listener_get_type(listener)]->punt(listener, prefix_s); } + ssize_t -listener_read_callback(forwarder_t * forwarder, listener_t * listener, int fd, - address_t * local_addr, uint8_t * packet, size_t size) +listener_read_single(listener_t * listener) { - // XXX TODO mutualize code across all listeners - // some do not support batches - // - // XXX negative in case of error - // 0 if we don't consume yet because we don't have enough - // needed for TCP !! - return size; + assert(listener); + + size_t processed_size; + size_t total_size = 0; + + msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(listener->forwarder); + + for (;;) { + + msgbuf_t * msgbuf = NULL; + off_t msgbuf_id = msgbuf_pool_get(msgbuf_pool, msgbuf); + if (!msgbuf_id_is_valid(msgbuf_id)) + return 0; + + address_pair_t pair; + pair.local = *listener_get_address(listener); + + ssize_t n = listener_vft[listener->type]->read_single(listener->fd, msgbuf, + address_pair_get_remote(&pair)); + if (n < 1) + return 0; + + /* Process received packet */ + processed_size = forwarder_receive(listener->forwarder, listener, + msgbuf_id, &pair, ticks_now()); + if (processed_size <= 0) + break; + + total_size += processed_size; + } + + /* + * Even through the current listener does not allow batching, the connection + * on which we went packets might do batching (even without sendmmsg), and + * we need to inform the system that we want to proceed to sending packets. + */ + forwarder_flush_connections(listener->forwarder); + return total_size; + } -void -listener_batch_read_callback(forwarder_t * forwarder, listener_t * listener, - int fd, address_t * local_addr, batch_buffer_t * bb) +ssize_t +listener_read_batch(listener_t * listener) { - assert(bb); + assert(listener); - // XXX potential improvement : receive in a loop while we have messages to - // read + size_t processed_size; + size_t total_size = 0; + + forwarder_t * forwarder = listener->forwarder; + msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + /* Receive messages in the loop as long as we manage to fill the buffers */ + int r = 0; + do { + /* Prepare the msgbuf and address pair arrays */ + msgbuf_t * msgbuf[MAX_MSG]; + if (!msgbuf_pool_getn(msgbuf_pool, msgbuf, MAX_MSG)) + break; + + address_pair_t pair[MAX_MSG]; + address_t * address_remote[MAX_MSG]; + for (unsigned i = 0; i < MAX_MSG; i++) + address_remote[i] = address_pair_get_remote(&pair[i]); + + ssize_t n = listener_vft[listener->type]->read_batch(listener->fd, + msgbuf, address_remote, MAX_MSG); + // XXX error check + + for (unsigned i = 0; i < n; i++) { + processed_size = forwarder_receive(forwarder, listener, + msgbuf_pool_get_id(msgbuf_pool, msgbuf[i]), + &pair[i], ticks_now()); + if (processed_size <= 0) + break; + + total_size += processed_size; + } - // XXX - int r = recvmmsg(fd, bb->msghdr, MAX_MSG, 0, NULL); - if (r == 0) - return; + } while(r == MAX_MSG); /* backpressure based on queue size ? */ - if (r < 0) { - if (errno == EINTR) - return; - perror("recv()"); - return; - } + /* + * Signal to the forwarder that we reached the end of a batch and we need to + * flush connections out + */ + forwarder_flush_connections(forwarder); - for (int i = 0; i < r; i++) { - struct mmsghdr *msg = &bb->msghdr[i]; - uint8_t * packet = msg->msg_hdr.msg_iov->iov_base; - size_t size = msg->msg_hdr.msg_iovlen; + return total_size; - /* BEGIN packet processing */ +} -#ifdef __APPLE__ - // XXX explain - msg->msg_hdr.msg_namelen = 0x00; -#endif +ssize_t +listener_read_callback(listener_t * listener, int fd, void * user_data) +{ + // XXX make a single callback and arbitrate between read and readbatch + assert(listener); + assert(fd == listener->fd); - /* Construct address pair used for connection lookup */ - address_pair_t pair; - pair.local = *local_addr; - pair.remote = *(address_t*)msg->msg_hdr.msg_name; - // in the case of a connection, we should assert the remote + if (listener_vft[listener->type]->read_batch) + return listener_read_batch(listener); - process_packet(forwarder, listener, packet, size, &pair); - } + return listener_read_single(listener); } + #if 0 void _listener_callback(evutil_socket_t fd, short what, void * arg) -- cgit 1.2.3-korg