aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c450
1 files changed, 450 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c
new file mode 100644
index 00000000..a02efefa
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c
@@ -0,0 +1,450 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <errno.h>
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework_private.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/commands/rta_Command.h>
+
+#include <parc/algol/parc_Event.h>
+
+#ifdef DEBUG_OUTPUT
+#undef DEBUG_OUTPUT
+#endif
+
+#define DEBUG_OUTPUT 0
+
+extern FILE *GlobalStatisticsFile;
+
+static bool _rtaFramework_ExecuteCreateStack(RtaFramework *framework, const RtaCommandCreateProtocolStack *createStack);
+static bool _rtaFramework_ExecuteDestroyStack(RtaFramework *framework, const RtaCommandDestroyProtocolStack *destroyStack);
+static bool _rtaFramework_ExecuteOpenConnection(RtaFramework *framework, const RtaCommandOpenConnection *openConnection);
+static bool _rtaFramework_ExecuteCloseConnection(RtaFramework *framework, const RtaCommandCloseConnection *closeConnection);
+static bool _rtaFramework_ExecuteTransmitStatistics(RtaFramework *framework, const RtaCommandTransmitStatistics *transmitStats);
+static bool _rtaFramework_ExecuteShutdownFramework(RtaFramework *framework);
+
+static void rtaFramework_DrainApiDescriptor(int fd);
+
+void
+rtaFramework_CommandCallback(int fd, PARCEventType what, void *user_framework)
+{
+ RtaFramework *framework = (RtaFramework *) user_framework;
+
+ // flag the notifier that we are starting a batch of reads
+ parcNotifier_PauseEvents(framework->commandNotifier);
+
+ RtaCommand *command = NULL;
+ while ((command = rtaCommand_Read(framework->commandRingBuffer)) != NULL) {
+ // The shutdown command can broadcast a change of state before the function
+ // returns, so we need to free the RtaCommand before executing the shutdown.
+ // Therefore, we include the rtaCommand_Destroy() as part of the switch.
+
+ if (rtaCommand_IsOpenConnection(command)) {
+ _rtaFramework_ExecuteOpenConnection(framework, rtaCommand_GetOpenConnection(command));
+ rtaCommand_Release(&command);
+ } else if (rtaCommand_IsCloseConnection(command)) {
+ _rtaFramework_ExecuteCloseConnection(framework, rtaCommand_GetCloseConnection(command));
+ rtaCommand_Release(&command);
+ } else if (rtaCommand_IsCreateProtocolStack(command)) {
+ _rtaFramework_ExecuteCreateStack(framework, rtaCommand_GetCreateProtocolStack(command));
+ rtaCommand_Release(&command);
+ } else if (rtaCommand_IsDestroyProtocolStack(command)) {
+ _rtaFramework_ExecuteDestroyStack(framework, rtaCommand_GetDestroyProtocolStack(command));
+ rtaCommand_Release(&command);
+ } else if (rtaCommand_IsTransmitStatistics(command)) {
+ _rtaFramework_ExecuteTransmitStatistics(framework, rtaCommand_GetTransmitStatistics(command));
+ rtaCommand_Release(&command);
+ } else if (rtaCommand_IsShutdownFramework(command)) {
+ // release the command before executing shutdown
+ rtaCommand_Release(&command);
+ _rtaFramework_ExecuteShutdownFramework(framework);
+ } else {
+ rtaCommand_Display(command, 3);
+ rtaCommand_Release(&command);
+ trapUnexpectedState("Got unknown command type");
+ }
+ }
+
+ // resume notifications
+ parcNotifier_StartEvents(framework->commandNotifier);
+}
+
+// =========================================
+// Internal command processing
+
+/**
+ * Create a protocol holder and insert it in the framework's
+ * protocols_head list.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static FrameworkProtocolHolder *
+rtaFramework_CreateProtocolHolder(RtaFramework *framework, PARCJSON *params, uint64_t kv_hash, int stack_id)
+{
+ // request for a new protocol stack, create it
+ FrameworkProtocolHolder *holder = parcMemory_AllocateAndClear(sizeof(FrameworkProtocolHolder));
+ assertNotNull(holder, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(FrameworkProtocolHolder));
+
+ TAILQ_INSERT_TAIL(&framework->protocols_head, holder, list);
+
+ holder->kv_hash = kv_hash;
+ holder->stack_id = stack_id;
+
+ if (DEBUG_OUTPUT) {
+ printf("%s created protocol holder %p hash %" PRIu64 "\n",
+ __func__,
+ (void *) holder,
+ kv_hash);
+ }
+
+ return holder;
+}
+
+/**
+ * Lookup the existing protocol holder for stackid.
+ * Returns NULL if not found.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static FrameworkProtocolHolder *
+rtaFramework_GetProtocolStackByStackId(RtaFramework *framework, int stack_id)
+{
+ FrameworkProtocolHolder *holder;
+ TAILQ_FOREACH(holder, &framework->protocols_head, list)
+ {
+ if (holder->stack_id == stack_id) {
+ return holder;
+ }
+ }
+ return NULL;
+}
+
+static bool
+_rtaFramework_ExecuteCreateStack(RtaFramework *framework, const RtaCommandCreateProtocolStack *createStack)
+{
+ // if we're in INIT mode, we need to bump
+ // wait for notificaiton from event thread
+ if (framework->status == FRAMEWORK_INIT) {
+ rta_Framework_LockStatus(framework);
+ if (framework->status == FRAMEWORK_INIT) {
+ framework->status = FRAMEWORK_SETUP;
+ }
+ rta_Framework_BroadcastStatus(framework);
+ rta_Framework_UnlockStatus(framework);
+ }
+
+ FrameworkProtocolHolder *holder =
+ rtaFramework_GetProtocolStackByStackId(framework, rtaCommandCreateProtocolStack_GetStackId(createStack));
+ assertNull(holder, "Found a holder with stack_id %d, but we're asked to create it!",
+ rtaCommandCreateProtocolStack_GetStackId(createStack));
+
+ uint64_t kv_hash = ccnxStackConfig_HashCode(rtaCommandCreateProtocolStack_GetStackConfig(createStack));
+
+ // this creates it and inserts in framework->protocols_head
+ holder = rtaFramework_CreateProtocolHolder(framework, NULL, kv_hash, rtaCommandCreateProtocolStack_GetStackId(createStack));
+
+ holder->stack =
+ rtaProtocolStack_Create(framework, rtaCommandCreateProtocolStack_GetConfig(createStack), rtaCommandCreateProtocolStack_GetStackId(createStack));
+ rtaProtocolStack_Configure(holder->stack);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s created protocol %p kv_hash %016" PRIX64 " stack_id %d\n",
+ __func__, (void *) holder->stack, kv_hash, rtaCommandCreateProtocolStack_GetStackId(createStack));
+ }
+ return 0;
+}
+
+static bool
+_rtaFramework_ExecuteOpenConnection(RtaFramework *framework, const RtaCommandOpenConnection *openConnection)
+{
+ int res;
+ FrameworkProtocolHolder *holder;
+ RtaConnection *rtaConnection;
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s framework %p\n",
+ rtaFramework_GetTicks(framework), __func__, (void *) framework);
+ }
+
+ holder = rtaFramework_GetProtocolStackByStackId(framework, rtaCommandOpenConnection_GetStackId(openConnection));
+ assertNotNull(holder, "Could not find stack_id %d", rtaCommandOpenConnection_GetStackId(openConnection));
+
+ rtaConnection = rtaConnectionTable_GetByApiFd(framework->connectionTable, rtaCommandOpenConnection_GetApiNotifierFd(openConnection));
+ assertNull(rtaConnection, "Found api_fd %d, but it should not exist!", rtaCommandOpenConnection_GetApiNotifierFd(openConnection));
+
+ rtaConnection = rtaConnection_Create(holder->stack, openConnection);
+ res = rtaConnectionTable_AddConnection(framework->connectionTable, rtaConnection);
+ assertTrue(res == 0, "Got error from rtaConnectionTable_AddConnection: %d", res);
+
+ res = rtaProtocolStack_Open(holder->stack, rtaConnection);
+ assertTrue(res == 0, "Got error from rtaProtocolStack_Open: %d", res);
+
+ rtaConnection_SetState(rtaConnection, CONN_OPEN);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s created connection %p stack_id %d api_fd %d transport_fd %d\n",
+ rtaFramework_GetTicks(framework),
+ __func__,
+ (void *) rtaConnection,
+ rtaCommandOpenConnection_GetStackId(openConnection),
+ rtaCommandOpenConnection_GetApiNotifierFd(openConnection),
+ rtaCommandOpenConnection_GetTransportNotifierFd(openConnection));
+ }
+
+ return true;
+}
+
+
+/**
+ * Mark a connection as closed. If there are no pending
+ * packets in queues, destroy it too.
+ * It's non-static because we call from rta_Framework.c
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+int
+rtaFramework_CloseConnection(RtaFramework *framework, RtaConnection *connection)
+{
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %p api_fd %d\n",
+ rtaFramework_GetTicks(framework),
+ __func__, (void *) connection, rtaConnection_GetApiFd(connection));
+ }
+
+ assertFalse(rtaConnection_GetState(connection) == CONN_CLOSED,
+ "connection api_fd %d is already closed", rtaConnection_GetApiFd(connection));
+
+ rtaConnection_SetState(connection, CONN_CLOSED);
+ rtaProtocolStack_Close(rtaConnection_GetStack(connection), connection);
+
+ rtaFramework_DrainApiDescriptor(rtaConnection_GetApiFd(connection));
+
+
+ // Remove it from the connection table, which will free our reference to it.
+
+ rtaConnectionTable_Remove(framework->connectionTable, connection);
+
+ // Done. The rtaConnection will be removed when the last queued
+ // messages for it are gone. We keep the connection holder, so if we do
+ // a Destroy we'll know about it. RtaConnection will call
+ // rtaFramework_RemoveConnection(...) when RtaConnection_Destroy() refcount
+ // is zero and it's going to fully remove the connection.
+
+ return 0;
+}
+
+
+static bool
+_rtaFramework_ExecuteCloseConnection(RtaFramework *framework, const RtaCommandCloseConnection *closeConnection)
+{
+ RtaConnection *connection = rtaConnectionTable_GetByApiFd(framework->connectionTable, rtaCommandCloseConnection_GetApiNotifierFd(closeConnection));
+ assertNotNull(connection, "Could not find api_fd %d", rtaCommandCloseConnection_GetApiNotifierFd(closeConnection));
+
+ return (rtaFramework_CloseConnection(framework, connection) == 0);
+}
+
+/**
+ * When the transport is closing the descriptor
+ * to the API, it should call this to drain any pending but unretrieved
+ * messages out of the API's side of the socket
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void
+rtaFramework_DrainApiDescriptor(int fd)
+{
+ unsigned count = 0;
+
+ if (DEBUG_OUTPUT) {
+ printf("%s fd %d\n", __func__, fd);
+ }
+
+ // Set non-blocking flag
+ int flags = fcntl(fd, F_GETFL, NULL);
+ assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+ int failure = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno);
+
+ // Now drain the user side of stuff they have not read
+ CCNxMetaMessage *msg;
+ while (read(fd, &msg, sizeof(CCNxMetaMessage *)) == sizeof(CCNxMetaMessage *)) {
+ count++;
+ ccnxMetaMessage_Release(&msg);
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%s destroyed %u messages\n", __func__, count);
+ }
+}
+
+/**
+ * This is a deferred callback from the RtaConnection when its last TransportMessage
+ * has been purged from the queues.
+ *
+ * Don't call anything inside here that ends up back in the RtaConnection.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+void
+rtaFramework_RemoveConnection(RtaFramework *framework, RtaConnection *rtaConnection)
+{
+ rtaFramework_DrainApiDescriptor(rtaConnection_GetApiFd(rtaConnection));
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %p closing api_fd %d\n",
+ rtaFramework_GetTicks(framework),
+ __func__, (void *) rtaConnection, rtaConnection_GetApiFd(rtaConnection));
+ }
+
+ close(rtaConnection_GetApiFd(rtaConnection));
+ close(rtaConnection_GetTransportFd(rtaConnection));
+}
+
+void
+rtaFramework_DestroyProtocolHolder(RtaFramework *framework, FrameworkProtocolHolder *holder)
+{
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s proto_holder %p\n",
+ rtaFramework_GetTicks(framework),
+ __func__, (void *) holder);
+ }
+
+ // remove any and all connections associated with this protocol stack.
+ // If the connections still have packets floating around in queues, the connection
+ // will stay around until they all get flushed then will destroy on
+ // the last packet destruction
+ rtaConnectionTable_RemoveByStack(framework->connectionTable, holder->stack_id);
+
+ rtaProtocolStack_Destroy(&holder->stack);
+
+ TAILQ_REMOVE(&framework->protocols_head, holder, list);
+
+ parcMemory_Deallocate((void **) &holder);
+}
+
+
+static bool
+_rtaFramework_ExecuteDestroyStack(RtaFramework *framework, const RtaCommandDestroyProtocolStack *destroyStack)
+{
+ FrameworkProtocolHolder *holder = rtaFramework_GetProtocolStackByStackId(framework, rtaCommandDestroyProtocolStack_GetStackId(destroyStack));
+ assertNotNull(holder, "Could not find stack_id %d", rtaCommandDestroyProtocolStack_GetStackId(destroyStack));
+
+ rtaConnectionTable_RemoveByStack(framework->connectionTable, rtaCommandDestroyProtocolStack_GetStackId(destroyStack));
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s proto_holder %p\n",
+ rtaFramework_GetTicks(framework),
+ __func__, (void *) holder);
+ }
+
+ rtaFramework_DestroyProtocolHolder(framework, holder);
+
+ return true;
+}
+
+/**
+ * This will update the shared framework->status, so needs a lock around
+ * the work it does.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static bool
+_rtaFramework_ExecuteShutdownFramework(RtaFramework *framework)
+{
+ FrameworkProtocolHolder *holder;
+
+ // %%% LOCK
+ rta_Framework_LockStatus(framework);
+ if (framework->status != FRAMEWORK_RUNNING) {
+ RtaFrameworkStatus status = framework->status;
+ rta_Framework_UnlockStatus(framework);
+ // %%% UNLOCK
+ assertTrue(0, "Invalid state, expected FRAMEWORK_RUNNING or later, got %d", status);
+ return -1;
+ }
+
+ holder = TAILQ_FIRST(&framework->protocols_head);
+ while (holder != NULL) {
+ FrameworkProtocolHolder *temp = TAILQ_NEXT(holder, list);
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s stack_id %d\n",
+ framework->clock_ticks, __func__, holder->stack_id);
+ }
+
+ rtaFramework_DestroyProtocolHolder(framework, holder);
+ holder = temp;
+ }
+
+ parcEventScheduler_Stop(framework->base, &(struct timeval) { .tv_sec = 0, .tv_usec = 1000 });
+ framework->status = FRAMEWORK_STOPPING;
+ rta_Framework_BroadcastStatus(framework);
+ rta_Framework_UnlockStatus(framework);
+ // %%% UNLOCK
+
+ return 0;
+}
+
+// Goes into rta_Framework_Commands.c
+static bool
+_rtaFramework_ExecuteTransmitStatistics(RtaFramework *framework, const RtaCommandTransmitStatistics *transmitStats)
+{
+ if (GlobalStatisticsFile != NULL) {
+ fclose(GlobalStatisticsFile);
+ }
+
+ GlobalStatisticsFile = fopen(rtaCommandTransmitStatistics_GetFilename(transmitStats), "a");
+ assertNotNull(GlobalStatisticsFile, "Failed to open %s", rtaCommandTransmitStatistics_GetFilename(transmitStats));
+
+ if (GlobalStatisticsFile != NULL) {
+ struct timeval period = rtaCommandTransmitStatistics_GetPeriod(transmitStats);
+ parcEventTimer_Start(framework->transmit_statistics_event, &period);
+ } else {
+ fprintf(stderr, "Will not report statistics: Failed to open %s for output.", rtaCommandTransmitStatistics_GetFilename(transmitStats));
+ }
+
+ return 0;
+}