/* * Copyright (c) 2017 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define __STDC_FORMAT_MACROS #include #define MAX_STACK_DEPTH 10 #ifndef DEBUG_OUTPUT #define DEBUG_OUTPUT 0 #endif const char *RtaComponentNames[LAST_COMPONENT] = { "API", // 0 "FC_NONE", "FC_VEGAS", "FC_PIPELINE", "VERIFY_NONE", // 4 "VERIFY_ENUMERATED", "VERIFY_LOCATOR", "CODEC_NONE", NULL, // 8 "CODEC_TLV", "CODEC_CCNB", "CODE_FLAN", NULL, // 12 "FWD_LOCAL", "FWD_FLAN", "FWD_CCND", // 15 "TESTING_UPPER", "TESTING_LOWER", // 17 "CCND_REGISTRAR", "FWD_METIS" }; struct protocol_stack { int stack_id; // used during configuration to indicate if configured int config_codec; RtaFramework *framework; // They key value pairs passed to open. The api must // keep this memory valid for as long as the connection is open PARCJSON *params; // the inter-component queues unsigned component_count; PARCEventQueuePair *queue_pairs[MAX_STACK_DEPTH]; RtaComponents components[MAX_STACK_DEPTH]; // queues assigned to components struct component_queues { PARCEventQueue *up; PARCEventQueue *down; } *component_queues[LAST_COMPONENT]; RtaComponentOperations component_ops[LAST_COMPONENT]; void *component_state[LAST_COMPONENT]; // stack-wide stats RtaComponentStats *stack_stats[LAST_COMPONENT]; // state change events are disabled during initial setup and teardown bool stateChangeEventsEnabled; }; static void set_queue_pairs(RtaProtocolStack *stack, RtaComponents comp_type); static int configure_ApiConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops); static int configure_Component(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops); static int configure_FwdConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops); // ======================================== RtaFramework * rtaProtocolStack_GetFramework(RtaProtocolStack *stack) { assertNotNull(stack, "called with null stack"); return stack->framework; } RtaProtocolStack * rtaProtocolStack_Create(RtaFramework *framework, PARCJSON *params, int stack_id) { RtaProtocolStack *stack = parcMemory_AllocateAndClear(sizeof(RtaProtocolStack)); assertNotNull(stack, "%9" PRIu64 " parcMemory_AllocateAndClear returned NULL\n", rtaFramework_GetTicks(stack->framework)); stack->stateChangeEventsEnabled = false; stack->params = parcJSON_Copy(params); assertNotNull(stack->params, "SYSTEM key is NULL in params"); assertNotNull(framework, "Parameter framework may not be null"); stack->framework = framework; stack->stack_id = stack_id; // create all the buffer pairs for (int i = 0; i < MAX_STACK_DEPTH; i++) { stack->queue_pairs[i] = parcEventQueue_CreateConnectedPair(rtaFramework_GetEventScheduler(stack->framework)); assertNotNull(stack->queue_pairs[i], "parcEventQueue_CreateConnectedPair returned NULL index %d", i); if (stack->queue_pairs[i] == NULL) { for (int j = 0; j < i; j++) { parcEventQueue_DestroyConnectedPair(&(stack->queue_pairs[j])); } parcMemory_Deallocate((void **) &stack); return NULL; } // set them all to normal priority. The command port is high priority. External buffes are low priority. parcEventQueue_SetPriority(parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]), PARCEventPriority_Normal); parcEventQueue_SetPriority(parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i]), PARCEventPriority_Normal); if (DEBUG_OUTPUT) { printf("%9" PRIu64 " %s create buffer pair %p <-> %p\n", rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), __func__, (void *) parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]), (void *) parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i])); } } for (int i = 0; i < LAST_COMPONENT; i++) { stack->stack_stats[i] = rtaComponentStats_Create(NULL, i); } if (DEBUG_OUTPUT) { printf("%9" PRIu64 " %s created stack %d at %p\n", rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), __func__, stack_id, (void *) stack); } stack->stateChangeEventsEnabled = true; return stack; } /** * Opens a connection inside the protocol stack: it calls open() on each component. * * Returns 0 on success, -1 on error * * Example: * @code * <#example#> * @endcode */ int rtaProtocolStack_Open(RtaProtocolStack *stack, RtaConnection *connection) { assertNotNull(stack, "called with null stack\n"); if (DEBUG_OUTPUT) { printf("%9" PRIu64 " %s stack_id %d opening conn %p api_fd %d\n", rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), __func__, stack->stack_id, (void *) connection, rtaConnection_GetApiFd(connection)); } // call all the opens, except the api // need to disable events during creation to avoid calling the event notifier // of a component before the component sees the "open" call for this connection stack->stateChangeEventsEnabled = false; for (int i = 0; i < stack->component_count; i++) { RtaComponents comp = stack->components[i]; if (stack->component_ops[comp].open != NULL && stack->component_ops[comp].open(connection) != 0) { fprintf(stderr, "%s component %d failed open\n", __func__, i); abort(); return -1; } } stack->stateChangeEventsEnabled = true; return 0; } /* * Closes a connection but does not touch stack->connection_head */ static int internal_Stack_Close(RtaProtocolStack *stack, RtaConnection *conn) { int i; if (DEBUG_OUTPUT) { printf("%9" PRIu64 " %s stack_id %d closing stack %p conn %p\n", rtaFramework_GetTicks(rtaConnection_GetFramework(conn)), __func__, stack->stack_id, (void *) stack, (void *) conn); } rtaConnection_SetState(conn, CONN_CLOSED); // call all the opens for (i = 0; i < stack->component_count; i++) { RtaComponents comp = stack->components[i]; if (DEBUG_OUTPUT) { printf("%9" PRIu64 " %s calling close for %s\n", rtaFramework_GetTicks(rtaConnection_GetFramework(conn)), __func__, RtaComponentNames[comp]); } if (stack->component_ops[comp].close != NULL && stack->component_ops[comp].close(conn) != 0) { fprintf(stderr, "%s component %d failed open\n", __func__, i); abort(); return -1; } } return 0; } /** * Calls the close() function of each component in the protocol stack. * * This is typically called from inside the API connector when it processes * a CLOSE json message. * Returns 0 success, -1 error. * * Example: * @code * <#example#> * @endcode */ int rtaProtocolStack_Close(RtaProtocolStack *stack, RtaConnection *conn) { assertNotNull(stack, "called with null stack\n"); assertNotNull(conn, "called with null connection\n"); if (DEBUG_OUTPUT) { printf("%9" PRIu64 " %s stack_id %d stack %p conn %p\n", rtaFramework_GetTicks(rtaConnection_GetFramework(conn)), __func__, stack->stack_id, (void *) stack, (void *) conn); } internal_Stack_Close(stack, conn); return 0; } /** * Calls the release() function of all components. * Drains all the component queues. * * This is called from rtaFramework_DestroyStack, who is responsible for closing * all the connections in it before calling this. * * Example: * @code * <#example#> * @endcode */ void rtaProtocolStack_Destroy(RtaProtocolStack **stackPtr) { RtaProtocolStack *stack; assertNotNull(stackPtr, "%s called with null pointer to stack\n", __func__); stack = *stackPtr; assertNotNull(stack, "%s called with null stack dereference\n", __func__); if (DEBUG_OUTPUT) { printf("%s stack_id %d destroying stack %p\n", __func__, stack->stack_id, (void *) stack); } stack->stateChangeEventsEnabled = false; // call all the release functions for (int i = 0; i < stack->component_count; i++) { RtaComponents comp = stack->components[i]; if (stack->component_ops[comp].release != NULL && stack->component_ops[comp].release(stack) != 0) { fprintf(stderr, "%s component %d failed release\n", __func__, i); abort(); } } for (int i = 0; i < MAX_STACK_DEPTH; i++) { TransportMessage *tm; while ((tm = rtaComponent_GetMessage(parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]))) != NULL) { assertFalse(1, "%s we should never execute the body, it should just drain\n", __func__); } while ((tm = rtaComponent_GetMessage(parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i]))) != NULL) { assertFalse(1, "%s we should never execute the body, it should just drain\n", __func__); } if (DEBUG_OUTPUT) { printf("%9" PRIu64 " %s destroy buffer pair %p <-> %p\n", rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), __func__, (void *) parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]), (void *) parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i])); } parcEventQueue_DestroyConnectedPair(&(stack->queue_pairs[i])); } for (int i = 0; i < LAST_COMPONENT; i++) { if (stack->component_queues[i]) { parcMemory_Deallocate((void **) &(stack->component_queues[i])); } } for (int i = 0; i < LAST_COMPONENT; i++) { rtaComponentStats_Destroy(&stack->stack_stats[i]); } parcJSON_Release(&stack->params); memset(stack, 0, sizeof(RtaProtocolStack)); parcMemory_Deallocate((void **) &stack); *stackPtr = NULL; } PARCEventQueue * rtaProtocolStack_GetPutQueue(RtaProtocolStack *stack, RtaComponents component, RtaDirection direction) { assertNotNull(stack, "%s called with null stack\n", __func__); if (direction == RTA_UP) { return stack->component_queues[component]->up; } else { return stack->component_queues[component]->down; } } /** * Look up the symbolic name of the queue. Do not free the return. * * Example: * @code * <#example#> * @endcode */ const char * rtaProtocolStack_GetQueueName(RtaProtocolStack *stack, PARCEventQueue *queue) { int component; for (component = 0; component <= LAST_COMPONENT; component++) { if (stack->component_queues[component]) { if (stack->component_queues[component]->up == queue) { return RtaComponentNames[component]; } if (stack->component_queues[component]->down == queue) { return RtaComponentNames[component]; } } } trapUnexpectedState("Could not find queue %p in stack %p", (void *) queue, (void *) stack); } // ================================================= // ================================================= static RtaComponents getComponentTypeFromName(const char *name) { int i; if (name == NULL) { return UNKNOWN_COMPONENT; } for (i = 0; i < LAST_COMPONENT; i++) { if (RtaComponentNames[i] != NULL) { if (strncasecmp(RtaComponentNames[i], name, 16) == 0) { return (RtaComponents) i; } } } return UNKNOWN_COMPONENT; } /** * Calls the confguration routine for each component in the stack * * Builds an array list of everything in the JSON configuration, then * calls its configuation routine. * * The connecting event queues are disabled at this point. * * @param [in,out] stack The Protocol Stack to operate on * * Example: * @code * <#example#> * @endcode */ static void rtaProtocolStack_ConfigureComponents(RtaProtocolStack *stack) { PARCArrayList *componentNameList; componentNameList = protocolStack_GetComponentNameArray(stack->params); assertTrue(parcArrayList_Size(componentNameList) < MAX_STACK_DEPTH, "Too many components in a stack size %zu\n", parcArrayList_Size(componentNameList)); for (int i = 0; i < parcArrayList_Size(componentNameList); i++) { // match it to a component type const char *comp_name = parcArrayList_Get(componentNameList, i); RtaComponents comp_type = getComponentTypeFromName(comp_name); // this could be sped up slightly by putting the ops structures // in an array switch (comp_type) { case API_CONNECTOR: configure_ApiConnector(stack, comp_type, api_ops); break; case FC_NONE: trapIllegalValue(comp_type, "Null flowcontroller no longer supported"); break; case FC_VEGAS: configure_Component(stack, comp_type, flow_vegas_ops); break; case FC_PIPELINE: abort(); break; case CODEC_NONE: trapIllegalValue(comp_type, "Null codec no longer supported"); break; case CODEC_TLV: configure_Component(stack, comp_type, codec_tlv_ops); break; case FWD_NONE: abort(); break; case FWD_LOCAL: configure_FwdConnector(stack, comp_type, fwd_local_ops); break; case FWD_METIS: configure_FwdConnector(stack, comp_type, fwd_metis_ops); break; case TESTING_UPPER: // fallthrough case TESTING_LOWER: configure_Component(stack, comp_type, testing_null_ops); break; default: fprintf(stderr, "%s unsupported component type %s\n", __func__, comp_name); abort(); } } parcArrayList_Destroy(&componentNameList); } static bool rtaProtocolStack_InitializeComponents(RtaProtocolStack *stack) { // Call all the inits for (int i = 0; i < LAST_COMPONENT; i++) { int res = 0; if (stack->component_ops[i].init != NULL) { res = stack->component_ops[i].init(stack); } if (res != 0) { fprintf(stderr, "%s opener for layer %d failed\n", __func__, i); trapUnrecoverableState("Error Initializing the components") return false; } } return true; } /** * Enables events on all the queues between components * * Enables events on each queue. * * @param [in,out] stack The PRotocol stack * * Example: * @code * <#example#> * @endcode */ static void rtaProtocolStack_EnableComponentQueues(RtaProtocolStack *stack) { // enable all the events on intermediate queues for (int i = 0; i < stack->component_count; i++) { RtaComponents component = stack->components[i]; PARCEventQueue *upQueue = stack->component_queues[component]->up; if (upQueue != NULL) { parcEventQueue_Enable(upQueue, PARCEventType_Read); } PARCEventQueue *downQueue = stack->component_queues[component]->down; if (downQueue != NULL) { parcEventQueue_Enable(downQueue, PARCEventType_Read); } } } /* * Called from transportRta_Open() * * Returns 0 for success, -1 on error (connection not made) */ int rtaProtocolStack_Configure(RtaProtocolStack *stack) { assertNotNull(stack, "%s called with null stack\n", __func__); rtaProtocolStack_ConfigureComponents(stack); bool initSuccess = rtaProtocolStack_InitializeComponents(stack); if (!initSuccess) { return -1; } rtaProtocolStack_EnableComponentQueues(stack); return 0; } /* * Domain is the top-level key, e.g. SYSTEM or USER */ PARCJSON * rtaProtocolStack_GetParam(RtaProtocolStack *stack, const char *domain, const char *key) { assertNotNull(stack, "%s called with null stack\n", __func__); assertNotNull(domain, "%s called with null domain\n", __func__); assertNotNull(key, "%s called with null key\n", __func__); PARCJSONValue *value = parcJSON_GetValueByName(stack->params, domain); assertNotNull(value, "Did not find domain %s in protocol stack parameters", domain); if (value == NULL) { return NULL; } PARCJSON *domainJson = parcJSONValue_GetJSON(value); value = parcJSON_GetValueByName(domainJson, key); assertNotNull(value, "Did not find key %s in protocol stack parameters", key); return parcJSONValue_GetJSON(value); } unsigned rtaProtocolStack_GetNextConnectionId(RtaProtocolStack *stack) { assertNotNull(stack, "Parameter stack must be a non-null RtaProtocolStack pointer."); return rtaFramework_GetNextConnectionId(stack->framework); } RtaComponentStats * rtaProtocolStack_GetStats(const RtaProtocolStack *stack, RtaComponents type) { assertTrue(type < LAST_COMPONENT, "invalid type %d\n", type); return stack->stack_stats[type]; } static void printSingleTuple(FILE *file, const struct timeval *timeval, const RtaProtocolStack *stack, RtaComponents componentType, RtaComponentStatType stat) { RtaComponentStats *stats = rtaProtocolStack_GetStats(stack, componentType); fprintf(file, "{ \"stackId\" : %d, \"component\" : \"%s\", \"name\" : \"%s\", \"value\" : %" PRIu64 ", \"timeval\" : %ld.%06u }\n", stack->stack_id, RtaComponentNames[componentType], rtaComponentStatType_ToString(stat), rtaComponentStats_Get(stats, stat), timeval->tv_sec, (unsigned) timeval->tv_usec ); } PARCArrayList * rtaProtocolStack_GetStatistics(const RtaProtocolStack *stack, FILE *file) { PARCArrayList *list = parcArrayList_Create(NULL); struct timeval timeval; gettimeofday(&timeval, NULL); // This does not fill in the array list for (int componentIndex = 0; componentIndex < stack->component_count; componentIndex++) { RtaComponents componentType = stack->components[componentIndex]; printSingleTuple(file, &timeval, stack, componentType, STATS_OPENS); printSingleTuple(file, &timeval, stack, componentType, STATS_CLOSES); printSingleTuple(file, &timeval, stack, componentType, STATS_UPCALL_IN); printSingleTuple(file, &timeval, stack, componentType, STATS_UPCALL_OUT); printSingleTuple(file, &timeval, stack, componentType, STATS_DOWNCALL_IN); printSingleTuple(file, &timeval, stack, componentType, STATS_DOWNCALL_OUT); } return list; } // ============================================= static void set_queue_pairs(RtaProtocolStack *stack, RtaComponents comp_type) { //PARCEventQueuePair *component_queues[LAST_COMPONENT]; // Save references to the OUTPUT queues used by a specific component. if (stack->component_queues[comp_type] == NULL) { stack->component_queues[comp_type] = parcMemory_AllocateAndClear(sizeof(struct component_queues)); } stack->component_queues[comp_type]->up = parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[stack->component_count - 1]); stack->component_queues[comp_type]->down = parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[stack->component_count]); // Set callbacks on the INPUT queues read by a specific component parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->up, stack->component_ops[comp_type].downcallRead, NULL, stack->component_ops[comp_type].downcallEvent, (void *) stack); parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->down, stack->component_ops[comp_type].upcallRead, NULL, stack->component_ops[comp_type].upcallEvent, (void *) stack); } static int configure_ApiConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops) { if (stack->component_queues[comp_type] == NULL) { stack->component_queues[comp_type] = parcMemory_AllocateAndClear(sizeof(struct component_queues)); } assertNotNull(stack->component_queues[comp_type], "called with null component_queue"); assertNotNull(stack->queue_pairs[stack->component_count], "called with null queue_pair"); // This wires the bottom half of the API Connector to the streams. // It does not do the top half, which is in the connector's INIT stack->components[stack->component_count] = comp_type; stack->component_ops[comp_type] = ops; stack->component_queues[comp_type]->down = parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[stack->component_count]); parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->down, stack->component_ops[comp_type].upcallRead, NULL, stack->component_ops[comp_type].upcallEvent, (void *) stack); stack->component_count++; return 0; } static int configure_Component(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops) { stack->component_ops[comp_type] = ops; stack->components[stack->component_count] = comp_type; set_queue_pairs(stack, comp_type); stack->component_count++; return 0; } static int configure_FwdConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops) { stack->component_ops[comp_type] = ops; stack->components[stack->component_count] = comp_type; // We only set the upcall buffers. The down buffers // are controlled by the forwarder connector if (stack->component_queues[comp_type] == NULL) { stack->component_queues[comp_type] = parcMemory_AllocateAndClear(sizeof(struct component_queues)); } stack->component_queues[comp_type]->up = parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[stack->component_count - 1]); parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->up, stack->component_ops[comp_type].downcallRead, NULL, stack->component_ops[comp_type].downcallEvent, (void *) stack); stack->component_count++; return 0; } int rtaProtocolStack_GetStackId(RtaProtocolStack *stack) { return stack->stack_id; } void rtaProtocolStack_ConnectionStateChange(RtaProtocolStack *stack, void *connection) { if (stack->stateChangeEventsEnabled) { for (int componentIndex = 0; componentIndex < stack->component_count; componentIndex++) { RtaComponents componentType = stack->components[componentIndex]; if (stack->component_ops[componentType].stateChange != NULL) { stack->component_ops[componentType].stateChange(connection); } } } }