aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/core/listener.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/core/listener.c')
-rw-r--r--hicn-light/src/hicn/core/listener.c149
1 files changed, 101 insertions, 48 deletions
diff --git a/hicn-light/src/hicn/core/listener.c b/hicn-light/src/hicn/core/listener.c
index 5857c0c88..d24fafba0 100644
--- a/hicn-light/src/hicn/core/listener.c
+++ b/hicn-light/src/hicn/core/listener.c
@@ -19,14 +19,14 @@
*/
-
#include <string.h> // strdup
-#include <hicn/core/listener_vft.h>
-#include <hicn/base/loop.h>
-#include <hicn/core/forwarder.h>
#include <hicn/util/log.h>
-#include "listener.h"
+
+#include "forwarder.h"
+#include "listener_vft.h"
+#include "../base/loop.h"
+#include "../io/base.h"
listener_t *
listener_create(face_type_t type, const address_t * address,
@@ -89,7 +89,7 @@ listener_initialize(listener_t * listener, face_type_t type, const char * name,
// XXX data should be pre-allocated here
loop_fd_event_create(&listener->event_data, MAIN_LOOP, listener->fd, listener,
- (fd_callback_t)listener_vft[listener->type]->read_callback, NULL);
+ (fd_callback_t)listener_read_callback, NULL);
if (!listener->event_data) {
goto ERR_REGISTER_FD;
@@ -151,7 +151,8 @@ int listener_get_socket(const listener_t * listener, const address_t * local,
{
assert(listener);
assert(listener_has_valid_type(listener));
- // assert(pair);
+ assert(local);
+ assert(remote);
return listener_vft[listener->type]->get_socket(listener, local, remote,
interface_name);
@@ -201,63 +202,115 @@ listener_punt(const listener_t * listener, const char * prefix_s)
return listener_vft[listener_get_type(listener)]->punt(listener, prefix_s);
}
+
ssize_t
-listener_read_callback(forwarder_t * forwarder, listener_t * listener, int fd,
- address_t * local_addr, uint8_t * packet, size_t size)
+listener_read_single(listener_t * listener)
{
- // XXX TODO mutualize code across all listeners
- // some do not support batches
- //
- // XXX negative in case of error
- // 0 if we don't consume yet because we don't have enough
- // needed for TCP !!
- return size;
+ assert(listener);
+
+ size_t processed_size;
+ size_t total_size = 0;
+
+ msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(listener->forwarder);
+
+ for (;;) {
+
+ msgbuf_t * msgbuf = NULL;
+ off_t msgbuf_id = msgbuf_pool_get(msgbuf_pool, msgbuf);
+ if (!msgbuf_id_is_valid(msgbuf_id))
+ return 0;
+
+ address_pair_t pair;
+ pair.local = *listener_get_address(listener);
+
+ ssize_t n = listener_vft[listener->type]->read_single(listener->fd, msgbuf,
+ address_pair_get_remote(&pair));
+ if (n < 1)
+ return 0;
+
+ /* Process received packet */
+ processed_size = forwarder_receive(listener->forwarder, listener,
+ msgbuf_id, &pair, ticks_now());
+ if (processed_size <= 0)
+ break;
+
+ total_size += processed_size;
+ }
+
+ /*
+ * Even through the current listener does not allow batching, the connection
+ * on which we went packets might do batching (even without sendmmsg), and
+ * we need to inform the system that we want to proceed to sending packets.
+ */
+ forwarder_flush_connections(listener->forwarder);
+ return total_size;
+
}
-void
-listener_batch_read_callback(forwarder_t * forwarder, listener_t * listener,
- int fd, address_t * local_addr, batch_buffer_t * bb)
+ssize_t
+listener_read_batch(listener_t * listener)
{
- assert(bb);
+ assert(listener);
- // XXX potential improvement : receive in a loop while we have messages to
- // read
+ size_t processed_size;
+ size_t total_size = 0;
+
+ forwarder_t * forwarder = listener->forwarder;
+ msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ /* Receive messages in the loop as long as we manage to fill the buffers */
+ int r = 0;
+ do {
+ /* Prepare the msgbuf and address pair arrays */
+ msgbuf_t * msgbuf[MAX_MSG];
+ if (!msgbuf_pool_getn(msgbuf_pool, msgbuf, MAX_MSG))
+ break;
+
+ address_pair_t pair[MAX_MSG];
+ address_t * address_remote[MAX_MSG];
+ for (unsigned i = 0; i < MAX_MSG; i++)
+ address_remote[i] = address_pair_get_remote(&pair[i]);
+
+ ssize_t n = listener_vft[listener->type]->read_batch(listener->fd,
+ msgbuf, address_remote, MAX_MSG);
+ // XXX error check
+
+ for (unsigned i = 0; i < n; i++) {
+ processed_size = forwarder_receive(forwarder, listener,
+ msgbuf_pool_get_id(msgbuf_pool, msgbuf[i]),
+ &pair[i], ticks_now());
+ if (processed_size <= 0)
+ break;
+
+ total_size += processed_size;
+ }
- // XXX
- int r = recvmmsg(fd, bb->msghdr, MAX_MSG, 0, NULL);
- if (r == 0)
- return;
+ } while(r == MAX_MSG); /* backpressure based on queue size ? */
- if (r < 0) {
- if (errno == EINTR)
- return;
- perror("recv()");
- return;
- }
+ /*
+ * Signal to the forwarder that we reached the end of a batch and we need to
+ * flush connections out
+ */
+ forwarder_flush_connections(forwarder);
- for (int i = 0; i < r; i++) {
- struct mmsghdr *msg = &bb->msghdr[i];
- uint8_t * packet = msg->msg_hdr.msg_iov->iov_base;
- size_t size = msg->msg_hdr.msg_iovlen;
+ return total_size;
- /* BEGIN packet processing */
+}
-#ifdef __APPLE__
- // XXX explain
- msg->msg_hdr.msg_namelen = 0x00;
-#endif
+ssize_t
+listener_read_callback(listener_t * listener, int fd, void * user_data)
+{
+ // XXX make a single callback and arbitrate between read and readbatch
+ assert(listener);
+ assert(fd == listener->fd);
- /* Construct address pair used for connection lookup */
- address_pair_t pair;
- pair.local = *local_addr;
- pair.remote = *(address_t*)msg->msg_hdr.msg_name;
- // in the case of a connection, we should assert the remote
+ if (listener_vft[listener->type]->read_batch)
+ return listener_read_batch(listener);
- process_packet(forwarder, listener, packet, size, &pair);
- }
+ return listener_read_single(listener);
}
+
#if 0
void
_listener_callback(evutil_socket_t fd, short what, void * arg)