aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/core/listener.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/core/listener.c')
-rw-r--r--hicn-light/src/hicn/core/listener.c425
1 files changed, 425 insertions, 0 deletions
diff --git a/hicn-light/src/hicn/core/listener.c b/hicn-light/src/hicn/core/listener.c
new file mode 100644
index 000000000..d2f547863
--- /dev/null
+++ b/hicn-light/src/hicn/core/listener.c
@@ -0,0 +1,425 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file listener.c
+ * @brief Implementation of hICN listeners
+ */
+
+#include <string.h> // strdup
+
+#include <hicn/util/log.h>
+
+#include "forwarder.h"
+#include "listener_vft.h"
+#include "../io/base.h"
+
+listener_key_t listener_key_factory(address_t address, face_type_t type) {
+ listener_key_t key;
+ memset(&key, 0, sizeof(listener_key_t));
+
+ key.address = address;
+ key.type = type;
+ return key;
+}
+
+listener_t *listener_create(face_type_t type, const address_t *address,
+ const char *interface_name, const char *name,
+ forwarder_t *forwarder) {
+ listener_table_t *table = forwarder_get_listener_table(forwarder);
+
+ listener_key_t key = listener_key_factory(*address, type);
+
+ listener_t *listener = listener_table_allocate(table, &key, name);
+ unsigned listener_id =
+ (unsigned int)listener_table_get_listener_id(table, listener);
+
+ int ret = listener_initialize(listener, type, name, listener_id, address,
+ interface_name, forwarder);
+ if (ret < 0) {
+ listener_table_remove_by_id(table, listener_id);
+ listener_finalize(listener);
+ return NULL;
+ }
+
+ WITH_INFO({
+ char addr_str[NI_MAXHOST];
+ int port;
+ address_to_string(address, addr_str, &port);
+ INFO("LISTENER CREATE (%s) %p created for address %s:%d",
+ face_type_str(listener->type), listener, addr_str, port);
+ listener_table_print_by_key(table);
+ })
+
+ return listener;
+}
+
+int listener_initialize(listener_t *listener, face_type_t type,
+ const char *name, unsigned listener_id,
+ const address_t *address, const char *interface_name,
+ forwarder_t *forwarder) {
+ int rc;
+
+ assert(listener);
+ assert(forwarder);
+
+ *listener = (listener_t){
+ .id = listener_id,
+ .name = strdup(name),
+ .key = listener_key_factory(*address, type),
+ .interface_name = strdup(interface_name),
+ .family = address->as_ss.ss_family,
+ .fd = 0,
+ .forwarder = forwarder,
+ };
+
+ face_protocol_t face_protocol = get_protocol(listener->type);
+ if (face_protocol == FACE_PROTOCOL_UNKNOWN) goto ERR_VFT;
+
+ listener->data = malloc(listener_vft[face_protocol]->data_size);
+ if (!listener->data) goto ERR_DATA;
+
+ assert(listener_has_valid_type(listener));
+
+ rc = listener_vft[face_protocol]->initialize(listener);
+ if (rc < 0) goto ERR_VFT;
+
+ listener->fd = listener_vft[face_protocol]->get_socket(listener, address,
+ NULL, interface_name);
+ if (listener->fd < 0) {
+ char addr_str[NI_MAXHOST];
+ int port;
+ address_to_string(address, addr_str, &port);
+ ERROR("Error creating listener %s fd: (%d) %s", addr_str, errno,
+ strerror(errno));
+ goto ERR_FD;
+ }
+ assert(listener->fd > 0);
+
+ // XXX data should be pre-allocated here
+
+ loop_fd_event_create(&listener->event_data, MAIN_LOOP, listener->fd, listener,
+ (fd_callback_t)listener_read_callback,
+ CONNECTION_ID_UNDEFINED, NULL);
+
+ if (!listener->event_data) {
+ goto ERR_REGISTER_FD;
+ }
+
+ if (loop_fd_event_register(listener->event_data) < 0) {
+ goto ERR_REGISTER_FD;
+ }
+
+ return 0;
+
+ERR_REGISTER_FD:
+#ifndef _WIN32
+ close(listener->fd);
+#else
+ closesocket(listener->fd);
+#endif
+ERR_FD:
+ERR_VFT:
+ERR_DATA:
+ return -1;
+}
+
+int listener_finalize(listener_t *listener) {
+ assert(listener);
+ assert(listener_has_valid_type(listener));
+
+ if (listener->event_data) {
+ loop_event_unregister(listener->event_data);
+ loop_event_free(listener->event_data);
+ }
+
+ if (listener->fd != -1) {
+#ifndef _WIN32
+ close(listener->fd);
+#else
+ closesocket(listener->fd);
+#endif
+ }
+
+ listener_vft[get_protocol(listener->type)]->finalize(listener);
+
+ if (listener->data) free(listener->data);
+ listener->data = NULL;
+ if (listener->interface_name) free(listener->interface_name);
+ listener->interface_name = NULL;
+ if (listener->name) free(listener->name);
+ listener->name = NULL;
+
+ return 0;
+}
+
+int listener_get_socket(const listener_t *listener, const address_t *local,
+ const address_t *remote, const char *interface_name) {
+ assert(listener);
+ assert(listener_has_valid_type(listener));
+ assert(local);
+ // assert(remote); TODO: can it be null?
+
+ // DEBUG("[listener_get_socket]");
+
+ return listener_vft[get_protocol(listener->type)]->get_socket(
+ listener, local, remote, interface_name);
+}
+
+/*
+ * This is called from the forwarder to dynamially create new connections on the
+ * listener, in that case, name is NULL. It is also called from
+ * connection_create, which is itself called from the configuration part.
+ */
+unsigned listener_create_connection(listener_t *listener,
+ const char *connection_name,
+ const address_pair_t *pair) {
+ assert(listener);
+ assert(listener_has_valid_type(listener));
+ assert(pair);
+
+ /* initialized so that gcc-9 does not complain */
+ face_type_t connection_type = FACE_TYPE_UNDEFINED;
+ switch (listener->type) {
+ case FACE_TYPE_UDP_LISTENER:
+ connection_type = FACE_TYPE_UDP;
+ break;
+ case FACE_TYPE_TCP_LISTENER:
+ connection_type = FACE_TYPE_TCP;
+ break;
+ case FACE_TYPE_HICN:
+ case FACE_TYPE_HICN_LISTENER:
+ case FACE_TYPE_UDP:
+ case FACE_TYPE_TCP:
+ case FACE_TYPE_UNDEFINED:
+ case FACE_TYPE_N:
+ return CONNECTION_ID_UNDEFINED;
+ }
+
+#ifdef USE_CONNECTED_SOCKETS
+ /*
+ * We create a connected connection with its own fd, instead of returning
+ * the fd of the listener. This will allow to avoid specifying the
+ * destination address when sending packets, and will increase performance
+ * by avoiding a FIB lookup for each packet.
+ */
+ int fd = listener_get_socket(listener, address_pair_get_local(pair),
+ address_pair_get_remote(pair),
+ listener->interface_name);
+ if (fd < 0) return CONNECTION_ID_UNDEFINED;
+#else
+ int fd = 0; // means listener->fd;
+#endif
+ bool local = address_is_local(address_pair_get_local(pair));
+
+ connection_table_t *table =
+ forwarder_get_connection_table(listener->forwarder);
+ connection_t *connection =
+ connection_table_allocate(table, pair, connection_name);
+ unsigned connection_id =
+ (unsigned int)connection_table_get_connection_id(table, connection);
+
+ int rc = connection_initialize(connection, connection_type, connection_name,
+ listener->interface_name, fd, pair, local,
+ connection_id, listener);
+ if (rc < 0) {
+ connection_table_remove_by_id(table, connection_id);
+ connection_finalize(connection);
+ return CONNECTION_ID_UNDEFINED;
+ }
+
+ WITH_INFO({
+ char local_addr_str[NI_MAXHOST];
+ char remote_addr_str[NI_MAXHOST];
+ int local_port;
+ int remote_port;
+ address_to_string(&(pair->local), local_addr_str, &local_port);
+ address_to_string(&(pair->remote), remote_addr_str, &remote_port);
+ INFO("%s connection %p created for address pair %s:%d (local=%s) - %s:%d",
+ face_type_str(connection->type), connection, local_addr_str,
+ local_port, connection_is_local(connection) ? "true" : "false",
+ remote_addr_str, remote_port);
+ connection_table_print_by_pair(table);
+ })
+
+ forwarder_on_connection_event(listener->forwarder, connection,
+ CONNECTION_EVENT_CREATE);
+
+ return connection_id;
+}
+
+int listener_punt(const listener_t *listener, const char *prefix_s) {
+ assert(listener);
+ assert(listener_get_type(listener) == FACE_TYPE_HICN);
+ assert(prefix_s);
+
+ return listener_vft[get_protocol(listener->type)]->punt(listener, prefix_s);
+}
+
+ssize_t listener_read_single(listener_t *listener, int fd,
+ unsigned connection_id) {
+ assert(listener);
+
+ msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(listener->forwarder);
+
+ // Preapare the msgbuf
+ msgbuf_t *msgbuf = NULL;
+ off_t msgbuf_id = msgbuf_pool_get(msgbuf_pool, &msgbuf);
+ if (!msgbuf_id_is_valid(msgbuf_id)) return -1;
+
+ // Prepare the address pair
+ address_pair_t pair;
+ memset(&pair, 0, sizeof(address_pair_t));
+ pair.local = listener->address;
+
+ // Read message and populate the remote address
+ ssize_t n = listener_vft[get_protocol(listener->type)]->read_single(
+ fd, msgbuf, address_pair_get_remote(&pair));
+ if (n <= 0) {
+ msgbuf_pool_put(msgbuf_pool, msgbuf);
+ return -1;
+ }
+
+ msgbuf_pool_acquire(msgbuf);
+ msgbuf_set_connection_id(msgbuf, connection_id);
+
+ // Process received packet
+ size_t processed_bytes = forwarder_receive(listener->forwarder, listener,
+ msgbuf_id, &pair, ticks_now());
+ forwarder_log(listener->forwarder);
+ if (processed_bytes <= 0) ERROR("Unable to handle message");
+
+ /*
+ * The connection on which we went packets might do batching (even without
+ * sendmmsg), and we need to inform the system that we want to proceed to
+ * sending packets.
+ */
+ forwarder_flush_connections(listener->forwarder);
+ msgbuf_pool_release(msgbuf_pool, &msgbuf);
+ return processed_bytes;
+}
+
+ssize_t listener_read_batch(listener_t *listener, int fd,
+ unsigned connection_id) {
+ assert(listener);
+
+ size_t total_processed_bytes = 0;
+ ssize_t num_msg_received = 0;
+
+ forwarder_t *forwarder = listener->forwarder;
+ msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ forwarder_acquired_msgbuf_ids_reset(forwarder);
+
+ /* Receive messages in the loop as long as we manage to fill the buffers */
+ do {
+ /* Prepare the msgbuf and address pair arrays */
+ msgbuf_t *msgbufs[MAX_MSG];
+ if (msgbuf_pool_getn(msgbuf_pool, msgbufs, MAX_MSG) < 0) {
+ ERROR("Unable to get message buffers");
+ break;
+ }
+
+ address_pair_t pair[MAX_MSG];
+ address_t *address_remote[MAX_MSG];
+ memset(&pair, 0, MAX_MSG * sizeof(address_pair_t));
+
+ off_t msgbuf_ids[MAX_MSG];
+ for (unsigned i = 0; i < MAX_MSG; i++) {
+ // Copy the pointers to the remote addresses
+ address_remote[i] = address_pair_get_remote(&pair[i]);
+
+ // Populate local addresses
+ pair[i].local = listener->address;
+
+ // Do NOT rely on msgbuf pointers since a msgbuf pool rezise event may
+ // make them invalid, use msgbuf ids instead
+ msgbuf_ids[i] = msgbuf_pool_get_id(msgbuf_pool, msgbufs[i]);
+ }
+
+ // Read batch and populate remote addresses
+ num_msg_received = listener_vft[get_protocol(listener->type)]->read_batch(
+ fd, msgbufs, address_remote, MAX_MSG);
+
+ for (int i = 0; i < MAX_MSG; i++) {
+ // Release unused msg buffers
+ if (i >= num_msg_received) {
+ msgbuf_pool_put(msgbuf_pool, msgbufs[i]);
+ continue;
+ }
+
+ msgbuf_pool_acquire(msgbufs[i]);
+ msgbuf_set_connection_id(msgbufs[i], connection_id);
+ forwarder_acquired_msgbuf_ids_push(forwarder, msgbuf_ids[i]);
+ }
+
+ if (num_msg_received < 0) break;
+ TRACE("[listener_read_batch] batch size = %d", num_msg_received);
+
+ for (unsigned i = 0; i < num_msg_received; i++) {
+ size_t processed_bytes = forwarder_receive(
+ forwarder, listener, msgbuf_ids[i], &pair[i], ticks_now());
+ forwarder_log(listener->forwarder);
+
+ total_processed_bytes += processed_bytes;
+ }
+ } while (num_msg_received ==
+ MAX_MSG); /* backpressure based on queue size ? */
+
+ /*
+ * Signal to the forwarder that we reached the end of a batch and we need to
+ * flush connections out
+ */
+ forwarder_flush_connections(forwarder);
+
+ const off_t *acquired_msgbuf_ids =
+ forwarder_get_acquired_msgbuf_ids(forwarder);
+ for (int i = 0; i < vector_len(acquired_msgbuf_ids); i++) {
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, acquired_msgbuf_ids[i]);
+ msgbuf_pool_release(msgbuf_pool, &msgbuf);
+ }
+
+ return total_processed_bytes;
+}
+
+/*
+ * This might be called for a connection on the listener too. The listener is
+ * the entity that owns the buffers used for reading.
+ */
+ssize_t listener_read_callback(listener_t *listener, int fd,
+ unsigned connection_id, void *user_data) {
+ assert(listener);
+ assert(!user_data);
+
+ /*
+ * As the listener callback is shared between the listener and the different
+ * connections created on top of it, the fd might be either of them.
+ */
+ // assert(fd == listener->fd);
+
+ if (listener_vft[get_protocol(listener->type)]->read_batch)
+ return listener_read_batch(listener, fd, connection_id);
+
+ return listener_read_single(listener, fd, connection_id);
+}
+
+void listener_setup_local(forwarder_t *forwarder, uint16_t port) {
+ address_t localhost_ipv4_addr = ADDRESS4_LOCALHOST(port);
+ listener_create(FACE_TYPE_UDP_LISTENER, &localhost_ipv4_addr, "lo", "lo_udp4",
+ forwarder);
+
+ address_t localhost_ipv6_addr = ADDRESS6_LOCALHOST(port);
+ listener_create(FACE_TYPE_UDP_LISTENER, &localhost_ipv6_addr, "lo", "lo_udp6",
+ forwarder);
+}