aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c469
1 files changed, 469 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c
new file mode 100644
index 00000000..ada629e0
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c
@@ -0,0 +1,469 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This module implements the _Create(), _Start(), and _Destroy() methods.
+ * It also has various utilities for timers and events.
+ *
+ * The command channel is processed in rta_Framework_Commands.c.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <errno.h>
+
+#include <string.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+
+#include <parc/algol/parc_EventSignal.h>
+
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <parc/algol/parc_ArrayList.h>
+#include <parc/logging/parc_LogReporterTextStdout.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/transport/transport_rta/core/rta_ConnectionTable.h>
+#include <ccnx/transport/common/transport_Message.h>
+#include <ccnx/transport/common/transport_private.h>
+
+#include <ccnx/transport/transport_rta/connectors/connector_Api.h>
+#include <ccnx/transport/transport_rta/connectors/connector_Forwarder.h>
+#include <ccnx/transport/transport_rta/components/component_Codec.h>
+#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h>
+
+#include <ccnx/transport/transport_rta/commands/rta_CommandTransmitStatistics.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+#include "rta_Framework_Commands.h"
+
+// ===================================================
+
+// event callbacks
+static void _signal_cb(int signalNumber, PARCEventType event, void *arg);
+static void _tick_cb(int, PARCEventType, void *);
+static void transmitStatisticsCallback(int fd, PARCEventType what, void *user_data);
+
+
+// ===========================================
+// Public API (create, start, destroy)
+// stop are done via the command channel
+// start cannot be done via the command channel, as its not running until after start.
+
+void
+rta_Framework_LockStatus(RtaFramework *framework)
+{
+ int res = pthread_mutex_lock(&framework->status_mutex);
+ assertTrue(res == 0, "error from pthread_mutex_lock: %d", res);
+}
+
+void
+rta_Framework_UnlockStatus(RtaFramework *framework)
+{
+ int res = pthread_mutex_unlock(&framework->status_mutex);
+ assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res);
+}
+
+void
+rta_Framework_WaitStatus(RtaFramework *framework)
+{
+ int res = pthread_cond_wait(&framework->status_cv, &framework->status_mutex);
+ assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res);
+}
+
+void
+rta_Framework_BroadcastStatus(RtaFramework *framework)
+{
+ int res = pthread_cond_broadcast(&framework->status_cv);
+ assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res);
+}
+
+
+/**
+ * This is called whenever the connection table wants to free a connection.
+ * It should call the protocol stack's closers on the connection, then
+ * destroy the connection. It is called either (a) inside the worker thread,
+ * or (b) after the worker thread has stopped, so no locking needed.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void
+rtaFramework_ConnectionTableFreeFunc(RtaConnection **connectionPtr)
+{
+ RtaConnection *connection;
+ assertNotNull(connectionPtr, "Called with null double pointer");
+ connection = *connectionPtr;
+ assertNotNull(connection, "Parameter must not dereference to null");
+
+ if (rtaConnection_GetState(connection) != CONN_CLOSED) {
+ rtaFramework_CloseConnection(rtaConnection_GetFramework(connection), connection);
+ }
+
+ rtaConnection_Destroy(connectionPtr);
+}
+
+static void
+_signal_cb(int signalNumber, PARCEventType event, void *arg)
+{
+}
+
+static void
+rtaFramework_InitializeEventScheduler(RtaFramework *framework)
+{
+ framework->base = parcEventScheduler_Create();
+ assertNotNull(framework->base, "Could not initialize event scheduler!");
+
+ framework->signal_pipe = parcEventSignal_Create(framework->base, SIGPIPE, PARCEventType_Signal | PARCEventType_Persist, _signal_cb, framework);
+ parcEventSignal_Start(framework->signal_pipe);
+
+ if (gettimeofday(&framework->starttime, NULL) != 0) {
+ perror("Error getting time of day");
+ trapUnexpectedState("Could not read gettimeofday");
+ }
+}
+
+static void
+rtaFramework_SetupMillisecondTimer(RtaFramework *framework)
+{
+ struct timeval wtnow_timeout;
+
+ // setup a milli-second timer
+ wtnow_timeout.tv_sec = 0;
+ wtnow_timeout.tv_usec = 1000000 / WTHZ;
+
+ framework->tick_event = parcEventTimer_Create(
+ framework->base,
+ PARCEventType_Persist,
+ _tick_cb,
+ (void *) framework);
+
+ parcEventTimer_Start(framework->tick_event, &wtnow_timeout);
+}
+
+static void
+rtaFramework_CreateCommandChannel(RtaFramework *framework)
+{
+ int fd = parcNotifier_Socket(framework->commandNotifier);
+
+ // setup a PARCEventQueue for command_fd
+
+ // Set non-blocking flag
+ int flags = fcntl(fd, F_GETFL, NULL);
+ assertFalse(flags == -1, "fcntl failed to obtain file descriptor flags (%d)", errno);
+ int res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ assertTrue(res == 0, "rtaFramework_Create failed to set socket non-blocking: %s", strerror(errno));
+
+ framework->commandEvent = parcEvent_Create(framework->base, fd, PARCEventType_Read | PARCEventType_Persist, rtaFramework_CommandCallback, (void *) framework);
+
+ // The command port is the highest priority
+ parcEvent_SetPriority(framework->commandEvent, PARCEventPriority_Maximum);
+
+ parcEvent_Start(framework->commandEvent);
+
+ // the notifier socket is now ready to fire
+}
+
+/*
+ * Until things get plumbed from above via control messages, we will use
+ * environment variables in the form "RtaFacility_facility=level" with a special facility "RtaFacility_All".
+ * The "All" is processed first, then more specific facilities, so one could set all to a default
+ * level then set specific ones to over-ride.
+ *
+ * Default log level is Error
+ *
+ * Strings:
+ * RtaFacility_Framework
+ * RtaFacility_Api
+ * RtaFacility_Flowcontrol
+ * RtaFacility_Codec
+ * RtaFacility_Forwarder
+ */
+static void
+_setLogLevels(RtaFramework *framework)
+{
+ for (int i = 0; i < RtaLoggerFacility_END; i++) {
+ rtaLogger_SetLogLevel(framework->logger, i, PARCLogLevel_Error);
+ }
+
+ char *levelString = getenv("RtaFacility_All");
+ if (levelString) {
+ PARCLogLevel level = parcLogLevel_FromString(levelString);
+ if (level != PARCLogLevel_All) {
+ for (int i = 0; i < RtaLoggerFacility_END; i++) {
+ rtaLogger_SetLogLevel(framework->logger, i, level);
+ }
+ }
+ }
+
+ // no do specific facilities
+ char buffer[1024];
+ for (int i = 0; i < RtaLoggerFacility_END; i++) {
+ snprintf(buffer, 1024, "RtaFacility_%s", rtaLogger_FacilityString(i));
+ levelString = getenv(buffer);
+ if (levelString) {
+ PARCLogLevel level = parcLogLevel_FromString(levelString);
+ if (level != PARCLogLevel_All) {
+ rtaLogger_SetLogLevel(framework->logger, i, level);
+ }
+ }
+ }
+}
+
+/**
+ * Create a framework. This is a thread-safe function.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+RtaFramework *
+rtaFramework_Create(PARCRingBuffer1x1 *commandRingBuffer, PARCNotifier *commandNotifier)
+{
+ RtaFramework *framework = parcMemory_AllocateAndClear(sizeof(RtaFramework));
+ assertNotNull(framework, "RtaFramework parcMemory_AllocateAndClear returned null");
+
+ PARCLogReporter *reporter = parcLogReporterTextStdout_Create();
+ framework->logger = rtaLogger_Create(reporter, parcClock_Monotonic());
+ parcLogReporter_Release(&reporter);
+
+ _setLogLevels(framework);
+
+ // setup the event scheduler
+
+ // mutes, condition variable, and protected state for starting
+ // and stopping the event thread from an outside thread.
+ pthread_mutex_init(&framework->status_mutex, NULL);
+ pthread_cond_init(&framework->status_cv, NULL);
+ framework->status = FRAMEWORK_INIT;
+
+ framework->commandRingBuffer = parcRingBuffer1x1_Acquire(commandRingBuffer);
+ framework->commandNotifier = parcNotifier_Acquire(commandNotifier);
+
+ framework->connid_next = 1;
+ TAILQ_INIT(&framework->protocols_head);
+
+ //TODO: make 16384 configurable.
+ framework->connectionTable = rtaConnectionTable_Create(16384, rtaFramework_ConnectionTableFreeFunc);
+ assertNotNull(framework->connectionTable, "Could not allocate conneciton table");
+
+ rtaFramework_InitializeEventScheduler(framework);
+
+ rtaFramework_SetupMillisecondTimer(framework);
+
+ framework->transmit_statistics_event = parcEventTimer_Create(framework->base,
+ PARCEventType_Persist,
+ transmitStatisticsCallback,
+ (void *) framework);
+
+
+ rtaFramework_CreateCommandChannel(framework);
+
+ if (rtaLogger_IsLoggable(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info)) {
+ rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info, __func__,
+ "framework %p created", (void *) framework);
+ }
+
+ return framework;
+}
+
+static void
+rtaFramework_DestroyEventScheduler(RtaFramework *framework)
+{
+ parcEventTimer_Destroy(&(framework->tick_event));
+ parcEventTimer_Destroy(&(framework->transmit_statistics_event));
+
+ if (framework->signal_int != NULL) {
+ parcEventSignal_Destroy(&(framework->signal_int));
+ }
+ if (framework->signal_usr1 != NULL) {
+ parcEventSignal_Destroy(&(framework->signal_usr1));
+ }
+
+ parcEvent_Destroy(&(framework->commandEvent));
+ parcNotifier_Release(&framework->commandNotifier);
+ parcRingBuffer1x1_Release(&framework->commandRingBuffer);
+
+ parcEventSignal_Destroy(&(framework->signal_pipe));
+ parcEventScheduler_Destroy(&(framework->base));
+}
+
+void
+rtaFramework_Destroy(RtaFramework **frameworkPtr)
+{
+ RtaFramework *framework;
+
+ assertNotNull(frameworkPtr, "Parameter must be non-null RtaFramework double pointer");
+ framework = *frameworkPtr;
+ assertNotNull(framework, "Parameter must dereference to non-Null RtaFramework pointer");
+
+ rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info, __func__,
+ "framework %p destroy", (void *) framework);
+
+ // status can be STOPPED or INIT. It's ok to destroy one that's never been started.
+
+ // %%%% LOCK
+ rta_Framework_LockStatus(framework);
+ assertTrue(framework->status == FRAMEWORK_SHUTDOWN ||
+ framework->status == FRAMEWORK_INIT ||
+ framework->status == FRAMEWORK_TEARDOWN,
+ "Framework invalid state, got %d",
+ framework->status);
+ rta_Framework_UnlockStatus(framework);
+ // %%%% UNLOCK
+
+ rtaConnectionTable_Destroy(&framework->connectionTable);
+
+ rtaFramework_DestroyEventScheduler(framework);
+
+ rtaLogger_Release(&framework->logger);
+
+ parcMemory_Deallocate((void **) &framework);
+
+ *frameworkPtr = NULL;
+}
+
+RtaLogger *
+rtaFramework_GetLogger(RtaFramework *framework)
+{
+ return framework->logger;
+}
+
+/**
+ * May block briefly, returns the current status of the framework.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+RtaFrameworkStatus
+rtaFramework_GetStatus(RtaFramework *framework)
+{
+ RtaFrameworkStatus status;
+ // %%%% LOCK
+ rta_Framework_LockStatus(framework);
+ status = framework->status;
+ rta_Framework_UnlockStatus(framework);
+ // %%%% UNLOCK
+ return status;
+}
+
+/**
+ * Blocks until the framework status equals or exeeds the desired status
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+RtaFrameworkStatus
+rtaFramework_WaitForStatus(RtaFramework *framework,
+ RtaFrameworkStatus status)
+{
+ // %%%% LOCK
+ rta_Framework_LockStatus(framework);
+ while (framework->status < status) {
+ rta_Framework_WaitStatus(framework);
+ }
+ rta_Framework_UnlockStatus(framework);
+ // %%%% UNLOCK
+
+ return status;
+}
+
+// =================================================================
+
+// Transport Operations
+
+PARCEventScheduler *
+rtaFramework_GetEventScheduler(RtaFramework *framework)
+{
+ assertNotNull(framework, "Parameter must be non-NULL RtaFramework");
+ return framework->base;
+}
+
+unsigned
+rtaFramework_GetNextConnectionId(RtaFramework *framework)
+{
+ assertNotNull(framework, "Parameter must be non-NULL RtaFramework");
+
+ return framework->connid_next++;
+}
+
+// ============================
+// Internal functions
+
+/*
+ * This is dispatched from the event loop, so its a loosely accurate time
+ */
+static void
+_tick_cb(int fd, PARCEventType what, void *user_data)
+{
+ RtaFramework *framework = (RtaFramework *) user_data;
+ assertTrue(what & PARCEventType_Timeout, "%s got unknown signal %d", __func__, what);
+ framework->clock_ticks++;
+
+ if (framework->killme) {
+ int res;
+
+ if (rtaLogger_IsLoggable(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug)) {
+ rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug, __func__,
+ "framework %p exiting base loop", (void *) framework);
+ }
+
+ res = parcEventScheduler_Abort(framework->base);
+ assertTrue(res == 0, "error on parcEventScheduler_Abort: %d", res);
+ }
+}
+
+FILE *GlobalStatisticsFile = NULL;
+
+static void
+transmitStatisticsCallback(int fd, PARCEventType what, void *user_data)
+{
+ RtaFramework *framework = (RtaFramework *) user_data;
+ assertTrue(what & PARCEventType_Timeout, "unknown signal %d", what);
+
+ FrameworkProtocolHolder *holder;
+ TAILQ_FOREACH(holder, &framework->protocols_head, list)
+ {
+ RtaProtocolStack *stack = holder->stack;
+ PARCArrayList *list = rtaProtocolStack_GetStatistics(stack, GlobalStatisticsFile);
+ parcArrayList_Destroy(&list);
+ }
+}