summaryrefslogtreecommitdiffstats
path: root/hicn-light/src/core/dispatcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/core/dispatcher.c')
-rwxr-xr-xhicn-light/src/core/dispatcher.c435
1 files changed, 435 insertions, 0 deletions
diff --git a/hicn-light/src/core/dispatcher.c b/hicn-light/src/core/dispatcher.c
new file mode 100755
index 000000000..078087c59
--- /dev/null
+++ b/hicn-light/src/core/dispatcher.c
@@ -0,0 +1,435 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @header dispatcher.c
+ * @abstract Event dispatcher for hicn-light. Uses parcEvent
+ * @discussion
+ * Wraps the functions we use in parcEvent, along with StreamBuffer and
+ * Message. The dispatcher is the event loop, so it manages things like signals,
+ * timers, and network events.
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <signal.h>
+#include <src/config.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+
+#include <parc/algol/parc_EventQueue.h>
+#include <parc/algol/parc_EventTimer.h>
+
+#include <parc/assert/parc_Assert.h>
+
+#include <src/core/dispatcher.h>
+
+#ifndef INPORT_ANY
+#define INPORT_ANY 0
+#endif
+
+struct dispatcher {
+ PARCEventScheduler *Base;
+ Logger *logger;
+};
+
+// ==========================================
+// Public API
+
+PARCEventScheduler *dispatcher_GetEventScheduler(Dispatcher *dispatcher) {
+ return dispatcher->Base;
+}
+
+Dispatcher *dispatcher_Create(Logger *logger) {
+ Dispatcher *dispatcher = parcMemory_AllocateAndClear(sizeof(Dispatcher));
+ parcAssertNotNull(dispatcher,
+ "parcMemory_AllocateAndClear(%zu) returned NULL",
+ sizeof(Dispatcher));
+
+ dispatcher->Base = parcEventScheduler_Create();
+ dispatcher->logger = logger_Acquire(logger);
+
+ parcAssertNotNull(dispatcher->Base,
+ "Got NULL from parcEventScheduler_Create()");
+
+ return dispatcher;
+}
+
+void dispatcher_Destroy(Dispatcher **dispatcherPtr) {
+ parcAssertNotNull(dispatcherPtr, "Parameter must be non-null double pointer");
+ parcAssertNotNull(*dispatcherPtr,
+ "Parameter must dereference to non-null pointer");
+ Dispatcher *dispatcher = *dispatcherPtr;
+
+ logger_Release(&dispatcher->logger);
+ parcEventScheduler_Destroy(&(dispatcher->Base));
+ parcMemory_Deallocate((void **)&dispatcher);
+ *dispatcherPtr = NULL;
+}
+
+void dispatcher_Stop(Dispatcher *dispatcher) {
+ struct timeval delay = {0, 1000};
+
+ parcEventScheduler_Stop(dispatcher->Base, &delay);
+}
+
+void dispatcher_Run(Dispatcher *dispatcher) {
+ parcAssertNotNull(dispatcher, "Parameter must be non-null");
+
+ parcEventScheduler_Start(dispatcher->Base, 0);
+}
+
+void dispatcher_RunDuration(Dispatcher *dispatcher, struct timeval *duration) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(duration, "Parameter duration must be non-null");
+
+ parcEventScheduler_Stop(dispatcher->Base, duration);
+ parcEventScheduler_Start(dispatcher->Base, 0);
+}
+
+void dispatcher_RunCount(Dispatcher *dispatcher, unsigned count) {
+ parcAssertNotNull(dispatcher, "Parameter must be non-null");
+
+ for (unsigned i = 0; i < count; i++) {
+ parcEventScheduler_Start(dispatcher->Base,
+ PARCEventSchedulerDispatchType_LoopOnce);
+ }
+}
+
+PARCEventSocket *dispatcher_CreateListener(Dispatcher *dispatcher,
+ PARCEventSocket_Callback *callback,
+ void *user_data, int backlog,
+ const struct sockaddr *sa,
+ int socklen) {
+ PARCEventSocket *listener = parcEventSocket_Create(
+ dispatcher->Base, callback, NULL, user_data, sa, socklen);
+ if (listener == NULL) {
+ perror("Problem creating listener");
+ }
+ return listener;
+}
+
+void dispatcher_DestroyListener(Dispatcher *dispatcher,
+ PARCEventSocket **listenerPtr) {
+ parcAssertNotNull(listenerPtr, "Parameter must be non-null double pointer");
+ parcAssertNotNull(*listenerPtr,
+ "Parameter must dereference to non-null pointer");
+ parcEventSocket_Destroy(listenerPtr);
+}
+
+PARCEventQueue *dispatcher_CreateStreamBufferFromSocket(Dispatcher *dispatcher,
+ SocketType fd) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ PARCEventQueue *buffer = parcEventQueue_Create(
+ dispatcher->Base, fd,
+ PARCEventQueueOption_CloseOnFree | PARCEventQueueOption_DeferCallbacks);
+ parcAssertNotNull(buffer,
+ "Got null from parcEventBufver_Create for socket %d", fd);
+ return buffer;
+}
+
+PARCEventTimer *dispatcher_CreateTimer(Dispatcher *dispatcher, bool isPeriodic,
+ PARCEvent_Callback *callback,
+ void *userData) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(callback, "Parameter callback must be non-null");
+
+ PARCEventType flags = 0;
+ if (isPeriodic) {
+ flags |= PARCEventType_Persist;
+ }
+ PARCEventTimer *event =
+ parcEventTimer_Create(dispatcher->Base, flags, callback, userData);
+ return event;
+}
+
+void dispatcher_StartTimer(Dispatcher *dispatcher, PARCEventTimer *timerEvent,
+ struct timeval *delay) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(timerEvent, "Parameter timerEvent must be non-null");
+ int failure = parcEventTimer_Start(timerEvent, delay);
+ parcAssertFalse(failure < 0, "Error starting timer event %p: (%d) %s",
+ (void *)timerEvent, errno, strerror(errno));
+}
+
+void dispatcher_StopTimer(Dispatcher *dispatcher, PARCEventTimer *event) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(event, "Parameter event must be non-null");
+
+ int failure = parcEventTimer_Stop(event);
+ parcAssertFalse(failure < 0, "Error stopping signal event %p: (%d) %s",
+ (void *)event, errno, strerror(errno));
+}
+
+void dispatcher_DestroyTimerEvent(Dispatcher *dispatcher,
+ PARCEventTimer **eventPtr) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(eventPtr,
+ "Parameter eventPtr must be non-null double pointer");
+ parcAssertNotNull(*eventPtr,
+ "Paramter eventPtr must dereference to non-null pointer");
+
+ parcEventTimer_Destroy(eventPtr);
+ eventPtr = NULL;
+}
+
+PARCEvent *dispatcher_CreateNetworkEvent(Dispatcher *dispatcher,
+ bool isPersistent,
+ PARCEvent_Callback *callback,
+ void *userData, int fd) {
+ short flags = PARCEventType_Timeout | PARCEventType_Read;
+ if (isPersistent) {
+ flags |= PARCEventType_Persist;
+ }
+
+ PARCEvent *event =
+ parcEvent_Create(dispatcher->Base, fd, flags, callback, userData);
+ parcAssertNotNull(event, "Got null from parcEvent_Create for socket %d", fd);
+ return event;
+}
+
+void dispatcher_DestroyNetworkEvent(Dispatcher *dispatcher,
+ PARCEvent **eventPtr) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(eventPtr,
+ "Parameter eventPtr must be non-null double pointer");
+ parcAssertNotNull(*eventPtr,
+ "Paramter eventPtr must dereference to non-null pointer");
+
+ parcEvent_Destroy(eventPtr);
+ eventPtr = NULL;
+}
+
+void dispatcher_StartNetworkEvent(Dispatcher *dispatcher, PARCEvent *event) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(event, "Parameter event must be non-null");
+
+ int failure = parcEvent_Start(event);
+ parcAssertFalse(failure < 0, "Error starting signal event %p: (%d) %s",
+ (void *)event, errno, strerror(errno));
+}
+
+void dispatcher_StopNetworkEvent(Dispatcher *dispatcher, PARCEvent *event) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(event, "Parameter event must be non-null");
+
+ int failure = parcEvent_Stop(event);
+ parcAssertFalse(failure < 0, "Error stopping signal event %p: (%d) %s",
+ (void *)event, errno, strerror(errno));
+}
+
+PARCEventSignal *dispatcher_CreateSignalEvent(
+ Dispatcher *dispatcher, PARCEventSignal_Callback *callback, void *userData,
+ int signal) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(callback, "Parameter callback must be non-null");
+
+ PARCEventSignal *event = parcEventSignal_Create(
+ dispatcher->Base, signal, PARCEventType_Signal | PARCEventType_Persist,
+ callback, userData);
+ parcAssertNotNull(event,
+ "Got null event when creating signal catcher for signal %d",
+ signal);
+
+ return event;
+}
+
+void dispatcher_DestroySignalEvent(Dispatcher *dispatcher,
+ PARCEventSignal **eventPtr) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(eventPtr,
+ "Parameter eventPtr must be non-null double pointer");
+ parcAssertNotNull(*eventPtr,
+ "Paramter eventPtr must dereference to non-null pointer");
+
+ parcEventSignal_Destroy(eventPtr);
+ eventPtr = NULL;
+}
+
+void dispatcher_StartSignalEvent(Dispatcher *dispatcher,
+ PARCEventSignal *event) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(event, "Parameter event must be non-null");
+
+ int failure = parcEventSignal_Start(event);
+ parcAssertFalse(failure < 0, "Error starting signal event %p: (%d) %s",
+ (void *)event, errno, strerror(errno));
+}
+
+void dispatcher_StopSignalEvent(Dispatcher *dispatcher,
+ PARCEventSignal *event) {
+ parcAssertNotNull(dispatcher, "Parameter dispatcher must be non-null");
+ parcAssertNotNull(event, "Parameter event must be non-null");
+
+ int failure = parcEventSignal_Stop(event);
+ parcAssertFalse(failure < 0, "Error stopping signal event %p: (%d) %s",
+ (void *)event, errno, strerror(errno));
+}
+
+/**
+ * Bind to a local address/port then connect to peer.
+ */
+static bool dispatcher_StreamBufferBindAndConnect(Dispatcher *dispatcher,
+ PARCEventQueue *buffer,
+ struct sockaddr *localSock,
+ socklen_t localSockLength,
+ struct sockaddr *remoteSock,
+ socklen_t remoteSockLength) {
+ // we need to bind, then connect. Special operation, so we make our
+ // own fd then pass it off to the buffer event
+
+ int fd = socket(localSock->sa_family, SOCK_STREAM, 0);
+ if (fd < 0) {
+ perror("socket");
+ return -1;
+ }
+
+ // Set non-blocking flag
+ int flags = fcntl(fd, F_GETFL, NULL);
+ if (flags < 0) {
+ perror("F_GETFL");
+ close(fd);
+ return -1;
+ }
+ int failure = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ if (failure) {
+ perror("F_SETFL");
+ close(fd);
+ return -1;
+ }
+
+ failure = bind(fd, localSock, localSockLength);
+ if (failure) {
+ perror("bind");
+ close(fd);
+ return false;
+ }
+
+ parcEventQueue_SetFileDescriptor(buffer, fd);
+
+ failure = parcEventQueue_ConnectSocket(buffer, remoteSock, remoteSockLength);
+ if (failure && (errno != EINPROGRESS)) {
+ perror("connect");
+ close(fd);
+ return false;
+ }
+ return true;
+}
+
+/**
+ * Connect to an INET peer
+ * @return NULL on error, otherwise a streambuffer
+ */
+static PARCEventQueue *dispatcher_StreamBufferConnect_INET(
+ Dispatcher *dispatcher, const Address *localAddress,
+ const Address *remoteAddress) {
+ struct sockaddr_in localSock, remoteSock;
+ addressGetInet(localAddress, &localSock);
+ addressGetInet(remoteAddress, &remoteSock);
+
+ PARCEventQueue *buffer = parcEventQueue_Create(
+ dispatcher->Base, -1, PARCEventQueueOption_CloseOnFree);
+ parcAssertNotNull(buffer, "got null buffer from parcEventQueue_Create()");
+
+ bool success = dispatcher_StreamBufferBindAndConnect(
+ dispatcher, buffer, (struct sockaddr *)&localSock, sizeof(localSock),
+ (struct sockaddr *)&remoteSock, sizeof(remoteSock));
+ if (!success) {
+ parcEventQueue_Destroy(&buffer);
+ buffer = NULL;
+ }
+
+ return buffer;
+}
+
+/**
+ * Connect to an INET peer
+ * @return NULL on error, otherwise a streambuffer
+ */
+static PARCEventQueue *
+// static StreamBuffer *
+dispatcher_StreamBufferConnect_INET6(Dispatcher *dispatcher,
+ const Address *localAddress,
+ const Address *remoteAddress) {
+ struct sockaddr_in6 localSock, remoteSock;
+ addressGetInet6(localAddress, &localSock);
+ addressGetInet6(remoteAddress, &remoteSock);
+
+ PARCEventQueue *buffer = parcEventQueue_Create(
+ dispatcher->Base, -1, PARCEventQueueOption_CloseOnFree);
+ parcAssertNotNull(buffer, "got null buffer from parcEventQueue_Create()");
+
+ bool success = dispatcher_StreamBufferBindAndConnect(
+ dispatcher, buffer, (struct sockaddr *)&localSock, sizeof(localSock),
+ (struct sockaddr *)&remoteSock, sizeof(remoteSock));
+ if (!success) {
+ parcEventQueue_Destroy(&buffer);
+ buffer = NULL;
+ }
+
+ return buffer;
+}
+
+PARCEventQueue *dispatcher_StreamBufferConnect(Dispatcher *dispatcher,
+ const AddressPair *pair) {
+ const Address *localAddress = addressPair_GetLocal(pair);
+ const Address *remoteAddress = addressPair_GetRemote(pair);
+
+ // they must be of the same address family
+ if (addressGetType(localAddress) != addressGetType(remoteAddress)) {
+ char message[2048];
+ char *localAddressString = addressToString(localAddress);
+ char *remoteAddressString = addressToString(remoteAddress);
+ snprintf(message, 2048,
+ "Remote address not same type as local address, expected %d got "
+ "%d\nlocal %s remote %s",
+ addressGetType(localAddress), addressGetType(remoteAddress),
+ localAddressString, remoteAddressString);
+
+ parcMemory_Deallocate((void **)&localAddressString);
+ parcMemory_Deallocate((void **)&remoteAddressString);
+
+ parcAssertTrue(
+ addressGetType(localAddress) == addressGetType(remoteAddress), "%s",
+ message);
+ }
+
+ address_type type = addressGetType(localAddress);
+
+ PARCEventQueue *result = NULL;
+
+ switch (type) {
+ case ADDR_INET:
+ return dispatcher_StreamBufferConnect_INET(dispatcher, localAddress,
+ remoteAddress);
+ break;
+ case ADDR_INET6:
+ return dispatcher_StreamBufferConnect_INET6(dispatcher, localAddress,
+ remoteAddress);
+ break;
+ default:
+ parcTrapIllegalValue(type, "local address unsupported address type: %d",
+ type);
+ }
+ return result;
+}