aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/core/listener.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/core/listener.c')
-rw-r--r--hicn-light/src/hicn/core/listener.c220
1 files changed, 141 insertions, 79 deletions
diff --git a/hicn-light/src/hicn/core/listener.c b/hicn-light/src/hicn/core/listener.c
index 0ab73b1f4..01f62bb39 100644
--- a/hicn-light/src/hicn/core/listener.c
+++ b/hicn-light/src/hicn/core/listener.c
@@ -19,14 +19,14 @@
*/
-
#include <string.h> // strdup
-#include <hicn/core/listener_vft.h>
-#include <hicn/base/loop.h>
-#include <hicn/core/forwarder.h>
#include <hicn/util/log.h>
-#include "listener.h"
+
+#include "forwarder.h"
+#include "listener_vft.h"
+#include "../base/loop.h"
+#include "../io/base.h"
listener_t *
listener_create(face_type_t type, const address_t * address,
@@ -39,7 +39,10 @@ listener_create(face_type_t type, const address_t * address,
.type = type,
.address = *address,
};
- listener_table_allocate(table, listener, &key, name);
+ listener_table_allocate(table, listener, &key, strdup(name));
+ WITH_DEBUG(
+ listener_table_print(table);
+ )
unsigned listener_id = listener_table_get_listener_id(table, listener);
@@ -63,23 +66,27 @@ listener_initialize(listener_t * listener, face_type_t type, const char * name,
.type = type,
.interface_name = strdup(interface_name),
//.interface_index = ,
- //.family = ,
+ .family = address->ss_family,
.fd = 0,
.address = *address,
.forwarder = forwarder,
};
- listener->data = malloc(listener_vft[listener->type]->data_size);
+ face_protocol_t face_protocol = get_protocol(listener->type);
+ if (face_protocol == FACE_PROTOCOL_UNKNOWN)
+ goto ERR_VFT;
+
+ listener->data = malloc(listener_vft[face_protocol]->data_size);
if (!listener->data)
goto ERR_DATA;
assert(listener_has_valid_type(listener));
- rc = listener_vft[listener->type]->initialize(listener);
+ rc = listener_vft[face_protocol]->initialize(listener);
if (rc < 0)
goto ERR_VFT;
- listener->fd = listener_vft[listener->type]->get_socket(listener, address, NULL, interface_name);
+ listener->fd = listener_vft[face_protocol]->get_socket(listener, address, NULL, interface_name);
if (listener->fd < 0) {
ERROR("Error creating listener fd: (%d) %s", errno, strerror(errno));
goto ERR_FD;
@@ -88,16 +95,21 @@ listener_initialize(listener_t * listener, face_type_t type, const char * name,
// XXX data should be pre-allocated here
- if (loop_register_fd(MAIN_LOOP, listener->fd, listener,
- (fd_callback_t)listener_vft[listener->type]->read_callback, NULL) < 0)
+ loop_fd_event_create(&listener->event_data, MAIN_LOOP, listener->fd, listener,
+ (fd_callback_t)listener_read_callback, NULL);
+
+ if (!listener->event_data) {
goto ERR_REGISTER_FD;
+ }
- // XXX TODO
- //char *str = addressToString(listener->local_addr);
- DEBUG("%s UdpListener %p created for address %s",
- face_type_str(listener->type), listener, "N/A");
- //free(str);
+ if (loop_fd_event_register(listener->event_data) < 0) {
+ goto ERR_REGISTER_FD;
+ }
+ char addr_str[INET6_ADDRSTRLEN];
+ address_to_string(address, addr_str);
+ DEBUG("%s UdpListener %p created for address %s",
+ face_type_str(listener->type), listener, addr_str);
return 0;
ERR_REGISTER_FD:
@@ -121,7 +133,7 @@ listener_finalize(listener_t * listener)
assert(listener);
assert(listener_has_valid_type(listener));
- loop_unregister_fd(MAIN_LOOP, listener->fd);
+ loop_event_unregister(listener->event_data);
#ifndef _WIN32
close(listener->fd);
@@ -129,11 +141,12 @@ listener_finalize(listener_t * listener)
closesocket(listener->fd);
#endif
- listener_vft[listener->type]->finalize(listener);
+ listener_vft[get_protocol(listener->type)]->finalize(listener);
free(listener->data);
free(listener->interface_name);
free(listener->name);
+ loop_event_free(listener->event_data);
return 0;
}
@@ -143,13 +156,13 @@ int listener_get_socket(const listener_t * listener, const address_t * local,
{
assert(listener);
assert(listener_has_valid_type(listener));
- assert(pair);
+ assert(local);
+ // assert(remote); TODO: can it be null?
- return listener_vft[listener->type]->get_socket(listener, local, remote,
+ return listener_vft[get_protocol(listener->type)]->get_socket(listener, local, remote,
interface_name);
}
-// XXX CHANGE : we now get the fd directly from the listener
unsigned listener_create_connection(const listener_t * listener,
const address_pair_t * pair)
{
@@ -158,22 +171,17 @@ unsigned listener_create_connection(const listener_t * listener,
assert(pair);
// XXX TODO This code is likely common with connection creation code
- const char * name = NULL;
connection_table_t * table = forwarder_get_connection_table(listener->forwarder);
connection_t * connection;
- connection_table_allocate(table, connection, pair, name);
+ connection_table_allocate(table, connection, pair, listener->name);
unsigned connid = connection_table_get_connection_id(table, connection);
bool local = address_is_local(address_pair_get_local(pair));
- int fd = listener_get_socket(listener, address_pair_get_local(pair),
- address_pair_get_remote(pair), NULL); // XXX interfacename was not specified
-
- // XXX here we use the same interface name as the listener
- int rc = connection_initialize(connection, listener->type, name,
- listener->interface_name, fd, pair, local, connid, listener->forwarder);
+ int rc = connection_initialize(connection, listener->type, listener->name,
+ listener->interface_name, listener->fd, pair, local, connid, listener->forwarder);
if (rc < 0)
return ~0; // XXX how to return an error
@@ -190,66 +198,121 @@ listener_punt(const listener_t * listener, const char * prefix_s)
assert(listener_get_type(listener) == FACE_TYPE_HICN);
assert(prefix_s);
- return listener_vft[listener_get_type(listener)]->punt(listener, prefix_s);
+ return listener_vft[get_protocol(listener->type)]->punt(listener, prefix_s);
}
+
ssize_t
-listener_read_callback(forwarder_t * forwarder, listener_t * listener, int fd,
- address_t * local_addr, uint8_t * packet, size_t size)
+listener_read_single(listener_t * listener)
{
- // XXX TODO mutualize code across all listeners
- // some do not support batches
- //
- // XXX negative in case of error
- // 0 if we don't consume yet because we don't have enough
- // needed for TCP !!
- return size;
+ assert(listener);
+
+ size_t processed_size;
+ size_t total_size = 0;
+
+ msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(listener->forwarder);
+
+ for (;;) {
+
+ msgbuf_t * msgbuf = NULL;
+ off_t msgbuf_id = msgbuf_pool_get(msgbuf_pool, &msgbuf);
+ if (!msgbuf_id_is_valid(msgbuf_id))
+ return 0;
+
+ address_pair_t pair;
+ pair.local = *listener_get_address(listener);
+
+ ssize_t n = listener_vft[get_protocol(listener->type)]->read_single(listener->fd, msgbuf,
+ address_pair_get_remote(&pair));
+ if (n < 1)
+ return 0;
+
+ /* Process received packet */
+ processed_size = forwarder_receive(listener->forwarder, listener,
+ msgbuf_id, &pair, ticks_now());
+ if (processed_size <= 0)
+ break;
+
+ total_size += processed_size;
+ }
+
+ /*
+ * Even through the current listener does not allow batching, the connection
+ * on which we went packets might do batching (even without sendmmsg), and
+ * we need to inform the system that we want to proceed to sending packets.
+ */
+ forwarder_flush_connections(listener->forwarder);
+ return total_size;
+
}
-void
-listener_batch_read_callback(forwarder_t * forwarder, listener_t * listener,
- int fd, address_t * local_addr, batch_buffer_t * bb)
+ssize_t
+listener_read_batch(listener_t * listener)
{
- assert(bb);
+ assert(listener);
- // XXX potential improvement : receive in a loop while we have messages to
- // read
+ size_t processed_size;
+ size_t total_size = 0;
+
+ forwarder_t * forwarder = listener->forwarder;
+ msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ /* Receive messages in the loop as long as we manage to fill the buffers */
+ int r = 0;
+ do {
+ /* Prepare the msgbuf and address pair arrays */
+ msgbuf_t * msgbuf[MAX_MSG];
+ if (msgbuf_pool_getn(msgbuf_pool, msgbuf, MAX_MSG) < 0)
+ break;
+
+ address_pair_t pair[MAX_MSG];
+ address_t * address_remote[MAX_MSG];
+ for (unsigned i = 0; i < MAX_MSG; i++)
+ address_remote[i] = address_pair_get_remote(&pair[i]);
+
+ ssize_t n = listener_vft[get_protocol(listener->type)]->read_batch(listener->fd,
+ msgbuf, address_remote, MAX_MSG);
+ // XXX error check
+
+ for (unsigned i = 0; i < n; i++) {
+ processed_size = forwarder_receive(forwarder, listener,
+ msgbuf_pool_get_id(msgbuf_pool, msgbuf[i]),
+ &pair[i], ticks_now());
+ if (processed_size <= 0)
+ break;
+
+ total_size += processed_size;
+ }
- // XXX
- int r = recvmmsg(fd, bb->msghdr, MAX_MSG, 0, NULL);
- if (r == 0)
- return;
+ // TODO: free only if not used by cs or pit
+ for (unsigned i = 0; i < MAX_MSG; i++)
+ msgbuf_pool_put(msgbuf_pool, msgbuf[i]);
+ } while(r == MAX_MSG); /* backpressure based on queue size ? */
- if (r < 0) {
- if (errno == EINTR)
- return;
- perror("recv()");
- return;
- }
+ /*
+ * Signal to the forwarder that we reached the end of a batch and we need to
+ * flush connections out
+ */
+ forwarder_flush_connections(forwarder);
- for (int i = 0; i < r; i++) {
- struct mmsghdr *msg = &bb->msghdr[i];
- uint8_t * packet = msg->msg_hdr.msg_iov->iov_base;
- size_t size = msg->msg_hdr.msg_iovlen;
+ return total_size;
- /* BEGIN packet processing */
+}
-#ifdef __APPLE__
- // XXX explain
- msg->msg_hdr.msg_namelen = 0x00;
-#endif
+ssize_t
+listener_read_callback(listener_t * listener, int fd, void * user_data)
+{
+ // XXX make a single callback and arbitrate between read and readbatch
+ assert(listener);
+ assert(fd == listener->fd);
- /* Construct address pair used for connection lookup */
- address_pair_t pair;
- pair.local = *local_addr;
- pair.remote = *(address_t*)msg->msg_hdr.msg_name;
- // in the case of a connection, we should assert the remote
+ if (listener_vft[get_protocol(listener->type)]->read_batch)
+ return listener_read_batch(listener);
- process_packet(forwarder, listener, packet, size, &pair);
- }
+ return listener_read_single(listener);
}
+
#if 0
void
_listener_callback(evutil_socket_t fd, short what, void * arg)
@@ -314,13 +377,12 @@ listener_setup_all(const forwarder_t * forwarder, uint16_t port, const char *loc
// XXX TODO
void
-listener_setup_local_ipv4(const forwarder_t * forwarder, uint16_t port)
+listener_setup_local_ipv4(forwarder_t * forwarder, uint16_t port)
{
-#if 0
- // XXX memset
- address_t address = ADDRESS4_LOCALHOST(port);
+ address_t address;
+ memset(&address, 0, sizeof(address_t));
+ address = ADDRESS4_LOCALHOST(port);
- _setupUdpListener(forwarder, "lo_udp", &address, "lo");
- _setupTcpListener(forwarder, "lo_tcp", &address, "lo");
-#endif
+ listener_create(FACE_TYPE_UDP_LISTENER, &address, "lo", "lo_udp", forwarder);
+ // listener_create(FACE_TYPE_TCP_LISTENER, &address, "lo", "lo_tcp", forwarder);
}