aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/io/udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/io/udp.c')
-rw-r--r--hicn-light/src/hicn/io/udp.c565
1 files changed, 565 insertions, 0 deletions
diff --git a/hicn-light/src/hicn/io/udp.c b/hicn-light/src/hicn/io/udp.c
new file mode 100644
index 000000000..d881b4b01
--- /dev/null
+++ b/hicn-light/src/hicn/io/udp.c
@@ -0,0 +1,565 @@
+/*
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file udp.c
+ * #brief Implementation of the native UDP face leveraging the new packet format
+ * to avoid encapsulation.
+ *
+ */
+
+/* This has to be included early as other modules are including socket.h */
+
+#ifndef _WIN32
+#include <arpa/inet.h>
+#endif
+#include <errno.h>
+#include <fcntl.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#ifndef _WIN32
+#include <sys/uio.h>
+#include <unistd.h>
+#endif
+#include <sys/socket.h>
+
+#ifdef WITH_GSO
+#include <netinet/if_ether.h> // ETH_DATA_LEN
+#include <linux/ipv6.h> // struct ipv6hdr
+#include <netinet/udp.h> // struct udphdr, SOL_UDP
+//#include <linux/udp.h> // UDP_GRO
+#define UDP_GRO 104
+#endif /* WITH_GSO */
+
+#include <hicn/util/log.h>
+#include <hicn/util/sstrncpy.h>
+#include <hicn/util/ring.h>
+
+#include "base.h"
+#include "../core/address_pair.h"
+#include "../core/connection.h"
+#include "../core/connection_vft.h"
+#include "../core/forwarder.h"
+#include "../core/listener.h"
+#include "../core/listener_vft.h"
+#include "../core/msgbuf.h"
+
+#define BATCH_SOCKET_BUFFER 512 * 1024 /* 256k */
+
+/******************************************************************************
+ * Listener
+ ******************************************************************************/
+
+typedef struct {
+ uint16_t port; // in address ?
+} listener_udp_data_t;
+
+#ifdef __ANDROID__
+extern int bindSocket(int sock, const char *interface_name);
+#endif
+
+#if 0
+#include <arpa/inet.h>
+char *get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen) {
+ switch (sa->sa_family) {
+ case AF_INET:
+ inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), s, maxlen);
+ break;
+
+ case AF_INET6:
+ inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), s, maxlen);
+ break;
+
+ default:
+ strncpy(s, "Unknown AF", maxlen);
+ return NULL;
+ }
+
+ return s;
+}
+#endif
+
+/* socket options */
+int socket_set_options(int fd) {
+#ifndef _WIN32
+ // Set non-blocking flag
+ int flags = fcntl(fd, F_GETFL, NULL);
+ if (flags < 0) {
+ perror("fcntl");
+ return -1;
+ }
+
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ perror("fcntl");
+ return -1;
+ }
+#else
+ // Set non-blocking flag
+ int result = ioctlsocket(fd, FIONBIO, &(int){1});
+ if (result != NO_ERROR) {
+ perror("ioctlsocket");
+ return -1;
+ }
+#endif
+
+ // don't hang onto address after listener has closed
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int)) < 0) {
+ perror("setsockopt");
+ return -1;
+ }
+
+#ifdef WITH_GSO
+ int gso_size = ETH_DATA_LEN - sizeof(struct ipv6hdr) - sizeof(struct udphdr);
+ if (setsockopt(fd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)) < 0) {
+ perror("setsockopt");
+ return -1;
+ }
+#endif /* WITH_GSO */
+
+#ifdef WITH_GRO
+ if (setsockopt(fd, IPPROTO_UDP, UDP_GRO, &(int){1}, sizeof(int)) < 0) {
+ perror("setsockopt");
+ return -1;
+ }
+#endif /* WITH_GRO */
+
+#ifdef WITH_ZEROCOPY
+ if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &(int){1}, sizeof(int))) {
+ perror("setsockopt");
+ return -1;
+ }
+#endif /* WITH_ZEROCOPY */
+
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &(int){BATCH_SOCKET_BUFFER},
+ sizeof(int)) < 0) {
+ perror("setsockopt");
+ return -1;
+ }
+ if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &(int){BATCH_SOCKET_BUFFER},
+ sizeof(int)) < 0) {
+ perror("setsockopt");
+ return -1;
+ }
+ return 0;
+}
+
+#ifdef __linux__
+/* bind to device */
+int socket_bind_to_device(int fd, const char *interface_name) {
+ if (strncmp("lo", interface_name, 2) == 0) return 0;
+
+ if (setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, interface_name,
+ strnlen_s(interface_name, IFNAMSIZ)) < 0) {
+ perror("setsockopt");
+ goto ERR_BIND_TO_DEVICE;
+ }
+
+ return 0;
+
+ERR_BIND_TO_DEVICE:
+#ifdef __ANDROID__
+ if (bindSocket(fd, interface_name) < 0) return -1;
+ return 0;
+#else
+ return -1;
+#endif /* __ANDROID__ */
+}
+#endif /* __linux__ */
+
+static int listener_udp_initialize(listener_t *listener) {
+ assert(listener);
+
+#if 0
+ listener_udp_data_t * data = listener->data;
+ assert(data);
+#endif
+ return 0;
+}
+
+static void listener_udp_finalize(listener_t *listener) {
+ assert(listener);
+ assert(listener->type == FACE_TYPE_UDP_LISTENER);
+
+ return;
+}
+
+static int listener_udp_punt(const listener_t *listener, const char *prefix_s) {
+ return -1;
+}
+
+static int listener_udp_get_socket(const listener_t *listener,
+ const address_t *local,
+ const address_t *remote,
+ const char *interface_name) {
+ int fd = socket(address_family(local), SOCK_DGRAM, 0);
+ if (fd < 0) goto ERR_SOCKET;
+
+ if (socket_set_options(fd) < 0) {
+ goto ERR;
+ }
+
+ if (bind(fd, address_sa(local), address_socklen(local)) < 0) {
+ perror("bind");
+ goto ERR;
+ }
+
+#ifdef __linux__
+ if (socket_bind_to_device(fd, interface_name) < 0) {
+ goto ERR;
+ }
+#endif /* __linux__ */
+
+ // DEBUG("UDP remote ?");
+ if (remote) {
+ // DEBUG("UDP connected socket ");
+ if (connect(fd, address_sa(remote), address_socklen(remote)) < 0) {
+ perror("connect");
+ goto ERR;
+ }
+ }
+
+ return fd;
+
+ERR:
+#ifndef _WIN32
+ close(fd);
+#else
+ closesocket(fd);
+#endif
+ERR_SOCKET:
+ return -1;
+}
+
+#define listener_udp_read_single io_read_single_socket
+
+#ifdef __linux__
+#define listener_udp_read_batch io_read_batch_socket
+#else
+#define listener_udp_read_batch NULL
+#endif /* __linux__ */
+
+DECLARE_LISTENER(udp);
+
+/******************************************************************************
+ * Connection
+ ******************************************************************************/
+
+#define RING_LEN 5 * MAX_MSG
+
+typedef struct {
+#ifdef __linux__
+ /* Ring buffer */
+ off_t *ring;
+
+ struct mmsghdr msghdr[MAX_MSG];
+ struct iovec iovecs[MAX_MSG];
+#endif /* __linux__ */
+} connection_udp_data_t;
+
+static int connection_udp_initialize(connection_t *connection) {
+ assert(connection);
+ assert(connection->type == FACE_TYPE_UDP);
+ assert(connection->interface_name);
+
+#ifdef __linux__
+ connection_udp_data_t *data = connection->data;
+ assert(data);
+
+ ring_init(data->ring, RING_LEN);
+
+ void *name = NULL;
+ int namelen = 0;
+
+ /*
+ * If the connection does not use a connected socket, we need to set the
+ * different destination addresses
+ */
+ if (!connection->connected) {
+ const address_t *remote = connection_get_remote(connection);
+ name = address_sa(remote);
+ namelen = address_socklen(remote);
+ }
+
+ memset(data->msghdr, 0, MAX_MSG * sizeof(struct mmsghdr));
+ for (unsigned i = 0; i < MAX_MSG; i++) {
+ struct mmsghdr *msg = &data->msghdr[i];
+ *msg = (struct mmsghdr){
+ .msg_hdr =
+ {
+ .msg_iov = &data->iovecs[i],
+ .msg_iovlen = 1,
+ .msg_name = name,
+ .msg_namelen = namelen,
+#if 0
+ .msg_control = NULL,
+ .msg_controllen = 0,
+ .msg_flags = 0,
+#endif
+ },
+ };
+ }
+#endif /* __linux__ */
+
+ return 0;
+}
+
+static void connection_udp_finalize(connection_t *connection) {
+ assert(connection);
+ assert(connection->type == FACE_TYPE_UDP);
+#ifdef __linux__
+ connection_udp_data_t *data = connection->data;
+ assert(data);
+
+ ring_free(data->ring);
+#endif /* __linux__ */
+}
+
+static bool connection_udp_flush(connection_t *connection) {
+#ifdef __linux__
+ int retry = 0;
+ off_t msgbuf_id = 0;
+ unsigned cpt;
+ size_t i;
+ int n;
+
+ assert(connection);
+ forwarder_t *forwarder = listener_get_forwarder(connection->listener);
+ msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ connection_udp_data_t *data = connection->data;
+ assert(data);
+
+ TRACE("[connection_udp_send] Flushing connection queue");
+
+ /* Flush operation */
+#ifdef WITH_ZEROCOPY
+ int flags = MSG_ZEROCOPY;
+#else
+ int flags = 0;
+#endif /* WITH_ZEROCOPY */
+
+SEND:
+ /* Consume up to MSG_MSG packets in ring buffer */
+ cpt = 0;
+
+ ring_enumerate_n(data->ring, i, &msgbuf_id, MAX_MSG, {
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
+ // update path label
+ if (msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_DATA) {
+ msgbuf_update_pathlabel(msgbuf, connection_get_id(connection));
+
+ connection->stats.data.tx_pkts++;
+ connection->stats.data.tx_bytes += msgbuf_get_len(msgbuf);
+ } else {
+ connection->stats.interests.tx_pkts++;
+ connection->stats.interests.tx_bytes += msgbuf_get_len(msgbuf);
+ }
+
+ data->iovecs[i].iov_base = msgbuf_get_packet(msgbuf);
+ data->iovecs[i].iov_len = msgbuf_get_len(msgbuf);
+ cpt++;
+ });
+
+SENDMMSG:
+ n = sendmmsg(connection->fd, data->msghdr, cpt, flags);
+ if (n == -1) {
+ /* man(2)sendmmsg / BUGS
+ *
+ * If an error occurs after at least one message has been sent, the call
+ * succeeds, and returns the number of messages sent. The error code is
+ * lost. The caller can retry the transmission, starting at the first
+ * failed message, but there is no guarantee that, if an error is
+ * returned, it will be the same as the one that was lost on the previous
+ * call.
+ */
+ WARN("sendmmsg failed %s", strerror(errno));
+ if (retry < 1) {
+ retry++;
+ goto SENDMMSG;
+ }
+ return false;
+ }
+
+ ring_advance(data->ring, n);
+
+ if (n < cpt) {
+ WARN("Unknown error after sending n=%d packets...", n);
+ if (retry < 1) {
+ retry++;
+ goto SEND;
+ }
+ }
+
+ if (ring_get_size(data->ring) > 0) {
+ retry = 0;
+ goto SEND;
+ }
+#endif /* __linux__ */
+ return true;
+}
+
+/**
+ * @function metisUdpConnection_Send
+ * @abstract Non-destructive send of the message.
+ * @discussion
+ * sends a message to the peer.
+ *
+ * @param dummy is ignored. A udp connection has only one peer.
+ * @return <#return#>
+ */
+static bool connection_udp_send(connection_t *connection, msgbuf_t *msgbuf,
+ bool queue) {
+ assert(connection);
+ assert(msgbuf);
+
+#ifdef __linux__
+ connection_udp_data_t *data = connection->data;
+ assert(data);
+
+ forwarder_t *forwarder = listener_get_forwarder(connection->listener);
+ msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+
+ /* Queue packet ? */
+ if (queue) {
+ off_t msgbuf_id;
+ if (ring_is_full(data->ring)) connection_udp_flush(connection);
+
+ msgbuf_id = msgbuf_pool_get_id(msgbuf_pool, msgbuf);
+ ring_add(data->ring, &msgbuf_id);
+
+ } else {
+#endif /* __linux__ */
+ /* Send one */
+ // update the path label befor send the packet
+ if (msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_DATA) {
+ msgbuf_update_pathlabel(msgbuf, connection_get_id(connection));
+
+ connection->stats.data.tx_pkts++;
+ connection->stats.data.tx_bytes += msgbuf_get_len(msgbuf);
+ } else {
+ connection->stats.interests.tx_pkts++;
+ connection->stats.interests.tx_bytes += msgbuf_get_len(msgbuf);
+ }
+
+ ssize_t writeLength = write(connection->fd, msgbuf_get_packet(msgbuf),
+ msgbuf_get_len(msgbuf));
+
+ if (writeLength < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return false;
+ } else {
+ // this print is for debugging
+ printf("Incorrect write length %zd, expected %lu: (%d) %s\n",
+ writeLength, (long unsigned int)msgbuf_get_len(msgbuf), errno,
+ strerror(errno));
+ return false;
+ }
+ }
+#ifdef __linux__
+ }
+#endif /* __linux__ */
+
+ return true;
+}
+
+#if 0
+static
+bool
+connection_udp_sendv(const connection_t * connection, struct iovec * iov,
+ size_t size)
+{
+
+ assert(connetion);
+ assert(iov);
+
+ connection_udp_data_t * data = connection->data;
+ assert(data);
+
+#ifndef _WIN32
+ // Perform connect before to establish association between this peer and
+ // the remote peer. This is required to use writev.
+ // Connection association can be changed at any time.
+
+ if (writev(connection->fd, iov, (int)size) < 0)
+ return false;
+#else
+ WSABUF *buf = (WSABUF *) malloc(size * sizeof(WSABUF));
+
+ DWORD bytes_sent = 0;
+
+ for (int i = 0; i < size; i++) {
+ buf[i].buf = iov[i].iov_base;
+ buf[i].len = (ULONG)iov[i].iov_len;
+ }
+
+ int rc = WSASendTo(udp->fd, buf, size, &bytes_sent, 0,
+ (SOCKADDR *)address_sa(address_pair_remote(&udp->pair)),
+ address_socklen(address_pair_remote(&udp->pair)), NULL, NULL);
+ free(dataBuf);
+ if (rc == SOCKET_ERROR)
+ return false;
+#endif
+
+ return true;
+}
+#endif
+
+static bool connection_udp_send_packet(const connection_t *connection,
+ const uint8_t *packet, size_t size) {
+ assert(connection);
+ assert(packet);
+
+ // XXX: in case of two connections this functions may have wrong behaviour. We
+ // noticed that the packet is sent by the forwarder on the right socket (fd)
+ // but from tcpdump we see that the packet goes on the other connection. In
+ // other cases the packet is sent twice, first on the wrong socket and later
+ // on the right one, even if we log a single send from the forwarder. The
+ // same behaviour was observed with the write executed in the function
+ // connection_udp_send with the queue flag set to false. A workaround that
+ // seems to solve the problem is to use connection_udp_send with queue = true
+ // and force the flush of the connection if needed.
+
+ // TODO: commented otherwise unable to do tests locally
+ // if(connection_is_local(connection))
+ // return -1;
+
+#ifdef USE_CONNECTED_SOCKETS
+ ssize_t n = send(connection->fd, packet, size, 0);
+ if (n < 0) {
+ perror("sendto");
+ return false;
+ }
+#else
+ const address_t *remote = connection_get_remote(connection);
+ ssize_t n = sendto(connection->fd, packet, size, 0, address_sa(remote),
+ address_socklen(remote));
+ if (n < 0) return false;
+#endif
+
+ return true;
+}
+
+#define connection_udp_read_single \
+ listener_read_batch_socket listener_single_socket
+
+#ifdef __linux__
+#define connection_udp_read_batch listener_read_batch_socket
+#else
+#define connection_udp_read_batch NULL
+#endif /* __linux__ */
+
+DECLARE_CONNECTION(udp);