/* * Copyright (c) 2017 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /** * This module implements the _Create(), _Start(), and _Destroy() methods. * It also has various utilities for timers and events. * * The command channel is processed in rta_Framework_Commands.c. * * Example: * @code * <#example#> * @endcode */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define __STDC_FORMAT_MACROS #include #ifndef DEBUG_OUTPUT #define DEBUG_OUTPUT 0 #endif #include "rta_Framework_Commands.h" // =================================================== // event callbacks static void _signal_cb(int signalNumber, PARCEventType event, void *arg); static void _tick_cb(int, PARCEventType, void *); static void transmitStatisticsCallback(int fd, PARCEventType what, void *user_data); // =========================================== // Public API (create, start, destroy) // stop are done via the command channel // start cannot be done via the command channel, as its not running until after start. void rta_Framework_LockStatus(RtaFramework *framework) { int res = pthread_mutex_lock(&framework->status_mutex); assertTrue(res == 0, "error from pthread_mutex_lock: %d", res); } void rta_Framework_UnlockStatus(RtaFramework *framework) { int res = pthread_mutex_unlock(&framework->status_mutex); assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); } void rta_Framework_WaitStatus(RtaFramework *framework) { int res = pthread_cond_wait(&framework->status_cv, &framework->status_mutex); assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); } void rta_Framework_BroadcastStatus(RtaFramework *framework) { int res = pthread_cond_broadcast(&framework->status_cv); assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); } /** * This is called whenever the connection table wants to free a connection. * It should call the protocol stack's closers on the connection, then * destroy the connection. It is called either (a) inside the worker thread, * or (b) after the worker thread has stopped, so no locking needed. * * Example: * @code * <#example#> * @endcode */ static void rtaFramework_ConnectionTableFreeFunc(RtaConnection **connectionPtr) { RtaConnection *connection; assertNotNull(connectionPtr, "Called with null double pointer"); connection = *connectionPtr; assertNotNull(connection, "Parameter must not dereference to null"); if (rtaConnection_GetState(connection) != CONN_CLOSED) { rtaFramework_CloseConnection(rtaConnection_GetFramework(connection), connection); } rtaConnection_Destroy(connectionPtr); } static void _signal_cb(int signalNumber, PARCEventType event, void *arg) { } static void rtaFramework_InitializeEventScheduler(RtaFramework *framework) { framework->base = parcEventScheduler_Create(); assertNotNull(framework->base, "Could not initialize event scheduler!"); framework->signal_pipe = parcEventSignal_Create(framework->base, SIGPIPE, PARCEventType_Signal | PARCEventType_Persist, _signal_cb, framework); parcEventSignal_Start(framework->signal_pipe); if (gettimeofday(&framework->starttime, NULL) != 0) { perror("Error getting time of day"); trapUnexpectedState("Could not read gettimeofday"); } } static void rtaFramework_SetupMillisecondTimer(RtaFramework *framework) { struct timeval wtnow_timeout; // setup a milli-second timer wtnow_timeout.tv_sec = 0; wtnow_timeout.tv_usec = 1000000 / WTHZ; framework->tick_event = parcEventTimer_Create( framework->base, PARCEventType_Persist, _tick_cb, (void *) framework); parcEventTimer_Start(framework->tick_event, &wtnow_timeout); } static void rtaFramework_CreateCommandChannel(RtaFramework *framework) { int fd = parcNotifier_Socket(framework->commandNotifier); // setup a PARCEventQueue for command_fd // Set non-blocking flag int flags = fcntl(fd, F_GETFL, NULL); assertFalse(flags == -1, "fcntl failed to obtain file descriptor flags (%d)", errno); int res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); assertTrue(res == 0, "rtaFramework_Create failed to set socket non-blocking: %s", strerror(errno)); framework->commandEvent = parcEvent_Create(framework->base, fd, PARCEventType_Read | PARCEventType_Persist, rtaFramework_CommandCallback, (void *) framework); // The command port is the highest priority parcEvent_SetPriority(framework->commandEvent, PARCEventPriority_Maximum); parcEvent_Start(framework->commandEvent); // the notifier socket is now ready to fire } /* * Until things get plumbed from above via control messages, we will use * environment variables in the form "RtaFacility_facility=level" with a special facility "RtaFacility_All". * The "All" is processed first, then more specific facilities, so one could set all to a default * level then set specific ones to over-ride. * * Default log level is Error * * Strings: * RtaFacility_Framework * RtaFacility_Api * RtaFacility_Flowcontrol * RtaFacility_Codec * RtaFacility_Forwarder */ static void _setLogLevels(RtaFramework *framework) { for (int i = 0; i < RtaLoggerFacility_END; i++) { rtaLogger_SetLogLevel(framework->logger, i, PARCLogLevel_Error); } char *levelString = getenv("RtaFacility_All"); if (levelString) { PARCLogLevel level = parcLogLevel_FromString(levelString); if (level != PARCLogLevel_All) { for (int i = 0; i < RtaLoggerFacility_END; i++) { rtaLogger_SetLogLevel(framework->logger, i, level); } } } // no do specific facilities char buffer[1024]; for (int i = 0; i < RtaLoggerFacility_END; i++) { snprintf(buffer, 1024, "RtaFacility_%s", rtaLogger_FacilityString(i)); levelString = getenv(buffer); if (levelString) { PARCLogLevel level = parcLogLevel_FromString(levelString); if (level != PARCLogLevel_All) { rtaLogger_SetLogLevel(framework->logger, i, level); } } } } /** * Create a framework. This is a thread-safe function. * * Example: * @code * <#example#> * @endcode */ RtaFramework * rtaFramework_Create(PARCRingBuffer1x1 *commandRingBuffer, PARCNotifier *commandNotifier) { RtaFramework *framework = parcMemory_AllocateAndClear(sizeof(RtaFramework)); assertNotNull(framework, "RtaFramework parcMemory_AllocateAndClear returned null"); PARCLogReporter *reporter = parcLogReporterTextStdout_Create(); framework->logger = rtaLogger_Create(reporter, parcClock_Monotonic()); parcLogReporter_Release(&reporter); _setLogLevels(framework); // setup the event scheduler // mutes, condition variable, and protected state for starting // and stopping the event thread from an outside thread. pthread_mutex_init(&framework->status_mutex, NULL); pthread_cond_init(&framework->status_cv, NULL); framework->status = FRAMEWORK_INIT; framework->commandRingBuffer = parcRingBuffer1x1_Acquire(commandRingBuffer); framework->commandNotifier = parcNotifier_Acquire(commandNotifier); framework->connid_next = 1; TAILQ_INIT(&framework->protocols_head); //TODO: make 16384 configurable. framework->connectionTable = rtaConnectionTable_Create(16384, rtaFramework_ConnectionTableFreeFunc); assertNotNull(framework->connectionTable, "Could not allocate conneciton table"); rtaFramework_InitializeEventScheduler(framework); rtaFramework_SetupMillisecondTimer(framework); framework->transmit_statistics_event = parcEventTimer_Create(framework->base, PARCEventType_Persist, transmitStatisticsCallback, (void *) framework); rtaFramework_CreateCommandChannel(framework); if (rtaLogger_IsLoggable(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info)) { rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info, __func__, "framework %p created", (void *) framework); } return framework; } static void rtaFramework_DestroyEventScheduler(RtaFramework *framework) { parcEventTimer_Destroy(&(framework->tick_event)); parcEventTimer_Destroy(&(framework->transmit_statistics_event)); if (framework->signal_int != NULL) { parcEventSignal_Destroy(&(framework->signal_int)); } if (framework->signal_usr1 != NULL) { parcEventSignal_Destroy(&(framework->signal_usr1)); } parcEvent_Destroy(&(framework->commandEvent)); parcNotifier_Release(&framework->commandNotifier); parcRingBuffer1x1_Release(&framework->commandRingBuffer); parcEventSignal_Destroy(&(framework->signal_pipe)); parcEventScheduler_Destroy(&(framework->base)); } void rtaFramework_Destroy(RtaFramework **frameworkPtr) { RtaFramework *framework; assertNotNull(frameworkPtr, "Parameter must be non-null RtaFramework double pointer"); framework = *frameworkPtr; assertNotNull(framework, "Parameter must dereference to non-Null RtaFramework pointer"); rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info, __func__, "framework %p destroy", (void *) framework); // status can be STOPPED or INIT. It's ok to destroy one that's never been started. // %%%% LOCK rta_Framework_LockStatus(framework); assertTrue(framework->status == FRAMEWORK_SHUTDOWN || framework->status == FRAMEWORK_INIT || framework->status == FRAMEWORK_TEARDOWN, "Framework invalid state, got %d", framework->status); rta_Framework_UnlockStatus(framework); // %%%% UNLOCK rtaConnectionTable_Destroy(&framework->connectionTable); rtaFramework_DestroyEventScheduler(framework); rtaLogger_Release(&framework->logger); parcMemory_Deallocate((void **) &framework); *frameworkPtr = NULL; } RtaLogger * rtaFramework_GetLogger(RtaFramework *framework) { return framework->logger; } /** * May block briefly, returns the current status of the framework. * * Example: * @code * <#example#> * @endcode */ RtaFrameworkStatus rtaFramework_GetStatus(RtaFramework *framework) { RtaFrameworkStatus status; // %%%% LOCK rta_Framework_LockStatus(framework); status = framework->status; rta_Framework_UnlockStatus(framework); // %%%% UNLOCK return status; } /** * Blocks until the framework status equals or exeeds the desired status * * Example: * @code * <#example#> * @endcode */ RtaFrameworkStatus rtaFramework_WaitForStatus(RtaFramework *framework, RtaFrameworkStatus status) { // %%%% LOCK rta_Framework_LockStatus(framework); while (framework->status < status) { rta_Framework_WaitStatus(framework); } rta_Framework_UnlockStatus(framework); // %%%% UNLOCK return status; } // ================================================================= // Transport Operations PARCEventScheduler * rtaFramework_GetEventScheduler(RtaFramework *framework) { assertNotNull(framework, "Parameter must be non-NULL RtaFramework"); return framework->base; } unsigned rtaFramework_GetNextConnectionId(RtaFramework *framework) { assertNotNull(framework, "Parameter must be non-NULL RtaFramework"); return framework->connid_next++; } // ============================ // Internal functions /* * This is dispatched from the event loop, so its a loosely accurate time */ static void _tick_cb(int fd, PARCEventType what, void *user_data) { RtaFramework *framework = (RtaFramework *) user_data; assertTrue(what & PARCEventType_Timeout, "%s got unknown signal %d", __func__, what); framework->clock_ticks++; if (framework->killme) { int res; if (rtaLogger_IsLoggable(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug)) { rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug, __func__, "framework %p exiting base loop", (void *) framework); } res = parcEventScheduler_Abort(framework->base); assertTrue(res == 0, "error on parcEventScheduler_Abort: %d", res); } } FILE *GlobalStatisticsFile = NULL; static void transmitStatisticsCallback(int fd, PARCEventType what, void *user_data) { RtaFramework *framework = (RtaFramework *) user_data; assertTrue(what & PARCEventType_Timeout, "unknown signal %d", what); FrameworkProtocolHolder *holder; TAILQ_FOREACH(holder, &framework->protocols_head, list) { RtaProtocolStack *stack = holder->stack; PARCArrayList *list = rtaProtocolStack_GetStatistics(stack, GlobalStatisticsFile); parcArrayList_Destroy(&list); } }