diff options
Diffstat (limited to 'hicn-light/src/hicn/core/listener.c')
-rw-r--r-- | hicn-light/src/hicn/core/listener.c | 220 |
1 files changed, 141 insertions, 79 deletions
diff --git a/hicn-light/src/hicn/core/listener.c b/hicn-light/src/hicn/core/listener.c index 0ab73b1f4..01f62bb39 100644 --- a/hicn-light/src/hicn/core/listener.c +++ b/hicn-light/src/hicn/core/listener.c @@ -19,14 +19,14 @@ */ - #include <string.h> // strdup -#include <hicn/core/listener_vft.h> -#include <hicn/base/loop.h> -#include <hicn/core/forwarder.h> #include <hicn/util/log.h> -#include "listener.h" + +#include "forwarder.h" +#include "listener_vft.h" +#include "../base/loop.h" +#include "../io/base.h" listener_t * listener_create(face_type_t type, const address_t * address, @@ -39,7 +39,10 @@ listener_create(face_type_t type, const address_t * address, .type = type, .address = *address, }; - listener_table_allocate(table, listener, &key, name); + listener_table_allocate(table, listener, &key, strdup(name)); + WITH_DEBUG( + listener_table_print(table); + ) unsigned listener_id = listener_table_get_listener_id(table, listener); @@ -63,23 +66,27 @@ listener_initialize(listener_t * listener, face_type_t type, const char * name, .type = type, .interface_name = strdup(interface_name), //.interface_index = , - //.family = , + .family = address->ss_family, .fd = 0, .address = *address, .forwarder = forwarder, }; - listener->data = malloc(listener_vft[listener->type]->data_size); + face_protocol_t face_protocol = get_protocol(listener->type); + if (face_protocol == FACE_PROTOCOL_UNKNOWN) + goto ERR_VFT; + + listener->data = malloc(listener_vft[face_protocol]->data_size); if (!listener->data) goto ERR_DATA; assert(listener_has_valid_type(listener)); - rc = listener_vft[listener->type]->initialize(listener); + rc = listener_vft[face_protocol]->initialize(listener); if (rc < 0) goto ERR_VFT; - listener->fd = listener_vft[listener->type]->get_socket(listener, address, NULL, interface_name); + listener->fd = listener_vft[face_protocol]->get_socket(listener, address, NULL, interface_name); if (listener->fd < 0) { ERROR("Error creating listener fd: (%d) %s", errno, strerror(errno)); goto ERR_FD; @@ -88,16 +95,21 @@ listener_initialize(listener_t * listener, face_type_t type, const char * name, // XXX data should be pre-allocated here - if (loop_register_fd(MAIN_LOOP, listener->fd, listener, - (fd_callback_t)listener_vft[listener->type]->read_callback, NULL) < 0) + loop_fd_event_create(&listener->event_data, MAIN_LOOP, listener->fd, listener, + (fd_callback_t)listener_read_callback, NULL); + + if (!listener->event_data) { goto ERR_REGISTER_FD; + } - // XXX TODO - //char *str = addressToString(listener->local_addr); - DEBUG("%s UdpListener %p created for address %s", - face_type_str(listener->type), listener, "N/A"); - //free(str); + if (loop_fd_event_register(listener->event_data) < 0) { + goto ERR_REGISTER_FD; + } + char addr_str[INET6_ADDRSTRLEN]; + address_to_string(address, addr_str); + DEBUG("%s UdpListener %p created for address %s", + face_type_str(listener->type), listener, addr_str); return 0; ERR_REGISTER_FD: @@ -121,7 +133,7 @@ listener_finalize(listener_t * listener) assert(listener); assert(listener_has_valid_type(listener)); - loop_unregister_fd(MAIN_LOOP, listener->fd); + loop_event_unregister(listener->event_data); #ifndef _WIN32 close(listener->fd); @@ -129,11 +141,12 @@ listener_finalize(listener_t * listener) closesocket(listener->fd); #endif - listener_vft[listener->type]->finalize(listener); + listener_vft[get_protocol(listener->type)]->finalize(listener); free(listener->data); free(listener->interface_name); free(listener->name); + loop_event_free(listener->event_data); return 0; } @@ -143,13 +156,13 @@ int listener_get_socket(const listener_t * listener, const address_t * local, { assert(listener); assert(listener_has_valid_type(listener)); - assert(pair); + assert(local); + // assert(remote); TODO: can it be null? - return listener_vft[listener->type]->get_socket(listener, local, remote, + return listener_vft[get_protocol(listener->type)]->get_socket(listener, local, remote, interface_name); } -// XXX CHANGE : we now get the fd directly from the listener unsigned listener_create_connection(const listener_t * listener, const address_pair_t * pair) { @@ -158,22 +171,17 @@ unsigned listener_create_connection(const listener_t * listener, assert(pair); // XXX TODO This code is likely common with connection creation code - const char * name = NULL; connection_table_t * table = forwarder_get_connection_table(listener->forwarder); connection_t * connection; - connection_table_allocate(table, connection, pair, name); + connection_table_allocate(table, connection, pair, listener->name); unsigned connid = connection_table_get_connection_id(table, connection); bool local = address_is_local(address_pair_get_local(pair)); - int fd = listener_get_socket(listener, address_pair_get_local(pair), - address_pair_get_remote(pair), NULL); // XXX interfacename was not specified - - // XXX here we use the same interface name as the listener - int rc = connection_initialize(connection, listener->type, name, - listener->interface_name, fd, pair, local, connid, listener->forwarder); + int rc = connection_initialize(connection, listener->type, listener->name, + listener->interface_name, listener->fd, pair, local, connid, listener->forwarder); if (rc < 0) return ~0; // XXX how to return an error @@ -190,66 +198,121 @@ listener_punt(const listener_t * listener, const char * prefix_s) assert(listener_get_type(listener) == FACE_TYPE_HICN); assert(prefix_s); - return listener_vft[listener_get_type(listener)]->punt(listener, prefix_s); + return listener_vft[get_protocol(listener->type)]->punt(listener, prefix_s); } + ssize_t -listener_read_callback(forwarder_t * forwarder, listener_t * listener, int fd, - address_t * local_addr, uint8_t * packet, size_t size) +listener_read_single(listener_t * listener) { - // XXX TODO mutualize code across all listeners - // some do not support batches - // - // XXX negative in case of error - // 0 if we don't consume yet because we don't have enough - // needed for TCP !! - return size; + assert(listener); + + size_t processed_size; + size_t total_size = 0; + + msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(listener->forwarder); + + for (;;) { + + msgbuf_t * msgbuf = NULL; + off_t msgbuf_id = msgbuf_pool_get(msgbuf_pool, &msgbuf); + if (!msgbuf_id_is_valid(msgbuf_id)) + return 0; + + address_pair_t pair; + pair.local = *listener_get_address(listener); + + ssize_t n = listener_vft[get_protocol(listener->type)]->read_single(listener->fd, msgbuf, + address_pair_get_remote(&pair)); + if (n < 1) + return 0; + + /* Process received packet */ + processed_size = forwarder_receive(listener->forwarder, listener, + msgbuf_id, &pair, ticks_now()); + if (processed_size <= 0) + break; + + total_size += processed_size; + } + + /* + * Even through the current listener does not allow batching, the connection + * on which we went packets might do batching (even without sendmmsg), and + * we need to inform the system that we want to proceed to sending packets. + */ + forwarder_flush_connections(listener->forwarder); + return total_size; + } -void -listener_batch_read_callback(forwarder_t * forwarder, listener_t * listener, - int fd, address_t * local_addr, batch_buffer_t * bb) +ssize_t +listener_read_batch(listener_t * listener) { - assert(bb); + assert(listener); - // XXX potential improvement : receive in a loop while we have messages to - // read + size_t processed_size; + size_t total_size = 0; + + forwarder_t * forwarder = listener->forwarder; + msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + /* Receive messages in the loop as long as we manage to fill the buffers */ + int r = 0; + do { + /* Prepare the msgbuf and address pair arrays */ + msgbuf_t * msgbuf[MAX_MSG]; + if (msgbuf_pool_getn(msgbuf_pool, msgbuf, MAX_MSG) < 0) + break; + + address_pair_t pair[MAX_MSG]; + address_t * address_remote[MAX_MSG]; + for (unsigned i = 0; i < MAX_MSG; i++) + address_remote[i] = address_pair_get_remote(&pair[i]); + + ssize_t n = listener_vft[get_protocol(listener->type)]->read_batch(listener->fd, + msgbuf, address_remote, MAX_MSG); + // XXX error check + + for (unsigned i = 0; i < n; i++) { + processed_size = forwarder_receive(forwarder, listener, + msgbuf_pool_get_id(msgbuf_pool, msgbuf[i]), + &pair[i], ticks_now()); + if (processed_size <= 0) + break; + + total_size += processed_size; + } - // XXX - int r = recvmmsg(fd, bb->msghdr, MAX_MSG, 0, NULL); - if (r == 0) - return; + // TODO: free only if not used by cs or pit + for (unsigned i = 0; i < MAX_MSG; i++) + msgbuf_pool_put(msgbuf_pool, msgbuf[i]); + } while(r == MAX_MSG); /* backpressure based on queue size ? */ - if (r < 0) { - if (errno == EINTR) - return; - perror("recv()"); - return; - } + /* + * Signal to the forwarder that we reached the end of a batch and we need to + * flush connections out + */ + forwarder_flush_connections(forwarder); - for (int i = 0; i < r; i++) { - struct mmsghdr *msg = &bb->msghdr[i]; - uint8_t * packet = msg->msg_hdr.msg_iov->iov_base; - size_t size = msg->msg_hdr.msg_iovlen; + return total_size; - /* BEGIN packet processing */ +} -#ifdef __APPLE__ - // XXX explain - msg->msg_hdr.msg_namelen = 0x00; -#endif +ssize_t +listener_read_callback(listener_t * listener, int fd, void * user_data) +{ + // XXX make a single callback and arbitrate between read and readbatch + assert(listener); + assert(fd == listener->fd); - /* Construct address pair used for connection lookup */ - address_pair_t pair; - pair.local = *local_addr; - pair.remote = *(address_t*)msg->msg_hdr.msg_name; - // in the case of a connection, we should assert the remote + if (listener_vft[get_protocol(listener->type)]->read_batch) + return listener_read_batch(listener); - process_packet(forwarder, listener, packet, size, &pair); - } + return listener_read_single(listener); } + #if 0 void _listener_callback(evutil_socket_t fd, short what, void * arg) @@ -314,13 +377,12 @@ listener_setup_all(const forwarder_t * forwarder, uint16_t port, const char *loc // XXX TODO void -listener_setup_local_ipv4(const forwarder_t * forwarder, uint16_t port) +listener_setup_local_ipv4(forwarder_t * forwarder, uint16_t port) { -#if 0 - // XXX memset - address_t address = ADDRESS4_LOCALHOST(port); + address_t address; + memset(&address, 0, sizeof(address_t)); + address = ADDRESS4_LOCALHOST(port); - _setupUdpListener(forwarder, "lo_udp", &address, "lo"); - _setupTcpListener(forwarder, "lo_tcp", &address, "lo"); -#endif + listener_create(FACE_TYPE_UDP_LISTENER, &address, "lo", "lo_udp", forwarder); + // listener_create(FACE_TYPE_TCP_LISTENER, &address, "lo", "lo_tcp", forwarder); } |