aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/io/udpListener.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/io/udpListener.c')
-rwxr-xr-xhicn-light/src/io/udpListener.c533
1 files changed, 533 insertions, 0 deletions
diff --git a/hicn-light/src/io/udpListener.c b/hicn-light/src/io/udpListener.c
new file mode 100755
index 000000000..31c0e673b
--- /dev/null
+++ b/hicn-light/src/io/udpListener.c
@@ -0,0 +1,533 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <src/config.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <src/core/messageHandler.h>
+
+#include <src/io/udpConnection.h>
+#include <src/io/udpListener.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <parc/assert/parc_Assert.h>
+#include <src/core/connection.h>
+#include <src/core/forwarder.h>
+#include <src/core/messagePacketType.h>
+
+#ifdef WITH_MAPME
+#include <src/core/mapMe.h>
+#endif /* WITH_MAPME */
+
+#define IPv4 4
+#define IPv6 6
+
+struct udp_listener {
+ Forwarder *forwarder;
+ Logger *logger;
+
+ PARCEvent *udp_event;
+ SocketType udp_socket;
+ uint16_t port;
+
+ unsigned id;
+ Address *localAddress;
+};
+
+static void _destroy(ListenerOps **listenerOpsPtr);
+static unsigned _getInterfaceIndex(const ListenerOps *ops);
+static const Address *_getListenAddress(const ListenerOps *ops);
+static EncapType _getEncapType(const ListenerOps *ops);
+static int _getSocket(const ListenerOps *ops);
+
+static ListenerOps udpTemplate = {.context = NULL,
+ .destroy = &_destroy,
+ .getInterfaceIndex = &_getInterfaceIndex,
+ .getListenAddress = &_getListenAddress,
+ .getEncapType = &_getEncapType,
+ .getSocket = &_getSocket};
+
+static void _readcb(int fd, PARCEventType what, void *udpVoid);
+
+ListenerOps *udpListener_CreateInet6(Forwarder *forwarder,
+ struct sockaddr_in6 sin6) {
+ ListenerOps *ops = NULL;
+
+ UdpListener *udp = parcMemory_AllocateAndClear(sizeof(UdpListener));
+ parcAssertNotNull(udp, "parcMemory_AllocateAndClear(%zu) returned NULL",
+ sizeof(UdpListener));
+ udp->forwarder = forwarder;
+ udp->logger = logger_Acquire(forwarder_GetLogger(forwarder));
+ udp->localAddress = addressCreateFromInet6(&sin6);
+ udp->id = forwarder_GetNextConnectionId(forwarder);
+
+ udp->udp_socket = socket(AF_INET6, SOCK_DGRAM, 0);
+ parcAssertFalse(udp->udp_socket < 0, "Error opening UDP socket: (%d) %s",
+ errno, strerror(errno));
+
+ // Set non-blocking flag
+ int flags = fcntl(udp->udp_socket, F_GETFL, NULL);
+ parcAssertTrue(flags != -1,
+ "fcntl failed to obtain file descriptor flags (%d)", errno);
+ int failure = fcntl(udp->udp_socket, F_SETFL, flags | O_NONBLOCK);
+ parcAssertFalse(failure, "fcntl failed to set file descriptor flags (%d)",
+ errno);
+
+ int one = 1;
+ // don't hang onto address after listener has closed
+ failure = setsockopt(udp->udp_socket, SOL_SOCKET, SO_REUSEADDR, (void *)&one,
+ (socklen_t)sizeof(one));
+ parcAssertFalse(failure, "failed to set REUSEADDR on socket(%d)", errno);
+
+ failure = bind(udp->udp_socket, (struct sockaddr *)&sin6, sizeof(sin6));
+ if (failure == 0) {
+ udp->udp_event =
+ dispatcher_CreateNetworkEvent(forwarder_GetDispatcher(forwarder), true,
+ _readcb, (void *)udp, udp->udp_socket);
+ dispatcher_StartNetworkEvent(forwarder_GetDispatcher(forwarder),
+ udp->udp_event);
+
+ ops = parcMemory_AllocateAndClear(sizeof(ListenerOps));
+ parcAssertNotNull(ops, "parcMemory_AllocateAndClear(%zu) returned NULL",
+ sizeof(ListenerOps));
+ memcpy(ops, &udpTemplate, sizeof(ListenerOps));
+ ops->context = udp;
+
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug)) {
+ char *str = addressToString(udp->localAddress);
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "UdpListener %p created for address %s", (void *)udp, str);
+ parcMemory_Deallocate((void **)&str);
+ }
+ } else {
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Error)) {
+ int myerrno = errno;
+ char *str = addressToString(udp->localAddress);
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Error, __func__,
+ "Error binding UDP socket to address %s: (%d) %s", str,
+ myerrno, strerror(myerrno));
+ parcMemory_Deallocate((void **)&str);
+ }
+
+ close(udp->udp_socket);
+ addressDestroy(&udp->localAddress);
+ logger_Release(&udp->logger);
+ parcMemory_Deallocate((void **)&udp);
+ }
+
+ return ops;
+}
+
+ListenerOps *udpListener_CreateInet(Forwarder *forwarder,
+ struct sockaddr_in sin) {
+ ListenerOps *ops = NULL;
+
+ UdpListener *udp = parcMemory_AllocateAndClear(sizeof(UdpListener));
+ parcAssertNotNull(udp, "parcMemory_AllocateAndClear(%zu) returned NULL",
+ sizeof(UdpListener));
+ udp->forwarder = forwarder;
+ udp->logger = logger_Acquire(forwarder_GetLogger(forwarder));
+ udp->localAddress = addressCreateFromInet(&sin);
+ udp->id = forwarder_GetNextConnectionId(forwarder);
+
+ udp->udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
+ parcAssertFalse(udp->udp_socket < 0, "Error opening UDP socket: (%d) %s",
+ errno, strerror(errno));
+
+ // Set non-blocking flag
+ int flags = fcntl(udp->udp_socket, F_GETFL, NULL);
+ parcAssertTrue(flags != -1,
+ "fcntl failed to obtain file descriptor flags (%d)", errno);
+ int failure = fcntl(udp->udp_socket, F_SETFL, flags | O_NONBLOCK);
+ parcAssertFalse(failure, "fcntl failed to set file descriptor flags (%d)",
+ errno);
+
+ int one = 1;
+ // don't hang onto address after listener has closed
+ failure = setsockopt(udp->udp_socket, SOL_SOCKET, SO_REUSEADDR, (void *)&one,
+ (socklen_t)sizeof(one));
+ parcAssertFalse(failure, "failed to set REUSEADDR on socket(%d)", errno);
+
+ failure = bind(udp->udp_socket, (struct sockaddr *)&sin, sizeof(sin));
+ if (failure == 0) {
+ udp->udp_event =
+ dispatcher_CreateNetworkEvent(forwarder_GetDispatcher(forwarder), true,
+ _readcb, (void *)udp, udp->udp_socket);
+ dispatcher_StartNetworkEvent(forwarder_GetDispatcher(forwarder),
+ udp->udp_event);
+
+ ops = parcMemory_AllocateAndClear(sizeof(ListenerOps));
+ parcAssertNotNull(ops, "parcMemory_AllocateAndClear(%zu) returned NULL",
+ sizeof(ListenerOps));
+ memcpy(ops, &udpTemplate, sizeof(ListenerOps));
+ ops->context = udp;
+
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug)) {
+ char *str = addressToString(udp->localAddress);
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "UdpListener %p created for address %s", (void *)udp, str);
+ parcMemory_Deallocate((void **)&str);
+ }
+ } else {
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Error)) {
+ int myerrno = errno;
+ char *str = addressToString(udp->localAddress);
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Error, __func__,
+ "Error binding UDP socket to address %s: (%d) %s", str,
+ myerrno, strerror(myerrno));
+ parcMemory_Deallocate((void **)&str);
+ }
+
+ close(udp->udp_socket);
+ addressDestroy(&udp->localAddress);
+ logger_Release(&udp->logger);
+ parcMemory_Deallocate((void **)&udp);
+ }
+
+ return ops;
+}
+
+static void udpListener_Destroy(UdpListener **listenerPtr) {
+ parcAssertNotNull(listenerPtr, "Parameter must be non-null double pointer");
+ parcAssertNotNull(*listenerPtr,
+ "Parameter must derefernce to non-null pointer");
+
+ UdpListener *udp = *listenerPtr;
+
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug)) {
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "UdpListener %p destroyed", (void *)udp);
+ }
+
+ close(udp->udp_socket);
+ addressDestroy(&udp->localAddress);
+ dispatcher_DestroyNetworkEvent(forwarder_GetDispatcher(udp->forwarder),
+ &udp->udp_event);
+ logger_Release(&udp->logger);
+ parcMemory_Deallocate((void **)&udp);
+ *listenerPtr = NULL;
+}
+
+static void _destroy(ListenerOps **listenerOpsPtr) {
+ ListenerOps *ops = *listenerOpsPtr;
+ UdpListener *udp = (UdpListener *)ops->context;
+ udpListener_Destroy(&udp);
+ parcMemory_Deallocate((void **)&ops);
+ *listenerOpsPtr = NULL;
+}
+
+static unsigned _getInterfaceIndex(const ListenerOps *ops) {
+ UdpListener *udp = (UdpListener *)ops->context;
+ return udp->id;
+}
+
+static const Address *_getListenAddress(const ListenerOps *ops) {
+ UdpListener *udp = (UdpListener *)ops->context;
+ return udp->localAddress;
+}
+
+static EncapType _getEncapType(const ListenerOps *ops) { return ENCAP_UDP; }
+
+static int _getSocket(const ListenerOps *ops) {
+ UdpListener *udp = (UdpListener *)ops->context;
+ return (int)udp->udp_socket;
+}
+
+// void
+// udpListener_SetPacketType(ListenerOps *ops, MessagePacketType type)
+//{
+// return;
+//}
+
+// =====================================================================
+
+/**
+ * @function peekMesageLength
+ * @abstract Peek at the next packet to learn its length by reading the fixed
+ * header
+ * @discussion
+ * <#Discussion#>
+ *
+ * @param <#param1#>
+ * @return <#return#>
+ */
+static size_t _peekMessageLength(UdpListener *udp, int fd,
+ struct sockaddr *peerIpAddress,
+ socklen_t *peerIpAddressLengthPtr) {
+ // to be fast I try to use just ipv6, this needs to be validated for ipv4
+
+ size_t packetLength = 0;
+
+ uint8_t fixedHeader[messageHandler_GetIPHeaderLength(IPv6)];
+
+ // peek at the UDP packet and read in the fixed header.
+ // Also returns the socket information for the remote peer
+
+ ssize_t res = recvfrom(
+ fd, fixedHeader, messageHandler_GetIPHeaderLength(IPv6), MSG_PEEK,
+ (struct sockaddr *)peerIpAddress, peerIpAddressLengthPtr);
+
+ if (res == messageHandler_GetIPHeaderLength(IPv6)) {
+ packetLength =
+ messageHandler_GetTotalPacketLength((const uint8_t *)&fixedHeader);
+ } else {
+ if (res < 0) {
+ printf("error while readin packet\n");
+ }
+ }
+
+ return packetLength;
+}
+
+/**
+ * @function _constructAddressPair
+ * @abstract Creates the address pair that uniquely identifies the connection
+ * @discussion
+ * The peerIpAddress must be of AF_INET or AF_INET6 family.
+ *
+ * @param <#param1#>
+ * @return Allocated MetisAddressPair, must be destroyed
+ */
+static AddressPair *_constructAddressPair(UdpListener *udp,
+ struct sockaddr *peerIpAddress,
+ socklen_t peerIpAddressLength) {
+ Address *remoteAddress;
+
+ switch (peerIpAddress->sa_family) {
+ case AF_INET:
+ remoteAddress =
+ addressCreateFromInet((struct sockaddr_in *)peerIpAddress);
+ break;
+
+ case AF_INET6:
+ remoteAddress =
+ addressCreateFromInet6((struct sockaddr_in6 *)peerIpAddress);
+ break;
+
+ default:
+ parcTrapIllegalValue(peerIpAddress,
+ "Peer address unrecognized family for IP: %d",
+ peerIpAddress->sa_family);
+ }
+
+ AddressPair *pair = addressPair_Create(udp->localAddress, remoteAddress);
+ addressDestroy(&remoteAddress);
+
+ return pair;
+}
+
+/**
+ * @function _lookupConnectionId
+ * @abstract Lookup a connection in the connection table
+ * @discussion
+ * Looks up the connection in the connection table and returns the connection
+ * id if it exists.
+ *
+ * @param outputConnectionIdPtr is the output parameter
+ * @return true if connection found and outputConnectionIdPtr set
+ */
+static bool _lookupConnectionId(UdpListener *udp, AddressPair *pair,
+ unsigned *outputConnectionIdPtr) {
+ ConnectionTable *connTable = forwarder_GetConnectionTable(udp->forwarder);
+
+ const Connection *conn = connectionTable_FindByAddressPair(connTable, pair);
+ if (conn) {
+ *outputConnectionIdPtr = connection_GetConnectionId(conn);
+ return true;
+ } else {
+ *outputConnectionIdPtr = 0;
+ return false;
+ }
+}
+
+/**
+ * @function _createNewConnection
+ * @abstract Creates a new Metis connection for the peer
+ * @discussion
+ * PRECONDITION: you know there's not an existing connection with the address
+ * pair
+ *
+ * Creates a new connection and adds it to the connection table.
+ *
+ * @param <#param1#>
+ * @return The connection id for the new connection
+ */
+
+static unsigned _createNewConnection(UdpListener *udp, int fd,
+ const AddressPair *pair) {
+ bool isLocal = false;
+
+ // metisUdpConnection_Create takes ownership of the pair
+ IoOperations *ops = udpConnection_Create(udp->forwarder, fd, pair, isLocal);
+ Connection *conn = connection_Create(ops);
+ // connection_AllowWldrAutoStart(conn);
+
+ connectionTable_Add(forwarder_GetConnectionTable(udp->forwarder), conn);
+ unsigned connid = ioOperations_GetConnectionId(ops);
+
+ return connid;
+}
+
+static void _handleProbeMessage(UdpListener *udp, uint8_t *msgBuffer) {
+ // TODO
+ parcMemory_Deallocate((void **)&msgBuffer);
+}
+
+static void _handleWldrNotification(UdpListener *udp, unsigned connId,
+ uint8_t *msgBuffer) {
+ const Connection *conn = connectionTable_FindById(
+ forwarder_GetConnectionTable(udp->forwarder), connId);
+ if (conn == NULL) {
+ return;
+ }
+
+ Message *message = message_CreateFromByteArray(
+ connId, msgBuffer, MessagePacketType_WldrNotification,
+ forwarder_GetTicks(udp->forwarder), forwarder_GetLogger(udp->forwarder));
+
+ connection_HandleWldrNotification((Connection *)conn, message);
+
+ message_Release(&message);
+}
+
+static Message *_readMessage(UdpListener *udp, int fd, size_t packetLength,
+ AddressPair *pair) {
+ uint8_t *msgBuffer = parcMemory_AllocateAndClear(packetLength);
+
+ ssize_t readLength = read(fd, msgBuffer, packetLength);
+
+ Message *message = NULL;
+
+ if (readLength < 0) {
+ printf("read failed %d: (%d) %s\n", fd, errno, strerror(errno));
+ return message;
+ }
+
+ unsigned connid = 0;
+ bool foundConnection = _lookupConnectionId(udp, pair, &connid);
+
+ if (readLength == packetLength) {
+ // we need to check if it is a valid packet
+ if (messageHandler_IsTCP(msgBuffer)) {
+ MessagePacketType pktType;
+
+ if (messageHandler_IsData(msgBuffer)) {
+ pktType = MessagePacketType_ContentObject;
+ if (!foundConnection) {
+ parcMemory_Deallocate((void **)&msgBuffer);
+ return message;
+ }
+ } else if (messageHandler_IsInterest(msgBuffer)) {
+ pktType = MessagePacketType_Interest;
+ if (!foundConnection) {
+ connid = _createNewConnection(udp, fd, pair);
+ }
+ } else {
+ printf("Got a packet that is not a data nor an interest, drop it!\n");
+ parcMemory_Deallocate((void **)&msgBuffer);
+ return message;
+ }
+
+ message = message_CreateFromByteArray(
+ connid, msgBuffer, pktType, forwarder_GetTicks(udp->forwarder),
+ forwarder_GetLogger(udp->forwarder));
+
+ if (message == NULL) {
+ parcMemory_Deallocate((void **)&msgBuffer);
+ }
+ } else if (messageHandler_IsWldrNotification(msgBuffer)) {
+ _handleWldrNotification(udp, connid, msgBuffer);
+ } else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) {
+ _handleProbeMessage(udp, msgBuffer);
+ }
+#ifdef WITH_MAPME
+ else if (mapMe_isMapMe(msgBuffer)) {
+ forwarder_ProcessMapMe(udp->forwarder, msgBuffer, connid);
+ }
+#endif /* WITH_MAPME */
+ }
+
+ return message;
+}
+
+static void _receivePacket(UdpListener *udp, int fd, size_t packetLength,
+ struct sockaddr_storage *peerIpAddress,
+ socklen_t peerIpAddressLength) {
+ AddressPair *pair = _constructAddressPair(
+ udp, (struct sockaddr *)peerIpAddress, peerIpAddressLength);
+
+ Message *message = _readMessage(udp, fd, packetLength, pair);
+ addressPair_Release(&pair);
+
+ if (message) {
+ forwarder_Receive(udp->forwarder, message);
+ } else {
+ return;
+ }
+}
+
+static void _readFrameToDiscard(UdpListener *udp, int fd) {
+ // we need to discard the frame. Read 1 byte. This will clear it off the
+ // stack.
+ uint8_t buffer;
+ ssize_t nread = read(fd, &buffer, 1);
+
+ if (nread == 1) {
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug)) {
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "Discarded frame from fd %d", fd);
+ }
+ } else if (nread < 0) {
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Error)) {
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Error, __func__,
+ "Error trying to discard frame from fd %d: (%d) %s", fd, errno,
+ strerror(errno));
+ }
+ }
+}
+
+static void _readcb(int fd, PARCEventType what, void *udpVoid) {
+ UdpListener *udp = (UdpListener *)udpVoid;
+
+ if (logger_IsLoggable(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug)) {
+ logger_Log(udp->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "%s socket %d what %s%s%s%s data %p", __func__, fd,
+ (what & PARCEventType_Timeout) ? " timeout" : "",
+ (what & PARCEventType_Read) ? " read" : "",
+ (what & PARCEventType_Write) ? " write" : "",
+ (what & PARCEventType_Signal) ? " signal" : "", udpVoid);
+ }
+
+ if (what & PARCEventType_Read) {
+ struct sockaddr_storage peerIpAddress;
+ socklen_t peerIpAddressLength = sizeof(peerIpAddress);
+
+ size_t packetLength = _peekMessageLength(
+ udp, fd, (struct sockaddr *)&peerIpAddress, &peerIpAddressLength);
+
+ if (packetLength > 0) {
+ _receivePacket(udp, fd, packetLength, &peerIpAddress,
+ peerIpAddressLength);
+ } else {
+ _readFrameToDiscard(udp, fd);
+ }
+ }
+}