diff options
author | Luca Muscariello <lumuscar+fdio@cisco.com> | 2017-02-23 20:44:26 +0100 |
---|---|---|
committer | Luca Muscariello <lumuscar+fdio@cisco.com> | 2017-02-23 19:51:14 +0000 |
commit | d18ae43123fcd7604d1c36a1ec8450dbe6071824 (patch) | |
tree | 2d49fc3aabd0f2607251c854565648d47b56b2e9 /libccnx-transport-rta/ccnx/transport/transport_rta/core | |
parent | 9b30fc10fb1cbebe651e5a107e8ca5b24de54675 (diff) |
Initial commit: ccnxlibs.
Change-Id: I1b376527a7dd01a6b9e083a6cb646955902f45c0
Signed-off-by: Luca Muscariello <lumuscar+fdio@cisco.com>
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/core')
40 files changed, 8030 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/components.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/components.h new file mode 100644 index 00000000..1a07bcf3 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/components.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// components.h +// Libccnx +// + + +#ifndef Libccnx_components_h +#define Libccnx_components_h + +// Every component in the system must be defined here +// These must correspond to array indicies. +typedef enum { + API_CONNECTOR = 0, + FC_NONE = 1, + FC_VEGAS = 2, + FC_PIPELINE = 3, + // vacant = 4, + // vacant = 5, + // vacant = 6, + CODEC_NONE = 7, + CODEC_UNSPEC = 8, + CODEC_TLV = 9, + // vacant = 10, + // vacant = 11, + FWD_NONE = 12, + FWD_LOCAL = 13, + // vacant = 14, + // vacant = 15, + TESTING_UPPER = 16, + TESTING_LOWER = 17, + FWD_METIS = 19, + LAST_COMPONENT = 20, // MUST ALWAYS BE LAST + UNKNOWN_COMPONENT // MUST BE VERY LAST +} RtaComponents; + + +// This is defied in rta_ProtocolStack.c and should be kept +// in sync with RtaComponents +extern const char *RtaComponentNames[LAST_COMPONENT]; + +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta.h new file mode 100644 index 00000000..75b53950 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// rta.h +// Libccnx +// +// + +#ifndef Libccnx_rta_h +#define Libccnx_rta_h + +#include "rta_Transport.h" +#include "rta_ProtocolStack.h" +#include "rta_Connection.h" +#include "rta_Component.h" +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c new file mode 100644 index 00000000..4be0c085 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_EventBuffer.h> + +#include <ccnx/transport/common/transport_Message.h> +#include <ccnx/transport/transport_rta/rta_Transport.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> + + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +PARCEventQueue * +rtaComponent_GetOutputQueue(RtaConnection *conn, + RtaComponents component, + RtaDirection direction) +{ + RtaProtocolStack *stack; + + assertNotNull(conn, "called with null connection\n"); + + stack = rtaConnection_GetStack(conn); + assertNotNull(stack, "resolved null stack\n"); + + return rtaProtocolStack_GetPutQueue(stack, component, direction); +} + +int +rtaComponent_PutMessage(PARCEventQueue *queue, TransportMessage *tm) +{ + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + assertNotNull(conn, "Got null connection from transport message\n"); + + if (rtaConnection_GetState(conn) != CONN_CLOSED) { + PARCEventBuffer *out = parcEventBuffer_GetQueueBufferOutput(queue); + int res; + + rtaConnection_IncrementMessagesInQueue(conn); + + if (DEBUG_OUTPUT) { + printf("%s queue %-12s tm %p\n", + __func__, + rtaProtocolStack_GetQueueName(rtaConnection_GetStack(conn), queue), + (void *) tm); + } + + res = parcEventBuffer_Append(out, (void *) &tm, sizeof(&tm)); + assertTrue(res == 0, "%s parcEventBuffer_Append returned error\n", __func__); + parcEventBuffer_Destroy(&out); + return 1; + } else { + // drop + transportMessage_Destroy(&tm); + + return 0; + } +} + +TransportMessage * +rtaComponent_GetMessage(PARCEventQueue *queue) +{ + PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(queue); + + while (parcEventBuffer_GetLength(in) >= sizeof(TransportMessage *)) { + ssize_t len; + TransportMessage *tm; + RtaConnection *conn; + + len = parcEventBuffer_Read(in, (void *) &tm, sizeof(&tm)); + + assertTrue(len == sizeof(TransportMessage *), + "parcEventBuffer_Read returned error"); + + // Is the transport message for an open connection? + conn = rtaConnection_GetFromTransport(tm); + assertNotNull(conn, "%s GetInfo returnd null connection\n", __func__); + + if (DEBUG_OUTPUT) { + printf("%s queue %-12s tm %p\n", + __func__, + rtaProtocolStack_GetQueueName(rtaConnection_GetStack(conn), queue), + (void *) tm); + } + + (void) rtaConnection_DecrementMessagesInQueue(conn); + + if (rtaConnection_GetState(conn) != CONN_CLOSED) { + parcEventBuffer_Destroy(&in); + return tm; + } + + // it's a closed connection + + if (DEBUG_OUTPUT) { + printf("%s clearing connection %p reference in transport\n", + __func__, (void *) conn); + } + //drop + transportMessage_Destroy(&tm); + } + + parcEventBuffer_Destroy(&in); + return NULL; +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.h new file mode 100644 index 00000000..24efa6aa --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.h @@ -0,0 +1,258 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Component.h + * @brief <#Brief Description#> + * + * A Component is a functional block within a protocol stack. It exists + * between the API Connector (at the top) and the Forwarder Connector + * (at the bottom). All components have a similar interface. The only + * slight variation is that components betwen the Forwarder Connector + * and the Codec deal in "wire" message formats, while components above + * the connector deal with "parsed" (CCNxMessage) formats. + * + * To write a component, follow these procedures: + * 1) add your component's name to components.h enum. This is the + * symbolic name you will use for it in the code. We'll call + * it PROTO_WIZ. + * 2) Copy a skeleton, such as component_Verifier.h for your header. + * Let's call it component_Wizard.h. Inside the header, you'll + * define the "operations" structure that's exported to the system. + * @code{.c} + **#ifndef Libccnx_component_wizard_h + **#define Libccnx_component_wizard_h + * + * // Function structs for component variations + * extern ComponentOperations proto_wizard_ops; + * + **#endif + * @endcode + * + * 3) Copy a skeleton, like component_Verifier_Null.c, for your + * implementation. Let's call it component_Wizard.c. Inside + * you must: + * a) instantiate proto_wizard_ops: + * @code{.c} + * static int component_Wizard_Init(ProtocolStack *stack); + * static int component_Wizard_Opener(RtaConnection *conn); + * static void component_Wizard_Upcall_Read(PARCEventQueue *, void *conn); + * static void component_Wizard_Downcall_Read(PARCEventQueue *, void *conn); + * static int component_Wizard_Closer(RtaConnection *conn); + * static int component_Wizard_Release(ProtocolStack *stack); + * + * ComponentOperations verify_null_ops = { + * component_Wizard_Init, + * component_Wizard_Opener, + * component_Wizard_Upcall_Read, + * NULL, + * component_Wizard_Downcall_Read, + * NULL, + * component_Wizard_Closer, + * component_Wizard_Release + * }; + * @endcode + * + * These define the interface your component exposes to the stack + * Init: called once on stack creation + * Open: called once per connection Open + * UpcallRead: Called when the "upward" buffer has something to read + * DowncallRead: Called when the "downward" buffer has something to read + * Closer: called once per connection Close + * Release: called on protocol stack destruction. + * + * Optionally, you may include UpcallEvents and DowncallEvents, but + * in general those are not useful. + * + * Any of the function pointers in the "ops" may be NULL. + * + * b) Implement your Init. If you need to create a stack-wide data structure + * to track state, you would do something like this, which allocates + * memory and sticks it away in component-specific storage in the stack. + * Notice that protocolStack_SetPrivateData takes our protocol's name + * PROTO_WIZ as a parameter. + * + * @code{.c} + * static int + * component_Wizard_Init(ProtocolStack *stack) + * { + * struct mydata *data = mydata_Create(); + * protocolStack_SetPrivateData(stack, PROTO_WIZ, data); + * return 0; + * } + * @endcode + * + * c) Implement your Opener. You will very likely want to keep per-connection + * state. This follows a similar method to the Init, but in a connection. + * We squirl away the connection-specific data similarly to the stack-wide + * data. In addition, it's good practice to fetch your component's Stats + * for the connection and increment the OPENS counter for a successful open. + * + * @code{.c} + * static int + * component_Wizard_Opener(RtaConnection *connection) + * { + * ComponentStats *stats; + * struct myState *mystate; + * + * parcMemory_AlocateAndClear(&mystate, sizeof(void *), sizeof(struct api_conn_state)); + * rtaConnection_SetPrivateData(connection, PROTO_WIZ, mystate); + * + * stats = rtaConnection_GetStats(connection, PROTO_WIZ); + * stats_Increment(stats, STATS_OPENS); + * return 0; + * } + * @endcode + * + * d) Implement your Close and Release. These perform the inverse + * of the Open and Init. They should fetch your private data, if + * any, and free it: + * @code{.c} + * static int + * component_Wizard_Closer(RtaConnection *conn) + * { + * ComponentStats *stats = rtaConnection_GetStats(conn, PROTO_WIZ); + * struct myState *mystate = rtaConnection_GetPrivateData(conn, PROTO_WIZ); + * + * stats_Increment(stats, STATS_CLOSES); + * myState_Destroy(&mystate); + * return 0; + * } + * + * static int + * component_Wizard_Release(ProtocolStack *stack) + * { + * ComponentStats *stats = protocoLStack_GetStats(stack, PROTO_WIZ); + * struct myData *mydata = protocolStack_GetPrivateData(stack, PROTO_WIZ); + * + * stats_Increment(stats, STATS_CLOSES); + * myData_Destroy(&mydata); + * return 0; + * } + * @endcode + * + * d) Implement your Read handlers. They are similar for the upcall + * and downcall handlers. The main issue to be aware of is that + * you must *drain* the queue on each call. The callback is edge + * triggered. + * + * Below we show an example of the Upcall read callback, which means + * there is data from below travelling up the stack. Therefore, we + * retrieve the RTA_UP output queue to pass messages up the stack. + * The while() loop is what drains the queue. + * + * Note also that "ptr" is a pointer to the ProtocolStack that owns + * connecition (what your Init was called with). The Connection information + * rides inside the transport message, and is retrieved with a call + * to transportMessage_GetInfo(). + * + * @code{.c} + * static void + * component_Wizard_Upcall_Read(PARCEventQueue *in, PARCEvent_EventType event, void *ptr) + * { + * ProtocolStack *stack = (ProtocolStack *) ptr; + * PARCEventQueue *out = protocoStack_GetPutQueue(stack, PROTO_WIZ, RTA_UP); + * TransportMessage *tm; + * + * while( (tm = rtaComponent_GetMessage(in)) != NULL ) + * { + * RtaConnection *conn = transportMessage_GetInfo(tm); + * ComponentStats *stats = rtaConnection_GetStats(conn, PROTO_WIZ); + * CCNxMessage *msg = TransportMessage_GetCcnxMessage(tm); + * + * stats_Increment(stats, STATS_UPCALL_IN); + * + * // do something with the CCNxMessage + * + * if( rtaComponent_PutMessage(out, tm) ) + * stats_Increment(stats, STATS_UPCALL_OUT); + * } + * } + * @endcode + * + * Example: + * @code + * <#example#> + * @endcode + * + */ +/** + */ + +#ifndef Libccnx_rta_component_h +#define Libccnx_rta_component_h + +#include "components.h" +#include "rta_ComponentQueue.h" +#include "rta_ComponentStats.h" + +/** + * Init: one time initialization on first instantiation (0 success, -1 failure) + * Open: Per connection open, returns valid descriptor or -1 on failure + * upcallRead: Callback when one or more messages are available + * downcallRead: Callback when one or more messages are available. + * xEvent: Called for events on the queue + * Close: Per connection close + * Release: One time release of state when whole stack taken down + * stateChagne: Called when there is a state change related to the connection + * + * Example: + * @code + * <#example#> + * @endcode + */ +typedef struct { + int (*init)(RtaProtocolStack *stack); + int (*open)(RtaConnection *conn); + void (*upcallRead)(PARCEventQueue *queue, PARCEventType events, void *stack); + void (*upcallEvent)(PARCEventQueue *queue, PARCEventQueueEventType events, void *stack); + void (*downcallRead)(PARCEventQueue *queue, PARCEventType events, void *stack); + void (*downcallEvent)(PARCEventQueue *queue, PARCEventQueueEventType events, void *stack); + int (*close)(RtaConnection *conn); + int (*release)(RtaProtocolStack *stack); + void (*stateChange)(RtaConnection *conn); +} RtaComponentOperations; + +extern PARCEventQueue *rtaComponent_GetOutputQueue(RtaConnection *conn, + RtaComponents component, + RtaDirection direction); + +/** + * Send a message between components. The API connector and Forwarder connector + * must set the connection information in the transport message with + * rtaConnection_SetInTransport(). + * + * returns 1 on success, 0 on failure + * + * Example: + * @code + * <#example#> + * @endcode + */ +extern int rtaComponent_PutMessage(PARCEventQueue *queue, TransportMessage *tm); + +/** + * Fetch a message from the queue. Will return NULL if no message + * is available. + * + * As a side effect, it will drain message on a closed connection. + * + * Example: + * @code + * <#example#> + * @endcode + */ +extern TransportMessage *rtaComponent_GetMessage(PARCEventQueue *queue); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentQueue.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentQueue.h new file mode 100644 index 00000000..33294cbf --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentQueue.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_ComponentQueue.h + * @brief <#Brief Description#> + * + * <#Detailed Description#> + * + */ +#ifndef Libccnx_rta_ComponentQueue_h +#define Libccnx_rta_ComponentQueue_h + +typedef enum { + RTA_UP = 0, + RTA_DOWN = 1 +} RtaDirection; +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentStats.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentStats.c new file mode 100644 index 00000000..3ae60a9c --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentStats.c @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <LongBow/runtime.h> +#include <parc/algol/parc_Memory.h> +#include <ccnx/transport/transport_rta/core/rta_ComponentStats.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> + +struct rta_component_stats { + RtaProtocolStack *stack; + RtaComponents type; + uint64_t stats[STATS_LAST]; +}; + +char * +rtaComponentStatType_ToString(RtaComponentStatType statsType) +{ + switch (statsType) { + case STATS_OPENS: + return "opens"; + + case STATS_CLOSES: + return "closes"; + + case STATS_UPCALL_IN: + return "upcall_in"; + + case STATS_UPCALL_OUT: + return "upcall_out"; + + case STATS_DOWNCALL_IN: + return "downcall_in"; + + case STATS_DOWNCALL_OUT: + return "downcall_out"; + + default: + trapIllegalValue(statsType, "Unknown RtaComponentStatType %d", statsType); + } +} + +/** + * Its ok to call with null stack. that just means when we increment, we won't + * also increment stack-wide stats + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaComponentStats * +rtaComponentStats_Create(RtaProtocolStack *stack, RtaComponents componentType) +{ + RtaComponentStats *stats = parcMemory_AllocateAndClear(sizeof(RtaComponentStats)); + assertNotNull(stats, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaComponentStats)); + assertTrue(componentType < LAST_COMPONENT, "invalid type %d\n", componentType); + + stats->stack = stack; + stats->type = componentType; + return stats; +} + +/* Increment and return incremented value */ +uint64_t +rtaComponentStats_Increment(RtaComponentStats *stats, RtaComponentStatType statsType) +{ + assertNotNull(stats, "%s dereferenced a null stats pointer\n", __func__); + assertFalse(statsType >= STATS_LAST, "%s incorrect stat type %d\n", __func__, statsType); + stats->stats[statsType]++; + + if (stats->stack != NULL) { + RtaComponentStats *stack_stats = rtaProtocolStack_GetStats(stats->stack, stats->type); + // if stack is not null, then we must get stats from it + assertNotNull(stack_stats, "%s got null stack stats\n", __func__); + stack_stats->stats[statsType]++; + } + + return stats->stats[statsType]; +} + +/* Return value */ +uint64_t +rtaComponentStats_Get(RtaComponentStats *stats, RtaComponentStatType statsType) +{ + assertNotNull(stats, "dereferenced a null stats pointer\n"); + assertFalse(statsType >= STATS_LAST, "incorrect stat statsType %d\n", statsType); + return stats->stats[statsType]; +} + +/* dump the stats to the given output */ +void +rtaComponentStats_Dump(RtaComponentStats *stats, FILE *output) +{ +} + +void +rtaComponentStats_Destroy(RtaComponentStats **statsPtr) +{ + RtaComponentStats *stats; + assertNotNull(statsPtr, "%s got null stats pointer\n", __func__); + + stats = *statsPtr; + assertNotNull(stats, "%s dereferenced a null stats pointer\n", __func__); + + memset(stats, 0, sizeof(RtaComponentStats)); + parcMemory_Deallocate((void **) &stats); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentStats.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentStats.h new file mode 100644 index 00000000..6db22a70 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ComponentStats.h @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_ComponentStats.h + * @brief <#Brief Description#> + * + * Statistics are PER CONNECTION PER COMPONENT. Therefore, a component would call + * rtaConnection_GetStats(conn, component) to access its stats. Each component must + * create its stats counter in _Open and free it in _Close. + * + * Each ProtocolStack has a PER STACK PER COMPONENT set of statistics too. When a + * component creates its stats in _Open, it passes a pointer to its stack, so when + * _Increment is called, it will increment both the component's stats and the stack's + * stats. + * + * For example: + * + * protocolStack_Init() creates stack-wide stats for each component type. + * componentX_Open(stack) creates per-connection stats for that component with + * a reference to stack using stats_Create(stack, component_type) + * componentX_Y(conn) performs some per-connection activity. It would call + * stats_Increment(rtaConnection_GetStats(conn), component_type, stat_type). + * That would increment the per-connection per-component stat and if the stack + * was not null, would increment the identical component_type, stat_type + * stat in the per-stack per-component counters. + * + * + * + */ +#ifndef Libccnx_rta_ComponentStats +#define Libccnx_rta_ComponentStats + +#include <ccnx/transport/transport_rta/core/components.h> + +struct protocol_stack; + +struct rta_component_stats; +/** + * + * @see stats_Create + */ +typedef struct rta_component_stats RtaComponentStats; + +typedef enum { + STATS_OPENS, + STATS_CLOSES, + STATS_UPCALL_IN, + STATS_UPCALL_OUT, + STATS_DOWNCALL_IN, + STATS_DOWNCALL_OUT, + STATS_LAST // must be last +} RtaComponentStatType; + +/** + * Create a stats component + * + * If the optional stack is specified, its statistics will be incremented whenever this + * stats object is incremented. Otherwise, it may be NULL. + * + * @param [in] stack Optional protocol stack + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +RtaComponentStats *rtaComponentStats_Create(struct protocol_stack *stack, RtaComponents componentType); + +/** + * <#OneLineDescription#> + * + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +char *rtaComponentStatType_ToString(RtaComponentStatType statType); + +/** + * Increment and return incremented value + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +uint64_t rtaComponentStats_Increment(RtaComponentStats *stats, RtaComponentStatType statType); + +/** + * Return value + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +uint64_t rtaComponentStats_Get(RtaComponentStats *stats, RtaComponentStatType statType); + +/** + * dump the stats to the given output + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void rtaComponentStats_Dump(RtaComponentStats *stats, FILE *output); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void rtaComponentStats_Destroy(RtaComponentStats **statsPtr); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Connection.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Connection.c new file mode 100644 index 00000000..455fed8d --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Connection.c @@ -0,0 +1,383 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> +#include <ccnx/transport/common/transport_Message.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> + +#include <ccnx/api/notify/notify_Status.h> +#include <ccnx/api/control/cpi_ControlFacade.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#ifdef DEBUG_OUTPUT +#undef DEBUG_OUTPUT +#endif + +#define DEBUG_OUTPUT 0 + +// SPEW will dump stack traces on reference count events +#define SPEW 0 + +struct rta_connection { + RtaProtocolStack *stack; + RtaFramework *framework; + + // unique id for this connection + unsigned connid; + + // opaque component-specific data and their closers + void *component_data[LAST_COMPONENT]; + RtaComponentStats *component_stats[LAST_COMPONENT]; + + RtaConnectionStateType connState; + + unsigned messages_in_queue; + unsigned refcount; + + PARCJSON *params; + + // api_fd is used in status messages up to the user + // transport_fd is used by the API connector to talk w/ API. + int api_fd; + int transport_fd; + + // is the connection blocked in the given direction? + bool blocked_down; + bool blocked_up; +}; + +RtaComponentStats * +rtaConnection_GetStats(RtaConnection *conn, RtaComponents component) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->component_stats[component]; +} + +RtaConnection * +rtaConnection_Create(RtaProtocolStack *stack, const RtaCommandOpenConnection *cmdOpen) +{ + int i; + RtaConnection *conn = parcMemory_AllocateAndClear(sizeof(RtaConnection)); + assertNotNull(conn, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaConnection)); + + conn->stack = stack; + conn->framework = rtaProtocolStack_GetFramework(stack); + conn->connid = rtaProtocolStack_GetNextConnectionId(stack); + conn->connState = CONN_OPEN; + conn->api_fd = rtaCommandOpenConnection_GetApiNotifierFd(cmdOpen); + conn->transport_fd = rtaCommandOpenConnection_GetTransportNotifierFd(cmdOpen); + + conn->params = parcJSON_Copy(rtaCommandOpenConnection_GetConfig(cmdOpen)); + conn->refcount = 1; + + conn->blocked_down = false; + conn->blocked_up = false; + + for (i = 0; i < LAST_COMPONENT; i++) { + conn->component_stats[i] = rtaComponentStats_Create(stack, i); + } + + if (DEBUG_OUTPUT) { + fprintf(stderr, "%9" PRIu64 " %s connection %p refcount %d\n", + rtaFramework_GetTicks(conn->framework), __func__, (void *) conn, conn->refcount); + if (SPEW) { + longBowRuntime_StackTrace(STDERR_FILENO); + } + + char *p = parcJSON_ToString(conn->params); + printf("Connection configuration: %s\n", p); + parcMemory_Deallocate((void **) &p); + } + + return conn; +} + +RtaConnection * +rtaConnection_Copy(RtaConnection *original) +{ + assertNotNull(original, "Called with null parameter"); + original->refcount++; + if (DEBUG_OUTPUT) { + fprintf(stderr, "%9" PRIu64 " %s connection %p refcount %d\n", + rtaFramework_GetTicks(original->framework), __func__, (void *) original, original->refcount); + if (SPEW) { + longBowRuntime_StackTrace(STDERR_FILENO); + } + } + + return original; +} + +void +rtaConnection_FreeFunc(void **voidPtr) +{ + rtaConnection_Destroy((RtaConnection **) voidPtr); +} + +void +rtaConnection_Destroy(RtaConnection **connPtr) +{ + int i; + RtaConnection *conn; + assertNotNull(connPtr, "called with null connection pointer\n"); + conn = *connPtr; + assertNotNull(conn, "called with null connection\n"); + assertTrue(conn->refcount > 0, "Called with 0 refcount, invalid state"); + + conn->refcount--; + if (conn->refcount > 0) { + if (DEBUG_OUTPUT) { + fprintf(stderr, "%9" PRIu64 " %s connection %p skipped, refcount %u\n", + rtaFramework_GetTicks(conn->framework), __func__, (void *) conn, conn->refcount); + if (SPEW) { + longBowRuntime_StackTrace(STDERR_FILENO); + } + } + return; + } + + assertTrue(conn->messages_in_queue == 0, "called when messages are still queued\n"); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %p\n", rtaFramework_GetTicks(conn->framework), __func__, (void *) conn); + if (SPEW) { + longBowRuntime_StackTrace(STDERR_FILENO); + } + } + + // Ok, at this point there's nothing left in queue, so we can + // get rid of the container now + + for (i = 0; i < LAST_COMPONENT; i++) { + rtaComponentStats_Destroy(&conn->component_stats[i]); + } + + rtaFramework_RemoveConnection(conn->framework, conn); + parcJSON_Release(&conn->params); + parcMemory_Deallocate((void **) &conn); + *connPtr = NULL; +} + +RtaProtocolStack * +rtaConnection_GetStack(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->stack; +} + +/* + * Used to store per-connection state from Open. + * Should be freed in Close, but you don't need to set it NULL. + */ +void +rtaConnection_SetPrivateData(RtaConnection *conn, + RtaComponents component, + void *private) +{ + assertNotNull(conn, "called with null connection\n"); + conn->component_data[component] = private; +} + +/* + * Used to store per-connection state from Open + */ +void * +rtaConnection_GetPrivateData(RtaConnection *conn, + RtaComponents component) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->component_data[component]; +} + +RtaConnectionStateType +rtaConnection_GetState(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->connState; +} + +void +rtaConnection_SetState(RtaConnection *conn, RtaConnectionStateType connState) +{ + assertNotNull(conn, "called with null connection\n"); + conn->connState = connState; + rtaProtocolStack_ConnectionStateChange(conn->stack, conn); +} + +/* + * returns number in queue, including this one + */ +unsigned +rtaConnection_IncrementMessagesInQueue(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + assertTrue(conn->connState != CONN_CLOSED, "%s called when connection closed\n", __func__); + conn->messages_in_queue++; + return conn->messages_in_queue; +} + +unsigned +rtaConnection_DecrementMessagesInQueue(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + assertTrue(conn->messages_in_queue > 0, "Trying to decrement a queue with 0 messages already"); + + conn->messages_in_queue--; + return conn->messages_in_queue; +} + +int +rtaConnection_GetApiFd(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->api_fd; +} + + +int +rtaConnection_GetTransportFd(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->transport_fd; +} + +int +rtaConnection_GetStackId(RtaConnection *conn) +{ + return rtaProtocolStack_GetStackId(conn->stack); +} + +unsigned +rtaConnection_MessagesInQueue(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->messages_in_queue; +} + +unsigned +rtaConnection_GetConnectionId(const RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection\n"); + return conn->connid; +} + +void +rtaConnection_SendNotifyStatus(RtaConnection *conn, RtaComponents component, RtaDirection direction, const NotifyStatus *status) +{ + PARCJSON *json = notifyStatus_ToJSON(status); + + CCNxTlvDictionary *notification = ccnxControlFacade_CreateNotification(json); + parcJSON_Release(&json); + + TransportMessage *tm = transportMessage_CreateFromDictionary(notification); + ccnxTlvDictionary_Release(¬ification); + + PARCEventQueue *out = rtaComponent_GetOutputQueue(conn, component, direction); + + transportMessage_SetInfo(tm, rtaConnection_Copy(conn), rtaConnection_FreeFunc); + rtaComponent_PutMessage(out, tm); +} + +void +rtaConnection_SendStatus(RtaConnection *conn, + RtaComponents component, + RtaDirection direction, + NotifyStatusCode code, + CCNxName *optionalName, + const char *optionalMessage) +{ + NotifyStatus *status = notifyStatus_Create(conn->api_fd, code, optionalName, optionalMessage); + rtaConnection_SendNotifyStatus(conn, component, direction, status); + notifyStatus_Release(&status); +} + +RtaConnection * +rtaConnection_GetFromTransport(TransportMessage *tm) +{ + return (RtaConnection *) transportMessage_GetInfo(tm); +} + +RtaFramework * +rtaConnection_GetFramework(const RtaConnection *connection) +{ + assertNotNull(connection, "called with null connection"); + return connection->framework; +} + +PARCJSON * +rtaConnection_GetParameters(RtaConnection *conn) +{ + assertNotNull(conn, "called with null connection"); + return conn->params; +} + +bool +rtaConnection_BlockedDown(const RtaConnection *connection) +{ + assertNotNull(connection, "Parameter connection must be non-null"); + return (connection->connState != CONN_OPEN) || connection->blocked_down; +} + +bool +rtaConnection_BlockedUp(const RtaConnection *connection) +{ + assertNotNull(connection, "Parameter connection must be non-null"); + return (connection->connState != CONN_OPEN) || connection->blocked_up; +} + +void +rtaConnection_SetBlockedDown(RtaConnection *connection) +{ + assertNotNull(connection, "Parameter connection must be non-null"); + connection->blocked_down = true; + rtaProtocolStack_ConnectionStateChange(connection->stack, connection); +} + +void +rtaConnection_ClearBlockedDown(RtaConnection *connection) +{ + assertNotNull(connection, "Parameter connection must be non-null"); + connection->blocked_down = false; + rtaProtocolStack_ConnectionStateChange(connection->stack, connection); +} + +void +rtaConnection_SetBlockedUp(RtaConnection *connection) +{ + assertNotNull(connection, "Parameter connection must be non-null"); + connection->blocked_up = true; + rtaProtocolStack_ConnectionStateChange(connection->stack, connection); +} + +void +rtaConnection_ClearBlockedUp(RtaConnection *connection) +{ + assertNotNull(connection, "Parameter connection must be non-null"); + connection->blocked_up = false; + rtaProtocolStack_ConnectionStateChange(connection->stack, connection); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Connection.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Connection.h new file mode 100644 index 00000000..8619ef96 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Connection.h @@ -0,0 +1,457 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Connection.h + * @brief <#Brief Description#> + * + * A connection embodies an API connection to the forwarder. Multiple + * connections are multiplexed over one stack. A connection, however, + * is largely independent of a particular stack. All the RTA connections + * are stored in RtaConnectionTable, which is managed by the Framework. + * + * A problem arises using queues between components, because there may + * be messages in queue that cannot be free'd without slogging through + * all the queues. + * + * Therefore, a connection tracks the number of messages in queue and + * will not be freed until all messages in queue are flushed. + * + * A connection carries an "isopen" flag. If it is false, no new + * messages can go in to the connection. Any message dequeued that + * references a closed connection discarded. + * + * Once the connection reaches 0 messages in queue, if it is closed, + * it is elegible for garbage collection. componentServq will call + * the _Destroy() method. Destroy() only works if the refcount for + * the connection is 0. If the ProtocolStack still has a reference + * to the connection, the connection will not be destroyed until + * the protocol stack calls Destroy. + * + * A Connection may live longer than its protocol stack. In the _Destroy, + * it should not make reference to the protocol stack. + * + */ +#ifndef Libccnx_Rta_Connection_h +#define Libccnx_Rta_Connection_h + +#include <sys/queue.h> +#include <ccnx/transport/common/transport.h> +#include <ccnx/transport/transport_rta/core/components.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_ComponentStats.h> +#include <ccnx/transport/transport_rta/commands/rta_CommandOpenConnection.h> + +#include <ccnx/api/notify/notify_Status.h> + +struct rta_connection; +/** + * + * @see rtaConnection_Create + */ +typedef struct rta_connection RtaConnection; + +typedef enum { + CONN_OPEN, + CONN_CLOSED, + CONN_PAUSED +} RtaConnectionStateType; + +/** + * Create a connection and set the refcount to 1. If the connection + * pointer is stored by multiple entities, they should call + * IncrementRefcount. Calling _Destroy() decrements the refcount. + * + * transport_fd is our side of the data socketpair provided by rtaTransport. + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnection *rtaConnection_Create(RtaProtocolStack *stack, const RtaCommandOpenConnection *cmdOpen); + +/** + * Get a reference counted copy + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnection *rtaConnection_Copy(RtaConnection *original); + +/** + * Destroys the object if this call decrements the refcount to 0. + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaConnection_Destroy(RtaConnection **connPtr); + +/** + * Same as _Destroy, but for using in a TransportMessage Info. + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaConnection_FreeFunc(void **voidPtr); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +RtaProtocolStack *rtaConnection_GetStack(RtaConnection *connection); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +RtaFramework *rtaConnection_GetFramework(const RtaConnection *connection); + +/** + * + * Used to store per-connection state from Open. + * Should be freed in Close, but you don't need to set it NULL. + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void rtaConnection_SetPrivateData(RtaConnection *connection, RtaComponents component, void *private); + +/** + * Used to store per-connection state from Open + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void *rtaConnection_GetPrivateData(RtaConnection *connection, RtaComponents component); + +/** + * Returns the connection state (open, paused, closed) + * + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnectionStateType rtaConnection_GetState(RtaConnection *connection); + +/** + * Sets the connection state + * + * The API connector manages the connection state. open means all messages + * may flow. Paused means no new messages flow. closed means all existing + * messages will be destroyed. + * + * @param <#param1#> + * @return <#return#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaConnection_SetState(RtaConnection *connection, RtaConnectionStateType state); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +RtaComponentStats *rtaConnection_GetStats(RtaConnection *connection, RtaComponents component); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +unsigned rtaConnection_IncrementMessagesInQueue(RtaConnection *connection); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +unsigned rtaConnection_DecrementMessagesInQueue(RtaConnection *connection); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +unsigned rtaConnection_MessagesInQueue(RtaConnection *connection); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +unsigned rtaConnection_GetConnectionId(const RtaConnection *connection); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int rtaConnection_GetStackId(RtaConnection *connection); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int rtaConnection_GetApiFd(RtaConnection *connection); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int rtaConnection_GetTransportFd(RtaConnection *connection); + +/** + * Creates a status message (see ccnx/api/notify) and sends it up or down the stack. + * + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +extern void rtaConnection_SendStatus(RtaConnection *connection, + RtaComponents component, + RtaDirection direction, + NotifyStatusCode code, + CCNxName *optionalName, + const char *optionalMessage); + +/** + * Creates a status message (see ccnx/api/notify) and sends it up or down the stack. + * + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnection *rtaConnection_GetFromTransport(TransportMessage *tm); + +/** + * Creates a status message (see ccnx/api/notify) and sends it up or down the stack. + * + * <#Discussion#> + * + * @param connection + * @return <#return#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +PARCJSON *rtaConnection_GetParameters(RtaConnection *connection); + +/** + * Is the connection blocked in the down direction? + * + * Will return true if the connection is not open (DOWN or PAUSED state) or if the + * given direction is blocked. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return true Connection blocked, will not accept any more packets in down direction + * @return false Connection not blocked in down direction + * + * Example: + * @code + * <#example#> + * @endcode + */ +bool rtaConnection_BlockedDown(const RtaConnection *connection); + +/** + * Is the connection blocked in the up direction? + * + * Will return true if the connection is not open (DOWN or PAUSED state) or if the + * given direction is blocked. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return true Connection blocked, will not accept any more packets in up direction + * @return false Connection not blocked in up direction + * + * Example: + * @code + * <#example#> + * @endcode + */ +bool rtaConnection_BlockedUp(const RtaConnection *connection); + +void rtaConnection_SetBlockedDown(RtaConnection *connection); +void rtaConnection_ClearBlockedDown(RtaConnection *connection); + +void rtaConnection_SetBlockedUp(RtaConnection *connection); +void rtaConnection_ClearBlockedUp(RtaConnection *connection); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ConnectionTable.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ConnectionTable.c new file mode 100644 index 00000000..2b1e8d7e --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ConnectionTable.c @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Uses a linked list right now, but should be hash tables on the keys we use. + */ +#include <config.h> +#include <stdio.h> +#include <sys/queue.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> +#include <parc/algol/parc_Memory.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> +#include <ccnx/transport/transport_rta/core/rta_ConnectionTable.h> + +#define DEBUG_OUTPUT 0 + +typedef struct rta_connection_entry { + RtaConnection *connection; + + TAILQ_ENTRY(rta_connection_entry) list; +} RtaConnectionEntry; + +struct rta_connection_table { + size_t max_elements; + size_t count_elements; + TableFreeFunc *freefunc; + TAILQ_HEAD(, rta_connection_entry) head; +}; + + +/** + * Create a connection table of the given size + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnectionTable * +rtaConnectionTable_Create(size_t elements, TableFreeFunc *freefunc) +{ + RtaConnectionTable *table = parcMemory_AllocateAndClear(sizeof(RtaConnectionTable)); + assertNotNull(table, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaConnectionTable)); + TAILQ_INIT(&table->head); + table->max_elements = elements; + table->count_elements = 0; + table->freefunc = freefunc; + return table; +} + +/** + * Destroy the connection table, and it will call rtaConnection_Destroy() + * on each connection in the table. + * + * Example: + * @code + * <#example#> + * @endcode + */ +void +rtaConnectionTable_Destroy(RtaConnectionTable **tablePtr) +{ + RtaConnectionTable *table; + + assertNotNull(tablePtr, "Called with null parameter"); + table = *tablePtr; + assertNotNull(table, "Called with parameter that dereferences to null"); + + while (!TAILQ_EMPTY(&table->head)) { + RtaConnectionEntry *entry = TAILQ_FIRST(&table->head); + TAILQ_REMOVE(&table->head, entry, list); + if (table->freefunc) { + table->freefunc(&entry->connection); + } + parcMemory_Deallocate((void **) &entry); + } + + parcMemory_Deallocate((void **) &table); + *tablePtr = NULL; +} + +/** + * Add a connetion to the table. Stores the reference provided (does not copy). + * Returns 0 on success, -1 on error + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaConnectionTable_AddConnection(RtaConnectionTable *table, RtaConnection *connection) +{ + assertNotNull(table, "Called with null parameter RtaConnectionTable"); + assertNotNull(connection, "Called with null parameter RtaConnection"); + + if (table->count_elements < table->max_elements) { + table->count_elements++; + RtaConnectionEntry *entry = parcMemory_AllocateAndClear(sizeof(RtaConnectionEntry)); + assertNotNull(entry, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaConnectionEntry)); + entry->connection = connection; + TAILQ_INSERT_TAIL(&table->head, entry, list); + return 0; + } + return -1; +} + +/** + * Lookup a connection. + * Returns NULL if not found + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnection * +rtaConnectionTable_GetByApiFd(RtaConnectionTable *table, int api_fd) +{ + assertNotNull(table, "Called with null parameter RtaConnectionTable"); + + RtaConnectionEntry *entry; + TAILQ_FOREACH(entry, &table->head, list) + { + if (rtaConnection_GetApiFd(entry->connection) == api_fd) { + return entry->connection; + } + } + return NULL; +} + +/** + * Lookup a connection. + * Returns NULL if not found + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnection * +rtaConnectionTable_GetByTransportFd(RtaConnectionTable *table, int transport_fd) +{ + assertNotNull(table, "Called with null parameter RtaConnectionTable"); + + RtaConnectionEntry *entry; + TAILQ_FOREACH(entry, &table->head, list) + { + if (rtaConnection_GetTransportFd(entry->connection) == transport_fd) { + return entry->connection; + } + } + return NULL; +} + + +/** + * Remove a connection from the table, calling rtaConnection_Destroy() on it. + * Returns 0 on success, -1 if not found (or error) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaConnectionTable_Remove(RtaConnectionTable *table, RtaConnection *connection) +{ + assertNotNull(table, "Called with null parameter RtaConnectionTable"); + assertNotNull(connection, "Called with null parameter RtaConnection"); + + RtaConnectionEntry *entry; + TAILQ_FOREACH(entry, &table->head, list) + { + if (entry->connection == connection) { + assertTrue(table->count_elements > 0, "Invalid state, found an entry, but count_elements is zero"); + table->count_elements--; + TAILQ_REMOVE(&table->head, entry, list); + if (table->freefunc) { + table->freefunc(&entry->connection); + } + parcMemory_Deallocate((void **) &entry); + return 0; + } + } + return -1; +} + +/** + * Remove all connections in a given stack_id, calling rtaConnection_Destroy() on it. + * Returns 0 on success, -1 if not found (or error) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaConnectionTable_RemoveByStack(RtaConnectionTable *table, int stack_id) +{ + assertNotNull(table, "Called with null parameter RtaConnectionTable"); + + RtaConnectionEntry *entry = TAILQ_FIRST(&table->head); + while (entry != NULL) { + RtaConnectionEntry *temp = TAILQ_NEXT(entry, list); + if (rtaConnection_GetStackId(entry->connection) == stack_id) { + assertTrue(table->count_elements > 0, "Invalid state, found an entry, but count_elements is zero"); + table->count_elements--; + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 "%s stack_id %d conn %p\n", + rtaFramework_GetTicks(rtaConnection_GetFramework(entry->connection)), + __func__, + stack_id, + (void *) entry->connection); + } + + TAILQ_REMOVE(&table->head, entry, list); + if (table->freefunc) { + table->freefunc(&entry->connection); + } + + if (DEBUG_OUTPUT) { + printf("%9s %s FREEFUNC RETURNS\n", + " ", __func__); + } + + parcMemory_Deallocate((void **) &entry); + } + entry = temp; + } + return 0; +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ConnectionTable.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ConnectionTable.h new file mode 100644 index 00000000..5cb1cb3e --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ConnectionTable.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_ConnectionTable.h + * @brief Data structure of connections. It is managed by rtaFramework. + * + */ + +#ifndef Libccnx_rta_ConnectionTable_h +#define Libccnx_rta_ConnectionTable_h + +#include "rta_Connection.h" + +struct rta_connection_table; +typedef struct rta_connection_table RtaConnectionTable; + +typedef void (TableFreeFunc)(RtaConnection **connection); + +/** + * Create a connection table of the given size. Whenever a + * connection is removed, the freefunc is called. Be sure that + * does not in turn call back in to the connection table. + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnectionTable *rtaConnectionTable_Create(size_t elements, TableFreeFunc *freefunc); + +/** + * Destroy the connection table, and it will call freefunc() + * on each connection in the table. + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaConnectionTable_Destroy(RtaConnectionTable **tablePtr); + +/** + * Add a connetion to the table. Stores the reference provided (does not copy). + * Returns 0 on success, -1 on error + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaConnectionTable_AddConnection(RtaConnectionTable *table, RtaConnection *connection); + +/** + * Lookup a connection. + * Returns NULL if not found + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnection *rtaConnectionTable_GetByApiFd(RtaConnectionTable *table, int api_fd); + +/** + * Lookup a connection. + * Returns NULL if not found + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaConnection *rtaConnectionTable_GetByTransportFd(RtaConnectionTable *table, int transport_fd); + +/** + * Remove a connection from the table, calling freefunc() on it. + * Returns 0 on success, -1 if not found (or error) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaConnectionTable_Remove(RtaConnectionTable *table, RtaConnection *connection); + +/** + * Remove all connections in a given stack_id, calling freefunc() on it. + * Returns 0 on success, -1 if not found (or error) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaConnectionTable_RemoveByStack(RtaConnectionTable *table, int stack_id); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c new file mode 100644 index 00000000..ada629e0 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.c @@ -0,0 +1,469 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This module implements the _Create(), _Start(), and _Destroy() methods. + * It also has various utilities for timers and events. + * + * The command channel is processed in rta_Framework_Commands.c. + * + * Example: + * @code + * <#example#> + * @endcode + */ + +#include <config.h> +#include <stdio.h> +#include <unistd.h> + +#include <errno.h> + +#include <string.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <sys/time.h> + +#include <parc/algol/parc_EventSignal.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_ArrayList.h> +#include <parc/logging/parc_LogReporterTextStdout.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> +#include <ccnx/transport/transport_rta/core/rta_ConnectionTable.h> +#include <ccnx/transport/common/transport_Message.h> +#include <ccnx/transport/common/transport_private.h> + +#include <ccnx/transport/transport_rta/connectors/connector_Api.h> +#include <ccnx/transport/transport_rta/connectors/connector_Forwarder.h> +#include <ccnx/transport/transport_rta/components/component_Codec.h> +#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h> + +#include <ccnx/transport/transport_rta/commands/rta_CommandTransmitStatistics.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +#include "rta_Framework_Commands.h" + +// =================================================== + +// event callbacks +static void _signal_cb(int signalNumber, PARCEventType event, void *arg); +static void _tick_cb(int, PARCEventType, void *); +static void transmitStatisticsCallback(int fd, PARCEventType what, void *user_data); + + +// =========================================== +// Public API (create, start, destroy) +// stop are done via the command channel +// start cannot be done via the command channel, as its not running until after start. + +void +rta_Framework_LockStatus(RtaFramework *framework) +{ + int res = pthread_mutex_lock(&framework->status_mutex); + assertTrue(res == 0, "error from pthread_mutex_lock: %d", res); +} + +void +rta_Framework_UnlockStatus(RtaFramework *framework) +{ + int res = pthread_mutex_unlock(&framework->status_mutex); + assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); +} + +void +rta_Framework_WaitStatus(RtaFramework *framework) +{ + int res = pthread_cond_wait(&framework->status_cv, &framework->status_mutex); + assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); +} + +void +rta_Framework_BroadcastStatus(RtaFramework *framework) +{ + int res = pthread_cond_broadcast(&framework->status_cv); + assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); +} + + +/** + * This is called whenever the connection table wants to free a connection. + * It should call the protocol stack's closers on the connection, then + * destroy the connection. It is called either (a) inside the worker thread, + * or (b) after the worker thread has stopped, so no locking needed. + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +rtaFramework_ConnectionTableFreeFunc(RtaConnection **connectionPtr) +{ + RtaConnection *connection; + assertNotNull(connectionPtr, "Called with null double pointer"); + connection = *connectionPtr; + assertNotNull(connection, "Parameter must not dereference to null"); + + if (rtaConnection_GetState(connection) != CONN_CLOSED) { + rtaFramework_CloseConnection(rtaConnection_GetFramework(connection), connection); + } + + rtaConnection_Destroy(connectionPtr); +} + +static void +_signal_cb(int signalNumber, PARCEventType event, void *arg) +{ +} + +static void +rtaFramework_InitializeEventScheduler(RtaFramework *framework) +{ + framework->base = parcEventScheduler_Create(); + assertNotNull(framework->base, "Could not initialize event scheduler!"); + + framework->signal_pipe = parcEventSignal_Create(framework->base, SIGPIPE, PARCEventType_Signal | PARCEventType_Persist, _signal_cb, framework); + parcEventSignal_Start(framework->signal_pipe); + + if (gettimeofday(&framework->starttime, NULL) != 0) { + perror("Error getting time of day"); + trapUnexpectedState("Could not read gettimeofday"); + } +} + +static void +rtaFramework_SetupMillisecondTimer(RtaFramework *framework) +{ + struct timeval wtnow_timeout; + + // setup a milli-second timer + wtnow_timeout.tv_sec = 0; + wtnow_timeout.tv_usec = 1000000 / WTHZ; + + framework->tick_event = parcEventTimer_Create( + framework->base, + PARCEventType_Persist, + _tick_cb, + (void *) framework); + + parcEventTimer_Start(framework->tick_event, &wtnow_timeout); +} + +static void +rtaFramework_CreateCommandChannel(RtaFramework *framework) +{ + int fd = parcNotifier_Socket(framework->commandNotifier); + + // setup a PARCEventQueue for command_fd + + // Set non-blocking flag + int flags = fcntl(fd, F_GETFL, NULL); + assertFalse(flags == -1, "fcntl failed to obtain file descriptor flags (%d)", errno); + int res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + assertTrue(res == 0, "rtaFramework_Create failed to set socket non-blocking: %s", strerror(errno)); + + framework->commandEvent = parcEvent_Create(framework->base, fd, PARCEventType_Read | PARCEventType_Persist, rtaFramework_CommandCallback, (void *) framework); + + // The command port is the highest priority + parcEvent_SetPriority(framework->commandEvent, PARCEventPriority_Maximum); + + parcEvent_Start(framework->commandEvent); + + // the notifier socket is now ready to fire +} + +/* + * Until things get plumbed from above via control messages, we will use + * environment variables in the form "RtaFacility_facility=level" with a special facility "RtaFacility_All". + * The "All" is processed first, then more specific facilities, so one could set all to a default + * level then set specific ones to over-ride. + * + * Default log level is Error + * + * Strings: + * RtaFacility_Framework + * RtaFacility_Api + * RtaFacility_Flowcontrol + * RtaFacility_Codec + * RtaFacility_Forwarder + */ +static void +_setLogLevels(RtaFramework *framework) +{ + for (int i = 0; i < RtaLoggerFacility_END; i++) { + rtaLogger_SetLogLevel(framework->logger, i, PARCLogLevel_Error); + } + + char *levelString = getenv("RtaFacility_All"); + if (levelString) { + PARCLogLevel level = parcLogLevel_FromString(levelString); + if (level != PARCLogLevel_All) { + for (int i = 0; i < RtaLoggerFacility_END; i++) { + rtaLogger_SetLogLevel(framework->logger, i, level); + } + } + } + + // no do specific facilities + char buffer[1024]; + for (int i = 0; i < RtaLoggerFacility_END; i++) { + snprintf(buffer, 1024, "RtaFacility_%s", rtaLogger_FacilityString(i)); + levelString = getenv(buffer); + if (levelString) { + PARCLogLevel level = parcLogLevel_FromString(levelString); + if (level != PARCLogLevel_All) { + rtaLogger_SetLogLevel(framework->logger, i, level); + } + } + } +} + +/** + * Create a framework. This is a thread-safe function. + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaFramework * +rtaFramework_Create(PARCRingBuffer1x1 *commandRingBuffer, PARCNotifier *commandNotifier) +{ + RtaFramework *framework = parcMemory_AllocateAndClear(sizeof(RtaFramework)); + assertNotNull(framework, "RtaFramework parcMemory_AllocateAndClear returned null"); + + PARCLogReporter *reporter = parcLogReporterTextStdout_Create(); + framework->logger = rtaLogger_Create(reporter, parcClock_Monotonic()); + parcLogReporter_Release(&reporter); + + _setLogLevels(framework); + + // setup the event scheduler + + // mutes, condition variable, and protected state for starting + // and stopping the event thread from an outside thread. + pthread_mutex_init(&framework->status_mutex, NULL); + pthread_cond_init(&framework->status_cv, NULL); + framework->status = FRAMEWORK_INIT; + + framework->commandRingBuffer = parcRingBuffer1x1_Acquire(commandRingBuffer); + framework->commandNotifier = parcNotifier_Acquire(commandNotifier); + + framework->connid_next = 1; + TAILQ_INIT(&framework->protocols_head); + + //TODO: make 16384 configurable. + framework->connectionTable = rtaConnectionTable_Create(16384, rtaFramework_ConnectionTableFreeFunc); + assertNotNull(framework->connectionTable, "Could not allocate conneciton table"); + + rtaFramework_InitializeEventScheduler(framework); + + rtaFramework_SetupMillisecondTimer(framework); + + framework->transmit_statistics_event = parcEventTimer_Create(framework->base, + PARCEventType_Persist, + transmitStatisticsCallback, + (void *) framework); + + + rtaFramework_CreateCommandChannel(framework); + + if (rtaLogger_IsLoggable(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info)) { + rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info, __func__, + "framework %p created", (void *) framework); + } + + return framework; +} + +static void +rtaFramework_DestroyEventScheduler(RtaFramework *framework) +{ + parcEventTimer_Destroy(&(framework->tick_event)); + parcEventTimer_Destroy(&(framework->transmit_statistics_event)); + + if (framework->signal_int != NULL) { + parcEventSignal_Destroy(&(framework->signal_int)); + } + if (framework->signal_usr1 != NULL) { + parcEventSignal_Destroy(&(framework->signal_usr1)); + } + + parcEvent_Destroy(&(framework->commandEvent)); + parcNotifier_Release(&framework->commandNotifier); + parcRingBuffer1x1_Release(&framework->commandRingBuffer); + + parcEventSignal_Destroy(&(framework->signal_pipe)); + parcEventScheduler_Destroy(&(framework->base)); +} + +void +rtaFramework_Destroy(RtaFramework **frameworkPtr) +{ + RtaFramework *framework; + + assertNotNull(frameworkPtr, "Parameter must be non-null RtaFramework double pointer"); + framework = *frameworkPtr; + assertNotNull(framework, "Parameter must dereference to non-Null RtaFramework pointer"); + + rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Info, __func__, + "framework %p destroy", (void *) framework); + + // status can be STOPPED or INIT. It's ok to destroy one that's never been started. + + // %%%% LOCK + rta_Framework_LockStatus(framework); + assertTrue(framework->status == FRAMEWORK_SHUTDOWN || + framework->status == FRAMEWORK_INIT || + framework->status == FRAMEWORK_TEARDOWN, + "Framework invalid state, got %d", + framework->status); + rta_Framework_UnlockStatus(framework); + // %%%% UNLOCK + + rtaConnectionTable_Destroy(&framework->connectionTable); + + rtaFramework_DestroyEventScheduler(framework); + + rtaLogger_Release(&framework->logger); + + parcMemory_Deallocate((void **) &framework); + + *frameworkPtr = NULL; +} + +RtaLogger * +rtaFramework_GetLogger(RtaFramework *framework) +{ + return framework->logger; +} + +/** + * May block briefly, returns the current status of the framework. + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaFrameworkStatus +rtaFramework_GetStatus(RtaFramework *framework) +{ + RtaFrameworkStatus status; + // %%%% LOCK + rta_Framework_LockStatus(framework); + status = framework->status; + rta_Framework_UnlockStatus(framework); + // %%%% UNLOCK + return status; +} + +/** + * Blocks until the framework status equals or exeeds the desired status + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaFrameworkStatus +rtaFramework_WaitForStatus(RtaFramework *framework, + RtaFrameworkStatus status) +{ + // %%%% LOCK + rta_Framework_LockStatus(framework); + while (framework->status < status) { + rta_Framework_WaitStatus(framework); + } + rta_Framework_UnlockStatus(framework); + // %%%% UNLOCK + + return status; +} + +// ================================================================= + +// Transport Operations + +PARCEventScheduler * +rtaFramework_GetEventScheduler(RtaFramework *framework) +{ + assertNotNull(framework, "Parameter must be non-NULL RtaFramework"); + return framework->base; +} + +unsigned +rtaFramework_GetNextConnectionId(RtaFramework *framework) +{ + assertNotNull(framework, "Parameter must be non-NULL RtaFramework"); + + return framework->connid_next++; +} + +// ============================ +// Internal functions + +/* + * This is dispatched from the event loop, so its a loosely accurate time + */ +static void +_tick_cb(int fd, PARCEventType what, void *user_data) +{ + RtaFramework *framework = (RtaFramework *) user_data; + assertTrue(what & PARCEventType_Timeout, "%s got unknown signal %d", __func__, what); + framework->clock_ticks++; + + if (framework->killme) { + int res; + + if (rtaLogger_IsLoggable(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug)) { + rtaLogger_Log(framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug, __func__, + "framework %p exiting base loop", (void *) framework); + } + + res = parcEventScheduler_Abort(framework->base); + assertTrue(res == 0, "error on parcEventScheduler_Abort: %d", res); + } +} + +FILE *GlobalStatisticsFile = NULL; + +static void +transmitStatisticsCallback(int fd, PARCEventType what, void *user_data) +{ + RtaFramework *framework = (RtaFramework *) user_data; + assertTrue(what & PARCEventType_Timeout, "unknown signal %d", what); + + FrameworkProtocolHolder *holder; + TAILQ_FOREACH(holder, &framework->protocols_head, list) + { + RtaProtocolStack *stack = holder->stack; + PARCArrayList *list = rtaProtocolStack_GetStatistics(stack, GlobalStatisticsFile); + parcArrayList_Destroy(&list); + } +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.h new file mode 100644 index 00000000..fbd698d1 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework.h @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Framework.h + * @brief <#Brief Description#> + * + * rtaFramework executes inside the worker thread in callback from the event scheduler. + * + * It provides service functions to components and connectors so they do not need + * to be event aware. + * + * It also manages the command channel to communicate with rtaTransport in the API's thread. + * + * _Create(), _Start(), and _Destroy() are called from the API's thread. You should not + * call _Destroy until rtaFramework_GetStatus() is FRAMEWORK_SHUTDOWN. + * + * The framework can run in threaded mode or non-threaded mode. Including this one + * header gives you both sets of operations, but they are not compatible. + * + * THREADED MODE: + * call _Create + * call _Start + * ... do work ... + * call _Shutdown + * call _Destroy + * + * NON-THREADED MODE + * call _Create + * ... do work ... + * call _Step or _StepCount or _StepTimed + * ... do work ... + * call _Step or _StepCount or _StepTimed + * ... do work ... + * call _Teardown + * call _Destroy + * + */ +#ifndef Libccnx_rta_Framework_h +#define Libccnx_rta_Framework_h + +#include <parc/concurrent/parc_RingBuffer_1x1.h> +#include <parc/concurrent/parc_Notifier.h> +#include <ccnx/transport/transport_rta/core/rta_Logger.h> + +// =================================== +// External API, used by rtaTransport + +struct rta_framework; +typedef struct rta_framework RtaFramework; + +#define RTA_MAX_PRIORITY 0 +#define RTA_NORMAL_PRIORITY 1 +#define RTA_MIN_PRIORITY 2 + +/** + * Transient states: STARTING, STOPPING. You don't want to block waiting for those + * as you could easily miss them + * + * Example: + * @code + * <#example#> + * @endcode + */ +typedef enum { + FRAMEWORK_INIT = 0, /** Initial status after Create() * + * Example: + * @code + * <#example#> + * @endcode + */ + FRAMEWORK_SETUP = 1, /** Configured in non-threaded mode * + * Example: + * @code + * <#example#> + * @endcode + */ + + FRAMEWORK_STARTING = 2, /** Between calling _Start() and the thread running * + * Example: + * @code + * <#example#> + * @endcode + */ + FRAMEWORK_RUNNING = 3, /** After event scheduler thread starts * + * Example: + * @code + * <#example#> + * @endcode + */ + FRAMEWORK_STOPPING = 4, /** When shutdown is finished, but before event scheduler exists * + * Example: + * @code + * <#example#> + * @endcode + */ + + FRAMEWORK_TEARDOWN = 5, /** After cleanup from SETUP * + * Example: + * @code + * <#example#> + * @endcode + */ + FRAMEWORK_SHUTDOWN = 6, /** After event scheduler exits * + * Example: + * @code + * <#example#> + * @endcode + */ +} RtaFrameworkStatus; + +/** + * Creates the framework context, but does not start the worker thread. + * <code>command_fd</code> is the socketpair or pipe (one-way is ok) over which + * RTATransport will send commands. + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaFramework *rtaFramework_Create(PARCRingBuffer1x1 *commandRingBuffer, PARCNotifier *commandNotifier); + + +void rtaFramework_Destroy(RtaFramework **frameworkPtr); + +/** + * Returns the Logging system used by the framework + * + * <#Paragraphs Of Explanation#> + * + * @param [in] framework An allocated RtaFramework + * + * @retval non-null The Logging system + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaLogger *rtaFramework_GetLogger(RtaFramework *framework); + +/** + * May block briefly, returns the current status of the framework. + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaFrameworkStatus rtaFramework_GetStatus(RtaFramework *framework); + +/** + * Blocks until the framework status equals or exeeds the desired status + * Transient states: STARTING, STOPPING. You don't want to block waiting for those + * as you could easily miss them + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaFrameworkStatus rtaFramework_WaitForStatus(RtaFramework *framework, + RtaFrameworkStatus status); + + +#include "rta_Framework_Threaded.h" +#include "rta_Framework_NonThreaded.h" +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c new file mode 100644 index 00000000..a02efefa --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.c @@ -0,0 +1,450 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdio.h> +#include <unistd.h> +#include <fcntl.h> +#include <string.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <errno.h> +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_private.h> + +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/commands/rta_Command.h> + +#include <parc/algol/parc_Event.h> + +#ifdef DEBUG_OUTPUT +#undef DEBUG_OUTPUT +#endif + +#define DEBUG_OUTPUT 0 + +extern FILE *GlobalStatisticsFile; + +static bool _rtaFramework_ExecuteCreateStack(RtaFramework *framework, const RtaCommandCreateProtocolStack *createStack); +static bool _rtaFramework_ExecuteDestroyStack(RtaFramework *framework, const RtaCommandDestroyProtocolStack *destroyStack); +static bool _rtaFramework_ExecuteOpenConnection(RtaFramework *framework, const RtaCommandOpenConnection *openConnection); +static bool _rtaFramework_ExecuteCloseConnection(RtaFramework *framework, const RtaCommandCloseConnection *closeConnection); +static bool _rtaFramework_ExecuteTransmitStatistics(RtaFramework *framework, const RtaCommandTransmitStatistics *transmitStats); +static bool _rtaFramework_ExecuteShutdownFramework(RtaFramework *framework); + +static void rtaFramework_DrainApiDescriptor(int fd); + +void +rtaFramework_CommandCallback(int fd, PARCEventType what, void *user_framework) +{ + RtaFramework *framework = (RtaFramework *) user_framework; + + // flag the notifier that we are starting a batch of reads + parcNotifier_PauseEvents(framework->commandNotifier); + + RtaCommand *command = NULL; + while ((command = rtaCommand_Read(framework->commandRingBuffer)) != NULL) { + // The shutdown command can broadcast a change of state before the function + // returns, so we need to free the RtaCommand before executing the shutdown. + // Therefore, we include the rtaCommand_Destroy() as part of the switch. + + if (rtaCommand_IsOpenConnection(command)) { + _rtaFramework_ExecuteOpenConnection(framework, rtaCommand_GetOpenConnection(command)); + rtaCommand_Release(&command); + } else if (rtaCommand_IsCloseConnection(command)) { + _rtaFramework_ExecuteCloseConnection(framework, rtaCommand_GetCloseConnection(command)); + rtaCommand_Release(&command); + } else if (rtaCommand_IsCreateProtocolStack(command)) { + _rtaFramework_ExecuteCreateStack(framework, rtaCommand_GetCreateProtocolStack(command)); + rtaCommand_Release(&command); + } else if (rtaCommand_IsDestroyProtocolStack(command)) { + _rtaFramework_ExecuteDestroyStack(framework, rtaCommand_GetDestroyProtocolStack(command)); + rtaCommand_Release(&command); + } else if (rtaCommand_IsTransmitStatistics(command)) { + _rtaFramework_ExecuteTransmitStatistics(framework, rtaCommand_GetTransmitStatistics(command)); + rtaCommand_Release(&command); + } else if (rtaCommand_IsShutdownFramework(command)) { + // release the command before executing shutdown + rtaCommand_Release(&command); + _rtaFramework_ExecuteShutdownFramework(framework); + } else { + rtaCommand_Display(command, 3); + rtaCommand_Release(&command); + trapUnexpectedState("Got unknown command type"); + } + } + + // resume notifications + parcNotifier_StartEvents(framework->commandNotifier); +} + +// ========================================= +// Internal command processing + +/** + * Create a protocol holder and insert it in the framework's + * protocols_head list. + * + * Example: + * @code + * <#example#> + * @endcode + */ +static FrameworkProtocolHolder * +rtaFramework_CreateProtocolHolder(RtaFramework *framework, PARCJSON *params, uint64_t kv_hash, int stack_id) +{ + // request for a new protocol stack, create it + FrameworkProtocolHolder *holder = parcMemory_AllocateAndClear(sizeof(FrameworkProtocolHolder)); + assertNotNull(holder, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(FrameworkProtocolHolder)); + + TAILQ_INSERT_TAIL(&framework->protocols_head, holder, list); + + holder->kv_hash = kv_hash; + holder->stack_id = stack_id; + + if (DEBUG_OUTPUT) { + printf("%s created protocol holder %p hash %" PRIu64 "\n", + __func__, + (void *) holder, + kv_hash); + } + + return holder; +} + +/** + * Lookup the existing protocol holder for stackid. + * Returns NULL if not found. + * + * Example: + * @code + * <#example#> + * @endcode + */ +static FrameworkProtocolHolder * +rtaFramework_GetProtocolStackByStackId(RtaFramework *framework, int stack_id) +{ + FrameworkProtocolHolder *holder; + TAILQ_FOREACH(holder, &framework->protocols_head, list) + { + if (holder->stack_id == stack_id) { + return holder; + } + } + return NULL; +} + +static bool +_rtaFramework_ExecuteCreateStack(RtaFramework *framework, const RtaCommandCreateProtocolStack *createStack) +{ + // if we're in INIT mode, we need to bump + // wait for notificaiton from event thread + if (framework->status == FRAMEWORK_INIT) { + rta_Framework_LockStatus(framework); + if (framework->status == FRAMEWORK_INIT) { + framework->status = FRAMEWORK_SETUP; + } + rta_Framework_BroadcastStatus(framework); + rta_Framework_UnlockStatus(framework); + } + + FrameworkProtocolHolder *holder = + rtaFramework_GetProtocolStackByStackId(framework, rtaCommandCreateProtocolStack_GetStackId(createStack)); + assertNull(holder, "Found a holder with stack_id %d, but we're asked to create it!", + rtaCommandCreateProtocolStack_GetStackId(createStack)); + + uint64_t kv_hash = ccnxStackConfig_HashCode(rtaCommandCreateProtocolStack_GetStackConfig(createStack)); + + // this creates it and inserts in framework->protocols_head + holder = rtaFramework_CreateProtocolHolder(framework, NULL, kv_hash, rtaCommandCreateProtocolStack_GetStackId(createStack)); + + holder->stack = + rtaProtocolStack_Create(framework, rtaCommandCreateProtocolStack_GetConfig(createStack), rtaCommandCreateProtocolStack_GetStackId(createStack)); + rtaProtocolStack_Configure(holder->stack); + + if (DEBUG_OUTPUT) { + printf("%s created protocol %p kv_hash %016" PRIX64 " stack_id %d\n", + __func__, (void *) holder->stack, kv_hash, rtaCommandCreateProtocolStack_GetStackId(createStack)); + } + return 0; +} + +static bool +_rtaFramework_ExecuteOpenConnection(RtaFramework *framework, const RtaCommandOpenConnection *openConnection) +{ + int res; + FrameworkProtocolHolder *holder; + RtaConnection *rtaConnection; + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s framework %p\n", + rtaFramework_GetTicks(framework), __func__, (void *) framework); + } + + holder = rtaFramework_GetProtocolStackByStackId(framework, rtaCommandOpenConnection_GetStackId(openConnection)); + assertNotNull(holder, "Could not find stack_id %d", rtaCommandOpenConnection_GetStackId(openConnection)); + + rtaConnection = rtaConnectionTable_GetByApiFd(framework->connectionTable, rtaCommandOpenConnection_GetApiNotifierFd(openConnection)); + assertNull(rtaConnection, "Found api_fd %d, but it should not exist!", rtaCommandOpenConnection_GetApiNotifierFd(openConnection)); + + rtaConnection = rtaConnection_Create(holder->stack, openConnection); + res = rtaConnectionTable_AddConnection(framework->connectionTable, rtaConnection); + assertTrue(res == 0, "Got error from rtaConnectionTable_AddConnection: %d", res); + + res = rtaProtocolStack_Open(holder->stack, rtaConnection); + assertTrue(res == 0, "Got error from rtaProtocolStack_Open: %d", res); + + rtaConnection_SetState(rtaConnection, CONN_OPEN); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s created connection %p stack_id %d api_fd %d transport_fd %d\n", + rtaFramework_GetTicks(framework), + __func__, + (void *) rtaConnection, + rtaCommandOpenConnection_GetStackId(openConnection), + rtaCommandOpenConnection_GetApiNotifierFd(openConnection), + rtaCommandOpenConnection_GetTransportNotifierFd(openConnection)); + } + + return true; +} + + +/** + * Mark a connection as closed. If there are no pending + * packets in queues, destroy it too. + * It's non-static because we call from rta_Framework.c + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaFramework_CloseConnection(RtaFramework *framework, RtaConnection *connection) +{ + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %p api_fd %d\n", + rtaFramework_GetTicks(framework), + __func__, (void *) connection, rtaConnection_GetApiFd(connection)); + } + + assertFalse(rtaConnection_GetState(connection) == CONN_CLOSED, + "connection api_fd %d is already closed", rtaConnection_GetApiFd(connection)); + + rtaConnection_SetState(connection, CONN_CLOSED); + rtaProtocolStack_Close(rtaConnection_GetStack(connection), connection); + + rtaFramework_DrainApiDescriptor(rtaConnection_GetApiFd(connection)); + + + // Remove it from the connection table, which will free our reference to it. + + rtaConnectionTable_Remove(framework->connectionTable, connection); + + // Done. The rtaConnection will be removed when the last queued + // messages for it are gone. We keep the connection holder, so if we do + // a Destroy we'll know about it. RtaConnection will call + // rtaFramework_RemoveConnection(...) when RtaConnection_Destroy() refcount + // is zero and it's going to fully remove the connection. + + return 0; +} + + +static bool +_rtaFramework_ExecuteCloseConnection(RtaFramework *framework, const RtaCommandCloseConnection *closeConnection) +{ + RtaConnection *connection = rtaConnectionTable_GetByApiFd(framework->connectionTable, rtaCommandCloseConnection_GetApiNotifierFd(closeConnection)); + assertNotNull(connection, "Could not find api_fd %d", rtaCommandCloseConnection_GetApiNotifierFd(closeConnection)); + + return (rtaFramework_CloseConnection(framework, connection) == 0); +} + +/** + * When the transport is closing the descriptor + * to the API, it should call this to drain any pending but unretrieved + * messages out of the API's side of the socket + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +rtaFramework_DrainApiDescriptor(int fd) +{ + unsigned count = 0; + + if (DEBUG_OUTPUT) { + printf("%s fd %d\n", __func__, fd); + } + + // Set non-blocking flag + int flags = fcntl(fd, F_GETFL, NULL); + assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno); + int failure = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno); + + // Now drain the user side of stuff they have not read + CCNxMetaMessage *msg; + while (read(fd, &msg, sizeof(CCNxMetaMessage *)) == sizeof(CCNxMetaMessage *)) { + count++; + ccnxMetaMessage_Release(&msg); + } + + if (DEBUG_OUTPUT) { + printf("%s destroyed %u messages\n", __func__, count); + } +} + +/** + * This is a deferred callback from the RtaConnection when its last TransportMessage + * has been purged from the queues. + * + * Don't call anything inside here that ends up back in the RtaConnection. + * + * Example: + * @code + * <#example#> + * @endcode + */ +void +rtaFramework_RemoveConnection(RtaFramework *framework, RtaConnection *rtaConnection) +{ + rtaFramework_DrainApiDescriptor(rtaConnection_GetApiFd(rtaConnection)); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %p closing api_fd %d\n", + rtaFramework_GetTicks(framework), + __func__, (void *) rtaConnection, rtaConnection_GetApiFd(rtaConnection)); + } + + close(rtaConnection_GetApiFd(rtaConnection)); + close(rtaConnection_GetTransportFd(rtaConnection)); +} + +void +rtaFramework_DestroyProtocolHolder(RtaFramework *framework, FrameworkProtocolHolder *holder) +{ + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s proto_holder %p\n", + rtaFramework_GetTicks(framework), + __func__, (void *) holder); + } + + // remove any and all connections associated with this protocol stack. + // If the connections still have packets floating around in queues, the connection + // will stay around until they all get flushed then will destroy on + // the last packet destruction + rtaConnectionTable_RemoveByStack(framework->connectionTable, holder->stack_id); + + rtaProtocolStack_Destroy(&holder->stack); + + TAILQ_REMOVE(&framework->protocols_head, holder, list); + + parcMemory_Deallocate((void **) &holder); +} + + +static bool +_rtaFramework_ExecuteDestroyStack(RtaFramework *framework, const RtaCommandDestroyProtocolStack *destroyStack) +{ + FrameworkProtocolHolder *holder = rtaFramework_GetProtocolStackByStackId(framework, rtaCommandDestroyProtocolStack_GetStackId(destroyStack)); + assertNotNull(holder, "Could not find stack_id %d", rtaCommandDestroyProtocolStack_GetStackId(destroyStack)); + + rtaConnectionTable_RemoveByStack(framework->connectionTable, rtaCommandDestroyProtocolStack_GetStackId(destroyStack)); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s proto_holder %p\n", + rtaFramework_GetTicks(framework), + __func__, (void *) holder); + } + + rtaFramework_DestroyProtocolHolder(framework, holder); + + return true; +} + +/** + * This will update the shared framework->status, so needs a lock around + * the work it does. + * + * Example: + * @code + * <#example#> + * @endcode + */ +static bool +_rtaFramework_ExecuteShutdownFramework(RtaFramework *framework) +{ + FrameworkProtocolHolder *holder; + + // %%% LOCK + rta_Framework_LockStatus(framework); + if (framework->status != FRAMEWORK_RUNNING) { + RtaFrameworkStatus status = framework->status; + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + assertTrue(0, "Invalid state, expected FRAMEWORK_RUNNING or later, got %d", status); + return -1; + } + + holder = TAILQ_FIRST(&framework->protocols_head); + while (holder != NULL) { + FrameworkProtocolHolder *temp = TAILQ_NEXT(holder, list); + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s stack_id %d\n", + framework->clock_ticks, __func__, holder->stack_id); + } + + rtaFramework_DestroyProtocolHolder(framework, holder); + holder = temp; + } + + parcEventScheduler_Stop(framework->base, &(struct timeval) { .tv_sec = 0, .tv_usec = 1000 }); + framework->status = FRAMEWORK_STOPPING; + rta_Framework_BroadcastStatus(framework); + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + + return 0; +} + +// Goes into rta_Framework_Commands.c +static bool +_rtaFramework_ExecuteTransmitStatistics(RtaFramework *framework, const RtaCommandTransmitStatistics *transmitStats) +{ + if (GlobalStatisticsFile != NULL) { + fclose(GlobalStatisticsFile); + } + + GlobalStatisticsFile = fopen(rtaCommandTransmitStatistics_GetFilename(transmitStats), "a"); + assertNotNull(GlobalStatisticsFile, "Failed to open %s", rtaCommandTransmitStatistics_GetFilename(transmitStats)); + + if (GlobalStatisticsFile != NULL) { + struct timeval period = rtaCommandTransmitStatistics_GetPeriod(transmitStats); + parcEventTimer_Start(framework->transmit_statistics_event, &period); + } else { + fprintf(stderr, "Will not report statistics: Failed to open %s for output.", rtaCommandTransmitStatistics_GetFilename(transmitStats)); + } + + return 0; +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.h new file mode 100644 index 00000000..52f6c2d4 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Commands.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file <#filename#> + * @brief Process the commands from RTATransport + * + * <#Detailed Description#> + * + */ +#ifndef Libccnx_rta_Framework_Commands_h +#define Libccnx_rta_Framework_Commands_h + +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_private.h> + +#include <parc/algol/parc_Event.h> + +/** + * RtaConnection will call this when RtaConnection_Destroy() refcount reaches + * zero and it's actually going to destroy a connection. + * + * Example: + * @code + * <#example#> + * @endcode + */ +void +rtaFramework_RemoveConnection(RtaFramework *framework, RtaConnection *rtaConneciton); + +/** + * called by event scheduler for activity on the Command channel + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaFramework_CommandCallback(int fd, PARCEventType what, void *user_framework); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.c new file mode 100644 index 00000000..158f3ec0 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.c @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * * + * Example: + * @code + * <#example#> + * @endcode + */ + +#include <config.h> +#include <stdio.h> +#include <unistd.h> + +#include <errno.h> + +#include <string.h> +#include <fcntl.h> +#include <sys/socket.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> + +#include "rta_Framework.h" +#include "rta_ConnectionTable.h" +#include "rta_Framework_Commands.h" + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +// This is implemented in rta_Framework_Commands +void +rtaFramework_DestroyProtocolHolder(RtaFramework *framework, FrameworkProtocolHolder *holder); + +/** + * If running in non-threaded mode (you don't call _Start), you must manually + * turn the crank. This turns it for a single cycle. + * Return 0 on success, -1 on error (likely you're running in threaded mode) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaFramework_NonThreadedStep(RtaFramework *framework) +{ + if (framework->status == FRAMEWORK_INIT) { + framework->status = FRAMEWORK_SETUP; + } + + assertTrue(framework->status == FRAMEWORK_SETUP, + "Framework invalid state for non-threaded, expected %d got %d", + FRAMEWORK_SETUP, + framework->status + ); + + if (framework->status != FRAMEWORK_SETUP) { + return -1; + } + + if (parcEventScheduler_Start(framework->base, PARCEventSchedulerDispatchType_LoopOnce) < 0) { + return -1; + } + + return 0; +} + +/** + * If running in non-threaded mode (you don't call _Start), you must manually + * turn the crank. This turns it for a number of cycles. + * Return 0 on success, -1 on error (likely you're running in threaded mode) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaFramework_NonThreadedStepCount(RtaFramework *framework, unsigned count) +{ + if (framework->status == FRAMEWORK_INIT) { + framework->status = FRAMEWORK_SETUP; + } + + assertTrue(framework->status == FRAMEWORK_SETUP, + "Framework invalid state for non-threaded, expected %d got %d", + FRAMEWORK_SETUP, + framework->status + ); + + if (framework->status != FRAMEWORK_SETUP) { + return -1; + } + + while (count-- > 0) { + if (parcEventScheduler_Start(framework->base, PARCEventSchedulerDispatchType_LoopOnce) < 0) { + return -1; + } + } + return 0; +} + +/** + * If running in non-threaded mode (you don't call _Start), you must manually + * turn the crank. This turns it for a given amount of time. + * Return 0 on success, -1 on error (likely you're running in threaded mode) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaFramework_NonThreadedStepTimed(RtaFramework *framework, struct timeval *duration) +{ + if (framework->status == FRAMEWORK_INIT) { + framework->status = FRAMEWORK_SETUP; + } + + assertTrue(framework->status == FRAMEWORK_SETUP, + "Framework invalid state for non-threaded, expected %d got %d", + FRAMEWORK_SETUP, + framework->status + ); + + if (framework->status != FRAMEWORK_SETUP) { + return -1; + } + + parcEventScheduler_Stop(framework->base, duration); + + if (parcEventScheduler_Start(framework->base, 0) < 0) { + return -1; + } + return 0; +} + + +/** + * After a protocol stack is created, you need to Teardown. If you + * are running in threaded mode (did a _Start), you should send an asynchronous + * SHUTDOWN command instead. This function only works if in the SETUP state + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaFramework_Teardown(RtaFramework *framework) +{ + FrameworkProtocolHolder *holder; + + assertNotNull(framework, "called with null framework"); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s framework %p\n", + rtaFramework_GetTicks(framework), + __func__, (void *) framework); + } + + // %%% LOCK + rta_Framework_LockStatus(framework); + if (framework->status != FRAMEWORK_SETUP) { + RtaFrameworkStatus status = framework->status; + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + assertTrue(0, "Invalid state, expected FRAMEWORK_SETUP, got %d", status); + return -1; + } + + holder = TAILQ_FIRST(&framework->protocols_head); + while (holder != NULL) { + FrameworkProtocolHolder *temp = TAILQ_NEXT(holder, list); + rtaFramework_DestroyProtocolHolder(framework, holder); + holder = temp; + } + + framework->status = FRAMEWORK_TEARDOWN; + rta_Framework_BroadcastStatus(framework); + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + + return 0; +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.h new file mode 100644 index 00000000..ca193c83 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.h @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Framework_NonThreaded.h + * @brief Implementation of the non-threaded api. + * + * Unless you call one of the _Step methods frequently, the tick clock will be off. + * + */ +#ifndef Libccnx_rta_Framework_NonThreaded_h +#define Libccnx_rta_Framework_NonThreaded_h + +#include <sys/time.h> + +// ============================== +// NON-THREADED API + +/** + * If running in non-threaded mode (you don't call _Start), you must manually + * turn the crank. This turns it for a single cycle. + * Return 0 on success, -1 on error (likely you're running in threaded mode) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaFramework_NonThreadedStep(RtaFramework *framework); + +/** + * If running in non-threaded mode (you don't call _Start), you must manually + * turn the crank. This turns it for a number of cycles. + * Return 0 on success, -1 on error (likely you're running in threaded mode) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaFramework_NonThreadedStepCount(RtaFramework *framework, unsigned count); + +/** + * If running in non-threaded mode (you don't call _Start), you must manually + * turn the crank. This turns it for a given amount of time. + * Return 0 on success, -1 on error (likely you're running in threaded mode) + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaFramework_NonThreadedStepTimed(RtaFramework *framework, struct timeval *duration); + + +/** + * After a protocol stack is created, you need to Teardown. If you + * are running in threaded mode (did a _Start), you should send an asynchronous + * SHUTDOWN command instead. This function only works if in the SETUP state + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaFramework_Teardown(RtaFramework *framework); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Services.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Services.c new file mode 100644 index 00000000..cf2c8cd3 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Services.c @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdio.h> + +#include <LongBow/runtime.h> +#include <parc/algol/parc_Memory.h> +#include <sys/queue.h> + +#include "rta_Framework.h" +#include "rta_Framework_private.h" +#include "rta_Framework_Services.h" + +ticks +rtaFramework_GetTicks(RtaFramework *framework) +{ + assertNotNull(framework, "Parameter framework cannot be null"); + return framework->clock_ticks; +} + +uint64_t +rtaFramework_TicksToUsec(ticks tick) +{ + return FC_USEC_PER_TICK * tick; +} + +ticks +rtaFramework_UsecToTicks(unsigned usec) +{ + return MSEC_TO_TICKS(usec / 1000); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Services.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Services.h new file mode 100644 index 00000000..f9adf194 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Services.h @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Framework_Services.h + * @brief Miscellaneous services offered by the Framework for components and connectors + * + * <#Detailed Description#> + * + */ +#ifndef Libccnx_rta_Framework_Services_h +#define Libccnx_rta_Framework_Services_h + +#include "rta_Framework.h" + +#include <parc/algol/parc_EventScheduler.h> + +// =================================== + +typedef uint64_t ticks; +#define TICK_CMP(a, b) ((int64_t) a - (int64_t) b) + +/** + * <#One Line Description#> + * + * If a component wants to use the event scheduler to manage sockets, it + * can get a reference to the event base to manage those things + * + * @param [in] framework <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +PARCEventScheduler *rtaFramework_GetEventScheduler(RtaFramework *framework); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] framework <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +unsigned rtaFramework_GetNextConnectionId(RtaFramework *framework); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] framework <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +ticks rtaFramework_GetTicks(RtaFramework *framework); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] tick <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +extern uint64_t rtaFramework_TicksToUsec(ticks tick); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] usec <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +extern ticks rtaFramework_UsecToTicks(unsigned usec); +#endif // Libccnx_rta_Framework_Services_h diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Threaded.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Threaded.c new file mode 100644 index 00000000..ade2c0a1 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Threaded.c @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdio.h> +#include <unistd.h> +#include <pthread.h> + +#include <errno.h> + +#include <string.h> +#include <fcntl.h> +#include <sys/socket.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> + +#include "rta_Framework.h" +#include "rta_ConnectionTable.h" +#include "rta_Framework_Commands.h" + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +// the thread function +static void *_rtaFramework_Run(void *ctx); + +/** + * Starts the worker thread. Blocks until started + * + * Example: + * @code + * <#example#> + * @endcode + */ +void +rtaFramework_Start(RtaFramework *framework) +{ + pthread_attr_t attr; + + // ensure we're in the INIT state, then bump to STARTING + // %%% LOCK + rta_Framework_LockStatus(framework); + if (framework->status == FRAMEWORK_INIT) { + framework->status = FRAMEWORK_STARTING; + rta_Framework_BroadcastStatus(framework); + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + } else { + RtaFrameworkStatus status = framework->status; + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + assertTrue(0, "Invalid state, not FRAMEWORK_INIT, got %d", status); + return; + } + + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&framework->thread, &attr, _rtaFramework_Run, framework) != 0) { + perror("pthread_create"); + exit(EXIT_FAILURE); + } + + if (DEBUG_OUTPUT) { + printf("%s framework started %p\n", __func__, (void *) framework); + } + + // wait for notificaiton from event thread + rta_Framework_LockStatus(framework); + while (framework->status == FRAMEWORK_INIT) { + rta_Framework_WaitStatus(framework); + } + rta_Framework_UnlockStatus(framework); + + if (DEBUG_OUTPUT) { + printf("%s framework running %p\n", __func__, (void *) framework); + } +} + +static void * +_rtaFramework_Run(void *ctx) +{ + RtaFramework *framework = (RtaFramework *) ctx; + + // %%% LOCK + rta_Framework_LockStatus(framework); + if (framework->status != FRAMEWORK_STARTING) { + assertTrue(0, "Invalid state, expected before %d, got %d", FRAMEWORK_STARTING, framework->status); + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + pthread_exit(NULL); + } + framework->status = FRAMEWORK_RUNNING; + + // Set our thread name, only used to diagnose a crash or in debugging +#if __APPLE__ + pthread_setname_np("RTA Framework"); +#else + pthread_setname_np(framework->thread, "RTA Framework"); +#endif + + rta_Framework_BroadcastStatus(framework); + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + + if (DEBUG_OUTPUT) { + const int bufferLength = 1024; + char frameworkName[bufferLength]; + pthread_getname_np(framework->thread, frameworkName, bufferLength); + printf("Framework thread running: '%s'\n", frameworkName); + } + + // blocks + parcEventScheduler_Start(framework->base, PARCEventSchedulerDispatchType_Blocking); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s existed parcEventScheduler_Start\n", framework->clock_ticks, __func__); + } + + // %%% LOCK + rta_Framework_LockStatus(framework); + framework->status = FRAMEWORK_SHUTDOWN; + rta_Framework_BroadcastStatus(framework); + rta_Framework_UnlockStatus(framework); + // %%% UNLOCK + + pthread_exit(NULL); +} + +/** + * Stops the worker thread by sending a CommandShutdown. + * Blocks until shutdown complete. + * + * CALLED FROM API's THREAD + * + * Example: + * @code + * <#example#> + * @endcode + */ +void +rtaFramework_Shutdown(RtaFramework *framework) +{ + RtaCommand *shutdown = rtaCommand_CreateShutdownFramework(); + rtaCommand_Write(shutdown, framework->commandRingBuffer); + parcNotifier_Notify(framework->commandNotifier); + rtaCommand_Release(&shutdown); + + // now block on reading status + rtaFramework_WaitForStatus(framework, FRAMEWORK_SHUTDOWN); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Threaded.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Threaded.h new file mode 100644 index 00000000..ba6e9cb3 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_Threaded.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Framework_Threaded.h + * @brief <#Brief Description#> + * + * <#Detailed Description#> + * + */ +#ifndef Libccnx_rta_Framework_Threaded_h +#define Libccnx_rta_Framework_Threaded_h + +// ============================= +// THREADED + +/** + * Starts the worker thread. Blocks until started. + * + * CALLED FROM API's THREAD + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaFramework_Start(RtaFramework *framework); + +/** + * Stops the worker thread by sending a CommandShutdown. + * Blocks until shutdown complete. + * + * The caller must provider their side of the command channel + * + * CALLED FROM API's THREAD + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaFramework_Shutdown(RtaFramework *framework); + +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_private.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_private.h new file mode 100644 index 00000000..ffc386c5 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Framework_private.h @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Framework_private.h + * @brief <#Brief Description#> + * + * <#Detailed Description#> + * + */ +#ifndef Libccnx_rta_Framework_private_h +#define Libccnx_rta_Framework_private_h + +#include <stdlib.h> +#include <sys/queue.h> +#include <pthread.h> + +#include "rta_ProtocolStack.h" +#include "rta_Connection.h" +#include "rta_Framework_Services.h" + +#include "rta_ConnectionTable.h" + +#include <parc/algol/parc_EventScheduler.h> +#include <parc/algol/parc_Event.h> +#include <parc/algol/parc_EventTimer.h> +#include <parc/algol/parc_EventSignal.h> + +// the router's wrapped time frquency is 1 msec +#define WTHZ 1000 +#define FC_MSEC_PER_TICK (1000 / WTHZ) +#define FC_USEC_PER_TICK (1000000 / WTHZ) +#define MSEC_TO_TICKS(msec) ((msec < FC_MSEC_PER_TICK) ? 1 : msec / FC_MSEC_PER_TICK) + +// =================================================== + +typedef struct framework_protocol_holder { + RtaProtocolStack *stack; + uint64_t kv_hash; + int stack_id; + + TAILQ_ENTRY(framework_protocol_holder) list; +} FrameworkProtocolHolder; + + +struct rta_framework { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + PARCEvent *commandEvent; + + //struct event_config *cfg; + int udp_socket; + + PARCEventScheduler *base; + + PARCEventSignal *signal_int; + PARCEventSignal *signal_usr1; + PARCEventTimer *tick_event; + PARCEvent *udp_event; + PARCEventTimer *transmit_statistics_event; + PARCEventSignal *signal_pipe; + + struct timeval starttime; + ticks clock_ticks; // at WTHZ + + // used by seed48 and nrand48 + unsigned short seed[3]; + + pthread_t thread; + + unsigned connid_next; + + // operations that modify global state need + // to be locked. + pthread_mutex_t status_mutex; + pthread_cond_t status_cv; + RtaFrameworkStatus status; + + // signals from outside control thread to event scheduler + // that it should exit its event loop. This does + // not need to be protected in mutex (its not + // a condition variable). We check for this + // inside the HZ timer callback. + bool killme; + + // A list of all our in-use protocol stacks + TAILQ_HEAD(, framework_protocol_holder) protocols_head; + + RtaConnectionTable *connectionTable; + + RtaLogger *logger; +}; + +int rtaFramework_CloseConnection(RtaFramework *framework, RtaConnection *connection); + +/** + * Lock the frameworks state machine status + * + * Will block until the state machine status is locked + * + * @param [in] framework An allocated framework + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rta_Framework_LockStatus(RtaFramework *framework); + +/** + * Unlock the state mahcines status + * + * Will assert if we do not currently hold the lock + * + * @param [in] framework An allocated framework + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rta_Framework_UnlockStatus(RtaFramework *framework); + +/** + * Wait on the state machine's condition variable to be signaled + * + * <#Paragraphs Of Explanation#> + * + * @param [in] framework An allocated framework + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rta_Framework_WaitStatus(RtaFramework *framework); + +/** + * Broadcast on the state machine's condition variable (signal it) + * + * <#Paragraphs Of Explanation#> + * + * @param [in] framework An allocated framework + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rta_Framework_BroadcastStatus(RtaFramework *framework); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Logger.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Logger.c new file mode 100644 index 00000000..b8cc8d12 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Logger.c @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <config.h> +#include <stdarg.h> +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_Object.h> + +#include <parc/logging/parc_Log.h> +#include <ccnx/transport/transport_rta/core/rta_Logger.h> + +struct rta_logger { + PARCClock *clock; + + PARCLogReporter *reporter; + PARCLog *loggerArray[RtaLoggerFacility_END]; +}; + +static const struct facility_to_string { + RtaLoggerFacility facility; + const char *string; +} _facilityToString[] = { + { .facility = RtaLoggerFacility_Framework, .string = "Framework" }, + { .facility = RtaLoggerFacility_ApiConnector, .string = "Api" }, + { .facility = RtaLoggerFacility_Flowcontrol, .string = "Flowcontrol" }, + { .facility = RtaLoggerFacility_Codec, .string = "Codec" }, + { .facility = RtaLoggerFacility_ForwarderConnector, .string = "Forwarder" }, + { .facility = 0, .string = NULL } +}; + +const char * +rtaLogger_FacilityString(RtaLoggerFacility facility) +{ + for (int i = 0; _facilityToString[i].string != NULL; i++) { + if (_facilityToString[i].facility == facility) { + return _facilityToString[i].string; + } + } + return "Unknown"; +} + +static void +_allocateLoggers(RtaLogger *logger, PARCLogReporter *reporter) +{ + trapUnexpectedStateIf(logger->reporter != NULL, "Trying to allocate a reporter when the previous one is not null"); + logger->reporter = parcLogReporter_Acquire(reporter); + + char hostname[255]; + int gotHostName = gethostname(hostname, 255); + if (gotHostName < 0) { + snprintf(hostname, 255, "unknown"); + } + + for (int i = 0; i < RtaLoggerFacility_END; i++) { + logger->loggerArray[i] = parcLog_Create(hostname, rtaLogger_FacilityString(i), "rta", logger->reporter); + parcLog_SetLevel(logger->loggerArray[i], PARCLogLevel_Error); + } +} + +static void +_releaseLoggers(RtaLogger *logger) +{ + for (int i = 0; i < RtaLoggerFacility_END; i++) { + parcLog_Release(&logger->loggerArray[i]); + } + parcLogReporter_Release(&logger->reporter); +} + +static void +_destroyer(RtaLogger **loggerPtr) +{ + RtaLogger *logger = *loggerPtr; + _releaseLoggers(logger); + parcClock_Release(&(*loggerPtr)->clock); +} + +parcObject_ExtendPARCObject(RtaLogger, _destroyer, NULL, NULL, NULL, NULL, NULL, NULL); + +parcObject_ImplementAcquire(rtaLogger, RtaLogger); + +parcObject_ImplementRelease(rtaLogger, RtaLogger); + +RtaLogger * +rtaLogger_Create(PARCLogReporter *reporter, const PARCClock *clock) +{ + assertNotNull(reporter, "Parameter reporter must be non-null"); + assertNotNull(clock, "Parameter clock must be non-null"); + + RtaLogger *logger = parcObject_CreateAndClearInstance(RtaLogger); + if (logger) { + logger->clock = parcClock_Acquire(clock); + _allocateLoggers(logger, reporter); + } + + return logger; +} + +void +rtaLogger_SetReporter(RtaLogger *logger, PARCLogReporter *reporter) +{ + assertNotNull(logger, "Parameter logger must be non-null"); + + // save the log level state + PARCLogLevel savedLevels[RtaLoggerFacility_END]; + for (int i = 0; i < RtaLoggerFacility_END; i++) { + savedLevels[i] = parcLog_GetLevel(logger->loggerArray[i]); + } + + _releaseLoggers(logger); + + _allocateLoggers(logger, reporter); + + // restore log level state + for (int i = 0; i < RtaLoggerFacility_END; i++) { + parcLog_SetLevel(logger->loggerArray[i], savedLevels[i]); + } +} + +void +rtaLogger_SetClock(RtaLogger *logger, PARCClock *clock) +{ + assertNotNull(logger, "Parameter logger must be non-null"); + parcClock_Release(&logger->clock); + logger->clock = parcClock_Acquire(clock); +} + +static void +_assertInvariants(const RtaLogger *logger, RtaLoggerFacility facility) +{ + assertNotNull(logger, "Parameter logger must be non-null"); + trapOutOfBoundsIf(facility >= RtaLoggerFacility_END, "Invalid facility %d", facility); +} + +void +rtaLogger_SetLogLevel(RtaLogger *logger, RtaLoggerFacility facility, PARCLogLevel minimumLevel) +{ + _assertInvariants(logger, facility); + PARCLog *log = logger->loggerArray[facility]; + parcLog_SetLevel(log, minimumLevel); +} + +bool +rtaLogger_IsLoggable(const RtaLogger *logger, RtaLoggerFacility facility, PARCLogLevel level) +{ + _assertInvariants(logger, facility); + PARCLog *log = logger->loggerArray[facility]; + return parcLog_IsLoggable(log, level); +} + +void +rtaLogger_Log(RtaLogger *logger, RtaLoggerFacility facility, PARCLogLevel level, const char *module, const char *format, ...) +{ + if (rtaLogger_IsLoggable(logger, facility, level)) { + // this is logged as the messageid + uint64_t logtime = parcClock_GetTime(logger->clock); + + // rtaLogger_IsLoggable asserted invariants so we know facility is in bounds + PARCLog *log = logger->loggerArray[facility]; + + va_list va; + va_start(va, format); + + parcLog_MessageVaList(log, level, logtime, format, va); + + va_end(va); + } +} + diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Logger.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Logger.h new file mode 100644 index 00000000..067ad4f1 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Logger.h @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_Logger.h + * @brief Logger for the Rta transport + * + * A facility based logger to allow selective logging from different parts of Rta + * + */ + +#ifndef Rta_rta_Logger_h +#define Rta_rta_Logger_h + +#include <sys/time.h> +#include <stdarg.h> +#include <parc/algol/parc_Buffer.h> +#include <parc/logging/parc_LogLevel.h> +#include <parc/logging/parc_LogReporter.h> +#include <parc/algol/parc_Clock.h> + +struct rta_logger; +typedef struct rta_logger RtaLogger; + +/** + * Framework - Overall framework + * ApiConnector - API Connector + * Flowcontrol - Flow controller + * Codec - Codec and verification/signing + * ForwarderConnector - Forwarder connector + */ +typedef enum { + RtaLoggerFacility_Framework, + RtaLoggerFacility_ApiConnector, + RtaLoggerFacility_Flowcontrol, + RtaLoggerFacility_Codec, + RtaLoggerFacility_ForwarderConnector, + RtaLoggerFacility_END // sentinel value +} RtaLoggerFacility; + +/** + * Returns a string representation of a facility + * + * Do not free the returned value. + * + * @param [in] facility The facility to change to a string + * + * @retval string A string representation of the facility + * + * Example: + * @code + * <#example#> + * @endcode + */ +const char *rtaLogger_FacilityString(RtaLoggerFacility facility); + +/** + * Returns a string representation of a log level + * + * Do not free the returned value. + * + * @param [in] level The level to change to a string + * + * @retval string A string representation of the level + * + * Example: + * @code + * <#example#> + * @endcode + */ +const char *rtaLogger_LevelString(PARCLogLevel level); + +/** + * Create a logger that uses a given writer and clock + * + * <#Paragraphs Of Explanation#> + * + * @param [in] writer The output writer + * @param [in] clock The clock to use for log messages + * + * @retval non-null An allocated logger + * @retval null An error + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaLogger *rtaLogger_Create(PARCLogReporter *reporter, const PARCClock *clock); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @retval <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaLogger_Release(RtaLogger **loggerPtr); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @retval <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +RtaLogger *rtaLogger_Acquire(const RtaLogger *logger); + +/** + * Sets the minimum log level for a facility + * + * The default log level is ERROR. For a message to be logged, it must be of equal + * or higher log level. + * + * @param [in] logger An allocated logger + * @param [in] facility The facility to set the log level for + * @param [in] The minimum level to log + * + * @retval <#value#> <#explanation#> + * + * Example: + * @code + * { + * PARCLogReporter *reporter = parcLogReporterTextStdout_Create(); + * RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + * parcLogReporter_Release(&reporter); + * rtaLogger_SetLogLevel(logger, RtaLoggerFacility_IO, PARCLogLevel_Warning); + * } + * @endcode + */ +void rtaLogger_SetLogLevel(RtaLogger *logger, RtaLoggerFacility facility, PARCLogLevel minimumLevel); + +/** + * Tests if the log level would be logged + * + * If the facility would log the given level, returns true. May be used as a + * guard around expensive logging functions. + * + * @param [in] logger An allocated logger + * @param [in] facility The facility to test + * @param [in] The level to test + * + * @retval true The given facility would log the given level + * @retval false A message of the given level would not be logged + * + * Example: + * @code + * <#example#> + * @endcode + */ +bool rtaLogger_IsLoggable(const RtaLogger *logger, RtaLoggerFacility facility, PARCLogLevel level); + +/** + * Log a message + * + * The message will only be logged if it is loggable (rtaLogger_IsLoggable returns true). + * + * @param [in] logger An allocated RtaLogger + * @param [in] facility The facility to log under + * @param [in] level The log level of the message + * @param [in] module The specific module logging the message + * @param [in] format The message with varargs + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaLogger_Log(RtaLogger *logger, RtaLoggerFacility facility, PARCLogLevel level, const char *module, const char *format, ...); + +/** + * Switch the logger to a new reporter + * + * Will close the old reporter and re-setup the internal loggers to use the new reporter. + * All current log level settings are preserved. + * + * @param [in] logger An allocated RtaLogger + * @param [in] reporter An allocated PARCLogReporter + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaLogger_SetReporter(RtaLogger *logger, PARCLogReporter *reporter); + +/** + * Set a new clock to use with the logger + * + * The logger will start getting the time (logged as the messageid) from the specified clock + * + * @param [in] logger An allocated RtaLogger + * @param [in] clock An allocated PARCClock + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaLogger_SetClock(RtaLogger *logger, PARCClock *clock); +#endif // Rta_rta_Logger_h diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ProtocolStack.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ProtocolStack.c new file mode 100644 index 00000000..7bdafbf7 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ProtocolStack.c @@ -0,0 +1,786 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> + +#include <LongBow/runtime.h> + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/queue.h> +#include <string.h> +#include <strings.h> +#include <sys/time.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_EventQueue.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> + +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> +#include <ccnx/transport/transport_rta/core/rta_ConnectionTable.h> +#include <ccnx/transport/transport_rta/core/rta_ComponentStats.h> +#include <ccnx/transport/common/transport_Message.h> +#include <ccnx/transport/common/transport_private.h> + +#include <ccnx/transport/transport_rta/connectors/connector_Api.h> +#include <ccnx/transport/transport_rta/connectors/connector_Forwarder.h> +#include <ccnx/transport/transport_rta/components/component_Codec.h> +#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h> +#include <ccnx/transport/transport_rta/components/component_Testing.h> + +#include <ccnx/transport/transport_rta/config/config_ProtocolStack.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#define MAX_STACK_DEPTH 10 + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +const char *RtaComponentNames[LAST_COMPONENT] = +{ + "API", // 0 + "FC_NONE", + "FC_VEGAS", + "FC_PIPELINE", + "VERIFY_NONE", // 4 + "VERIFY_ENUMERATED", + "VERIFY_LOCATOR", + "CODEC_NONE", + NULL, // 8 + "CODEC_TLV", + "CODEC_CCNB", + "CODE_FLAN", + NULL, // 12 + "FWD_LOCAL", + "FWD_FLAN", + "FWD_CCND", // 15 + "TESTING_UPPER", + "TESTING_LOWER", // 17 + "CCND_REGISTRAR", + "FWD_METIS" +}; + +struct protocol_stack { + int stack_id; + + // used during configuration to indicate if configured + int config_codec; + + RtaFramework *framework; + + // They key value pairs passed to open. The api must + // keep this memory valid for as long as the connection is open + PARCJSON *params; + + // the inter-component queues + unsigned component_count; + PARCEventQueuePair *queue_pairs[MAX_STACK_DEPTH]; + RtaComponents components[MAX_STACK_DEPTH]; + + // queues assigned to components + struct component_queues { + PARCEventQueue *up; + PARCEventQueue *down; + } *component_queues[LAST_COMPONENT]; + RtaComponentOperations component_ops[LAST_COMPONENT]; + void *component_state[LAST_COMPONENT]; + + // stack-wide stats + RtaComponentStats *stack_stats[LAST_COMPONENT]; + + + // state change events are disabled during initial setup and teardown + bool stateChangeEventsEnabled; +}; + +static void set_queue_pairs(RtaProtocolStack *stack, RtaComponents comp_type); +static int configure_ApiConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops); +static int configure_Component(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops); +static int configure_FwdConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops); + +// ======================================== + +RtaFramework * +rtaProtocolStack_GetFramework(RtaProtocolStack *stack) +{ + assertNotNull(stack, "called with null stack"); + return stack->framework; +} + +RtaProtocolStack * +rtaProtocolStack_Create(RtaFramework *framework, PARCJSON *params, int stack_id) +{ + RtaProtocolStack *stack = parcMemory_AllocateAndClear(sizeof(RtaProtocolStack)); + assertNotNull(stack, "%9" PRIu64 " parcMemory_AllocateAndClear returned NULL\n", + rtaFramework_GetTicks(stack->framework)); + + stack->stateChangeEventsEnabled = false; + + stack->params = parcJSON_Copy(params); + + assertNotNull(stack->params, "SYSTEM key is NULL in params"); + + assertNotNull(framework, "Parameter framework may not be null"); + + stack->framework = framework; + stack->stack_id = stack_id; + + // create all the buffer pairs + for (int i = 0; i < MAX_STACK_DEPTH; i++) { + stack->queue_pairs[i] = parcEventQueue_CreateConnectedPair(rtaFramework_GetEventScheduler(stack->framework)); + + assertNotNull(stack->queue_pairs[i], "parcEventQueue_CreateConnectedPair returned NULL index %d", i); + if (stack->queue_pairs[i] == NULL) { + for (int j = 0; j < i; j++) { + parcEventQueue_DestroyConnectedPair(&(stack->queue_pairs[j])); + } + + parcMemory_Deallocate((void **) &stack); + return NULL; + } + + // set them all to normal priority. The command port is high priority. External buffes are low priority. + parcEventQueue_SetPriority(parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]), PARCEventPriority_Normal); + parcEventQueue_SetPriority(parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i]), PARCEventPriority_Normal); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s create buffer pair %p <-> %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), + __func__, + (void *) parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]), + (void *) parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i])); + } + } + + for (int i = 0; i < LAST_COMPONENT; i++) { + stack->stack_stats[i] = rtaComponentStats_Create(NULL, i); + } + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s created stack %d at %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), + __func__, + stack_id, + (void *) stack); + } + + stack->stateChangeEventsEnabled = true; + + return stack; +} + +/** + * Opens a connection inside the protocol stack: it calls open() on each component. + * + * Returns 0 on success, -1 on error + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaProtocolStack_Open(RtaProtocolStack *stack, RtaConnection *connection) +{ + assertNotNull(stack, "called with null stack\n"); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s stack_id %d opening conn %p api_fd %d\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), + __func__, + stack->stack_id, + (void *) connection, rtaConnection_GetApiFd(connection)); + } + + // call all the opens, except the api + + // need to disable events during creation to avoid calling the event notifier + // of a component before the component sees the "open" call for this connection + stack->stateChangeEventsEnabled = false; + for (int i = 0; i < stack->component_count; i++) { + RtaComponents comp = stack->components[i]; + if (stack->component_ops[comp].open != NULL && + stack->component_ops[comp].open(connection) != 0) { + fprintf(stderr, "%s component %d failed open\n", __func__, i); + abort(); + return -1; + } + } + stack->stateChangeEventsEnabled = true; + + return 0; +} + +/* + * Closes a connection but does not touch stack->connection_head + */ +static int +internal_Stack_Close(RtaProtocolStack *stack, RtaConnection *conn) +{ + int i; + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s stack_id %d closing stack %p conn %p\n", + rtaFramework_GetTicks(rtaConnection_GetFramework(conn)), + __func__, + stack->stack_id, + (void *) stack, + (void *) conn); + } + + rtaConnection_SetState(conn, CONN_CLOSED); + + // call all the opens + for (i = 0; i < stack->component_count; i++) { + RtaComponents comp = stack->components[i]; + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s calling close for %s\n", + rtaFramework_GetTicks(rtaConnection_GetFramework(conn)), __func__, RtaComponentNames[comp]); + } + + if (stack->component_ops[comp].close != NULL && + stack->component_ops[comp].close(conn) != 0) { + fprintf(stderr, "%s component %d failed open\n", __func__, i); + abort(); + return -1; + } + } + + return 0; +} + +/** + * Calls the close() function of each component in the protocol stack. + * + * This is typically called from inside the API connector when it processes + * a CLOSE json message. + * Returns 0 success, -1 error. + * + * Example: + * @code + * <#example#> + * @endcode + */ +int +rtaProtocolStack_Close(RtaProtocolStack *stack, RtaConnection *conn) +{ + assertNotNull(stack, "called with null stack\n"); + assertNotNull(conn, "called with null connection\n"); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s stack_id %d stack %p conn %p\n", + rtaFramework_GetTicks(rtaConnection_GetFramework(conn)), + __func__, + stack->stack_id, + (void *) stack, + (void *) conn); + } + + + internal_Stack_Close(stack, conn); + + return 0; +} + +/** + * Calls the release() function of all components. + * Drains all the component queues. + * + * This is called from rtaFramework_DestroyStack, who is responsible for closing + * all the connections in it before calling this. + * + * Example: + * @code + * <#example#> + * @endcode + */ +void +rtaProtocolStack_Destroy(RtaProtocolStack **stackPtr) +{ + RtaProtocolStack *stack; + + assertNotNull(stackPtr, "%s called with null pointer to stack\n", __func__); + + stack = *stackPtr; + assertNotNull(stack, "%s called with null stack dereference\n", __func__); + + if (DEBUG_OUTPUT) { + printf("%s stack_id %d destroying stack %p\n", + __func__, + stack->stack_id, + (void *) stack); + } + + stack->stateChangeEventsEnabled = false; + + // call all the release functions + for (int i = 0; i < stack->component_count; i++) { + RtaComponents comp = stack->components[i]; + if (stack->component_ops[comp].release != NULL && + stack->component_ops[comp].release(stack) != 0) { + fprintf(stderr, "%s component %d failed release\n", __func__, i); + abort(); + } + } + + for (int i = 0; i < MAX_STACK_DEPTH; i++) { + TransportMessage *tm; + while ((tm = rtaComponent_GetMessage(parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]))) != NULL) { + assertFalse(1, "%s we should never execute the body, it should just drain\n", __func__); + } + + while ((tm = rtaComponent_GetMessage(parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i]))) != NULL) { + assertFalse(1, "%s we should never execute the body, it should just drain\n", __func__); + } + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s destroy buffer pair %p <-> %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), + __func__, + (void *) parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[i]), + (void *) parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[i])); + } + + parcEventQueue_DestroyConnectedPair(&(stack->queue_pairs[i])); + } + + for (int i = 0; i < LAST_COMPONENT; i++) { + if (stack->component_queues[i]) { + parcMemory_Deallocate((void **) &(stack->component_queues[i])); + } + } + + for (int i = 0; i < LAST_COMPONENT; i++) { + rtaComponentStats_Destroy(&stack->stack_stats[i]); + } + + + parcJSON_Release(&stack->params); + memset(stack, 0, sizeof(RtaProtocolStack)); + + parcMemory_Deallocate((void **) &stack); + *stackPtr = NULL; +} + + +PARCEventQueue * +rtaProtocolStack_GetPutQueue(RtaProtocolStack *stack, RtaComponents component, RtaDirection direction) +{ + assertNotNull(stack, "%s called with null stack\n", __func__); + + if (direction == RTA_UP) { + return stack->component_queues[component]->up; + } else { + return stack->component_queues[component]->down; + } +} + + +/** + * Look up the symbolic name of the queue. Do not free the return. + * + * Example: + * @code + * <#example#> + * @endcode + */ +const char * +rtaProtocolStack_GetQueueName(RtaProtocolStack *stack, PARCEventQueue *queue) +{ + int component; + for (component = 0; component <= LAST_COMPONENT; component++) { + if (stack->component_queues[component]) { + if (stack->component_queues[component]->up == queue) { + return RtaComponentNames[component]; + } + if (stack->component_queues[component]->down == queue) { + return RtaComponentNames[component]; + } + } + } + trapUnexpectedState("Could not find queue %p in stack %p", (void *) queue, (void *) stack); +} + +// ================================================= +// ================================================= + +static RtaComponents +getComponentTypeFromName(const char *name) +{ + int i; + + if (name == NULL) { + return UNKNOWN_COMPONENT; + } + + for (i = 0; i < LAST_COMPONENT; i++) { + if (RtaComponentNames[i] != NULL) { + if (strncasecmp(RtaComponentNames[i], name, 16) == 0) { + return (RtaComponents) i; + } + } + } + return UNKNOWN_COMPONENT; +} + + +/** + * Calls the confguration routine for each component in the stack + * + * Builds an array list of everything in the JSON configuration, then + * calls its configuation routine. + * + * The connecting event queues are disabled at this point. + * + * @param [in,out] stack The Protocol Stack to operate on + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +rtaProtocolStack_ConfigureComponents(RtaProtocolStack *stack) +{ + PARCArrayList *componentNameList; + componentNameList = protocolStack_GetComponentNameArray(stack->params); + assertTrue(parcArrayList_Size(componentNameList) < MAX_STACK_DEPTH, + "Too many components in a stack size %zu\n", + parcArrayList_Size(componentNameList)); + + + for (int i = 0; i < parcArrayList_Size(componentNameList); i++) { + // match it to a component type + const char *comp_name = parcArrayList_Get(componentNameList, i); + RtaComponents comp_type = getComponentTypeFromName(comp_name); + + // this could be sped up slightly by putting the ops structures + // in an array + switch (comp_type) { + case API_CONNECTOR: + configure_ApiConnector(stack, comp_type, api_ops); + break; + + case FC_NONE: + trapIllegalValue(comp_type, "Null flowcontroller no longer supported"); + break; + case FC_VEGAS: + configure_Component(stack, comp_type, flow_vegas_ops); + break; + case FC_PIPELINE: + abort(); + break; + + case CODEC_NONE: + trapIllegalValue(comp_type, "Null codec no longer supported"); + break; + case CODEC_TLV: + configure_Component(stack, comp_type, codec_tlv_ops); + break; + + case FWD_NONE: + abort(); + break; + case FWD_LOCAL: + configure_FwdConnector(stack, comp_type, fwd_local_ops); + break; + + case FWD_METIS: + configure_FwdConnector(stack, comp_type, fwd_metis_ops); + break; + + case TESTING_UPPER: + // fallthrough + case TESTING_LOWER: + configure_Component(stack, comp_type, testing_null_ops); + break; + + + default: + fprintf(stderr, "%s unsupported component type %s\n", __func__, comp_name); + abort(); + } + } + parcArrayList_Destroy(&componentNameList); +} + +static bool +rtaProtocolStack_InitializeComponents(RtaProtocolStack *stack) +{ + // Call all the inits + for (int i = 0; i < LAST_COMPONENT; i++) { + int res = 0; + if (stack->component_ops[i].init != NULL) { + res = stack->component_ops[i].init(stack); + } + + if (res != 0) { + fprintf(stderr, "%s opener for layer %d failed\n", __func__, i); + trapUnrecoverableState("Error Initializing the components") + return false; + } + } + return true; +} + +/** + * Enables events on all the queues between components + * + * Enables events on each queue. + * + * @param [in,out] stack The PRotocol stack + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +rtaProtocolStack_EnableComponentQueues(RtaProtocolStack *stack) +{ + // enable all the events on intermediate queues + for (int i = 0; i < stack->component_count; i++) { + RtaComponents component = stack->components[i]; + PARCEventQueue *upQueue = stack->component_queues[component]->up; + if (upQueue != NULL) { + parcEventQueue_Enable(upQueue, PARCEventType_Read); + } + + PARCEventQueue *downQueue = stack->component_queues[component]->down; + if (downQueue != NULL) { + parcEventQueue_Enable(downQueue, PARCEventType_Read); + } + } +} + +/* + * Called from transportRta_Open() + * + * Returns 0 for success, -1 on error (connection not made) + */ +int +rtaProtocolStack_Configure(RtaProtocolStack *stack) +{ + assertNotNull(stack, "%s called with null stack\n", __func__); + + rtaProtocolStack_ConfigureComponents(stack); + + bool initSuccess = rtaProtocolStack_InitializeComponents(stack); + if (!initSuccess) { + return -1; + } + + rtaProtocolStack_EnableComponentQueues(stack); + + return 0; +} + +/* + * Domain is the top-level key, e.g. SYSTEM or USER + */ +PARCJSON * +rtaProtocolStack_GetParam(RtaProtocolStack *stack, const char *domain, const char *key) +{ + assertNotNull(stack, "%s called with null stack\n", __func__); + assertNotNull(domain, "%s called with null domain\n", __func__); + assertNotNull(key, "%s called with null key\n", __func__); + + PARCJSONValue *value = parcJSON_GetValueByName(stack->params, domain); + assertNotNull(value, "Did not find domain %s in protocol stack parameters", domain); + if (value == NULL) { + return NULL; + } + PARCJSON *domainJson = parcJSONValue_GetJSON(value); + + value = parcJSON_GetValueByName(domainJson, key); + assertNotNull(value, "Did not find key %s in protocol stack parameters", key); + return parcJSONValue_GetJSON(value); +} + +unsigned +rtaProtocolStack_GetNextConnectionId(RtaProtocolStack *stack) +{ + assertNotNull(stack, "Parameter stack must be a non-null RtaProtocolStack pointer."); + return rtaFramework_GetNextConnectionId(stack->framework); +} + +RtaComponentStats * +rtaProtocolStack_GetStats(const RtaProtocolStack *stack, RtaComponents type) +{ + assertTrue(type < LAST_COMPONENT, "invalid type %d\n", type); + return stack->stack_stats[type]; +} + +static void +printSingleTuple(FILE *file, const struct timeval *timeval, const RtaProtocolStack *stack, RtaComponents componentType, RtaComponentStatType stat) +{ + RtaComponentStats *stats = rtaProtocolStack_GetStats(stack, componentType); + + fprintf(file, "{ \"stackId\" : %d, \"component\" : \"%s\", \"name\" : \"%s\", \"value\" : %" PRIu64 ", \"timeval\" : %ld.%06u }\n", + stack->stack_id, + RtaComponentNames[componentType], + rtaComponentStatType_ToString(stat), + rtaComponentStats_Get(stats, stat), + timeval->tv_sec, + (unsigned) timeval->tv_usec + ); +} + +PARCArrayList * +rtaProtocolStack_GetStatistics(const RtaProtocolStack *stack, FILE *file) +{ + PARCArrayList *list = parcArrayList_Create(NULL); + + struct timeval timeval; + gettimeofday(&timeval, NULL); + + // This does not fill in the array list + for (int componentIndex = 0; componentIndex < stack->component_count; componentIndex++) { + RtaComponents componentType = stack->components[componentIndex]; + printSingleTuple(file, &timeval, stack, componentType, STATS_OPENS); + printSingleTuple(file, &timeval, stack, componentType, STATS_CLOSES); + printSingleTuple(file, &timeval, stack, componentType, STATS_UPCALL_IN); + printSingleTuple(file, &timeval, stack, componentType, STATS_UPCALL_OUT); + printSingleTuple(file, &timeval, stack, componentType, STATS_DOWNCALL_IN); + printSingleTuple(file, &timeval, stack, componentType, STATS_DOWNCALL_OUT); + } + + return list; +} + + +// ============================================= + +static void +set_queue_pairs(RtaProtocolStack *stack, RtaComponents comp_type) +{ + //PARCEventQueuePair *component_queues[LAST_COMPONENT]; + // Save references to the OUTPUT queues used by a specific component. + if (stack->component_queues[comp_type] == NULL) { + stack->component_queues[comp_type] = parcMemory_AllocateAndClear(sizeof(struct component_queues)); + } + + stack->component_queues[comp_type]->up = + parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[stack->component_count - 1]); + + stack->component_queues[comp_type]->down = + parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[stack->component_count]); + + // Set callbacks on the INPUT queues read by a specific component + parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->up, + stack->component_ops[comp_type].downcallRead, + NULL, + stack->component_ops[comp_type].downcallEvent, + (void *) stack); + + parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->down, + stack->component_ops[comp_type].upcallRead, + NULL, + stack->component_ops[comp_type].upcallEvent, + (void *) stack); +} + + +static int +configure_ApiConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops) +{ + if (stack->component_queues[comp_type] == NULL) { + stack->component_queues[comp_type] = parcMemory_AllocateAndClear(sizeof(struct component_queues)); + } + + assertNotNull(stack->component_queues[comp_type], "called with null component_queue"); + assertNotNull(stack->queue_pairs[stack->component_count], "called with null queue_pair"); + + // This wires the bottom half of the API Connector to the streams. + // It does not do the top half, which is in the connector's INIT + + stack->components[stack->component_count] = comp_type; + stack->component_ops[comp_type] = ops; + + stack->component_queues[comp_type]->down = + parcEventQueue_GetConnectedDownQueue(stack->queue_pairs[stack->component_count]); + + parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->down, + stack->component_ops[comp_type].upcallRead, + NULL, + stack->component_ops[comp_type].upcallEvent, + (void *) stack); + + stack->component_count++; + return 0; +} + +static int +configure_Component(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops) +{ + stack->component_ops[comp_type] = ops; + stack->components[stack->component_count] = comp_type; + set_queue_pairs(stack, comp_type); + stack->component_count++; + return 0; +} + + +static int +configure_FwdConnector(RtaProtocolStack *stack, RtaComponents comp_type, RtaComponentOperations ops) +{ + stack->component_ops[comp_type] = ops; + stack->components[stack->component_count] = comp_type; + + // We only set the upcall buffers. The down buffers + // are controlled by the forwarder connector + if (stack->component_queues[comp_type] == NULL) { + stack->component_queues[comp_type] = parcMemory_AllocateAndClear(sizeof(struct component_queues)); + } + + stack->component_queues[comp_type]->up = + parcEventQueue_GetConnectedUpQueue(stack->queue_pairs[stack->component_count - 1]); + + parcEventQueue_SetCallbacks(stack->component_queues[comp_type]->up, + stack->component_ops[comp_type].downcallRead, + NULL, + stack->component_ops[comp_type].downcallEvent, + (void *) stack); + + stack->component_count++; + return 0; +} + +int +rtaProtocolStack_GetStackId(RtaProtocolStack *stack) +{ + return stack->stack_id; +} + +void +rtaProtocolStack_ConnectionStateChange(RtaProtocolStack *stack, void *connection) +{ + if (stack->stateChangeEventsEnabled) { + for (int componentIndex = 0; componentIndex < stack->component_count; componentIndex++) { + RtaComponents componentType = stack->components[componentIndex]; + if (stack->component_ops[componentType].stateChange != NULL) { + stack->component_ops[componentType].stateChange(connection); + } + } + } +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ProtocolStack.h b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ProtocolStack.h new file mode 100644 index 00000000..df09c23f --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_ProtocolStack.h @@ -0,0 +1,379 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file rta_ProtocolStack.h + * @brief A set of connectors and components + * + * In a Ready To Assemble transport, individual pieces are called connectors + * and components. A connector attaches to the API library at the top and + * to the forwarder at the bottom. In between the connectors are components. + * + * One set of connectors and components is called a protocol stack. + * + * A ProtocolStack defines a set of Components linked by bidirectional + * queues. A ProtocolStack is defined by the KeyValue set passed to + * the Transport. The hash of the KeyValue set selects the protocol stack. + * If the Transport sees a new hash, it creates a new protocol stack + * via ProtocolStack_Create(). + * + * Each API connection calls _Open, which will return a new RtaConnection + * pointer. The Transport gives the API an "api_fd", which the Transport + * translates to the RtaConnection. + * + * A protocol stack is implemented as a set of queue pairs between components. + * There is a fixed sized array called queue_pairs[MAX_STACK_DEPTH]. The + * queue_pairs[i].pair[RTA_DOWN] end attaches to the upper component. RTA_DOWN + * indicates the direction of travel for a write. queue_pairs[i].pair[RTA_UP] + * attaches to the lower component. + * + * A component only knows its identity (see components.h). For example, the + * TLV codec is called CODEC_TLV, and that is the only identity it know. It does + * not know the identity of the pieces above or below it. + * + * Therefore, when a component calls protocolStack_GetPutQ(stack, CODEC_TLV, RTA_DOWN), + * it is asking for the queue to write to in the DOWN direction. This means that + * we should keep an index by the component name, not by the queue_pairs[] array. + * Thus, we keep a component_queues[] array that is indexed by the component name. + * + * Let's say our stack is API_CONNECTOR, FC_NULL, VERIFY_NULL, CODEC_TLV, FWD_LOCAL. + * The picture is like this: + * + * @code + * | + * * <- api_connector managed queue + * API_CONNECTOR + * * <- queue_pair[0].pair[DOWN] <- component_queue[API_CONNECTOR].pair[DOWN] + * | + * * <- queue_pair[0].pair[UP] <- component_queue[FC_NULL].pair[UP] + * FC_NULL + * * <- queue_pair[1].pair[DOWN] <- component_queue[FC_NULL].pair[DOWN] + * | + * * <- queue_pair[1].pair[UP] <- component_queue[VERIFY_NULL].pair[UP] + * VERIFY_NULL + * * <- queue_pair[2].pair[DOWN] <- component_queue[VERIFY_NULL].pair[DOWN] + * | + * * <- queue_pair[2].pair[UP] <- component_queue[CODEC_TLV].pair[UP] + * CODEC_TLV + * * <- queue_pair[3].pair[DOWN] <- component_queue[CODEC_TLV].pair[DOWN] + * | + * * <- queue_pair[3].pair[UP] <- component_queue[FWD_LOCAL].pair[UP] + * FWD_LOCAL + * * <- fwd_local managed connection + * | + * @endcode + * + * Each component also has a pair of callbacks, one for reading messages flowing down + * the stack and one for reading messages flowing up the stack. These are called + * "downcall_read" for reading messages flowing down and "upcall_read" for messages + * flowing up. + * + * Recall that the direction attributes UP and DOWN in the queues are in terms + * of WRITES, therefore the directions are opposite for reads. A component's + * downcall_read will read from component_queue[X].pair[UP]. + * + * Example: + * @code + * <#example#> + * @endcode + */ +#ifndef Libccnx_rta_ProtocolStack_h +#define Libccnx_rta_ProtocolStack_h + +#include <parc/algol/parc_ArrayList.h> + +#include <parc/algol/parc_EventQueue.h> + +#include <ccnx/transport/transport_rta/core/rta_ComponentStats.h> +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/components.h> +#include <ccnx/transport/transport_rta/core/rta_ComponentQueue.h> +#include <ccnx/transport/transport_rta/commands/rta_Command.h> + +struct rta_connection; +struct component_queue; + +struct protocol_stack; +typedef struct protocol_stack RtaProtocolStack; + +/** + * Used to assign unique connection id to sockets. This is just + * for internal tracking, its not a descriptor. + * + * <#Paragraphs Of Explanation#> + * + * @param [in] stack <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +unsigned rtaProtocolStack_GetNextConnectionId(RtaProtocolStack *stack); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] framework <#description#> + * @param [in] params <#description#> + * @param [in] stack_id <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +RtaProtocolStack *rtaProtocolStack_Create(RtaFramework *framework, PARCJSON *params, int stack_id); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] stack <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int rtaProtocolStack_Configure(RtaProtocolStack *stack); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] stack <#description#> + * @param [in] component <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void *rtaProtocolStack_GetPrivateData(RtaProtocolStack *stack, RtaComponents component); +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] stack <#description#> + * @param [in] component <#description#> + * @param [in] private <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void rtaProtocolStack_SetPrivateData(RtaProtocolStack *stack, RtaComponents component, void *private); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] stack <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +RtaFramework *rtaProtocolStack_GetFramework(RtaProtocolStack *stack); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int rtaProtocolStack_GetStackId(RtaProtocolStack *stack); + +/** + * Opens a connection inside the protocol stack: it calls open() on each component. + * + * Returns 0 on success, -1 on error + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaProtocolStack_Open(RtaProtocolStack *, struct rta_connection *connection); + +/** + * + * 0 success, -1 error + * + * Example: + * @code + * <#example#> + * @endcode + */ +int rtaProtocolStack_Close(RtaProtocolStack *, struct rta_connection *conn); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void rtaProtocolStack_Destroy(RtaProtocolStack **stack); + +/** + * Return the queue used for output for a component in a given direction + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +PARCEventQueue *rtaProtocolStack_GetPutQueue(RtaProtocolStack *stack, + RtaComponents component, + RtaDirection direction); + +/** + * <#One Line Description#> + * + * Domain is the top-level key, e.g. SYSTEM or USER + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +PARCJSON *rtaProtocolStack_GetParam(RtaProtocolStack *stack, const char *domain, const char *key); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +RtaComponentStats *rtaProtocolStack_GetStats(const RtaProtocolStack *stack, RtaComponents type); + +/** + * <#OneLineDescription#> + * + * <#Discussion#> + * + * @param stack + * @param file + * @return <#return#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +PARCArrayList *rtaProtocolStack_GetStatistics(const RtaProtocolStack *stack, FILE *file); + +/** + * Look up the symbolic name of the queue. Do not free the return. + * + * Example: + * @code + * <#example#> + * @endcode + */ +const char *rtaProtocolStack_GetQueueName(RtaProtocolStack *stack, PARCEventQueue *queue); + +/** + * A state event occured on the given connection, let all the components know. + * + * A state changed occured (UP, DOWN, PAUSE, or flow control), notify all the components + * + * @param [in] connection The RtaConnection. + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaProtocolStack_ConnectionStateChange(RtaProtocolStack *stack, void *connection); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/.gitignore b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/.gitignore new file mode 100644 index 00000000..8763938d --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/.gitignore @@ -0,0 +1,10 @@ +test_rta_Component +test_rta_Connection +test_rta_Framework_NonThreaded +test_rta_Framework_Services +test_rta_Framework_Threaded +test_rta_ProtocolStack +test_rta_Logger +test_rta_Stats +output.txt +core diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/CMakeLists.txt b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/CMakeLists.txt new file mode 100644 index 00000000..b57c2afd --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/CMakeLists.txt @@ -0,0 +1,23 @@ +# Enable gcov output for the tests +add_definitions(--coverage) +set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage") + +set(TestsExpectedToPass + test_rta_ConnectionTable + test_rta_Framework + test_rta_Framework_Commands + test_rta_Component + test_rta_Connection + test_rta_Framework_NonThreaded + test_rta_Framework_Services + test_rta_Framework_Threaded + test_rta_Logger + test_rta_ProtocolStack + test_rta_ComponentStats +) + + +foreach(test ${TestsExpectedToPass}) + AddTest(${test}) +endforeach() + diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Component.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Component.c new file mode 100644 index 00000000..a1de01bf --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Component.c @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Create a non-threaded framework to test comopnent functions. + * + */ +#define DEBUG_OUTPUT 1 +#include "../rta_Component.c" + +#include <parc/algol/parc_SafeMemory.h> +#include <LongBow/unit-test.h> + +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/config/config_All.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.c> +#include <ccnx/transport/test_tools/traffic_tools.h> + +#include <sys/socket.h> +#include <errno.h> + +#define PAIR_OTHER 0 +#define PAIR_TRANSPORT 1 + +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + + int api_fds[2]; + RtaFramework *framework; + RtaProtocolStack *stack; + RtaConnection *connection; +} TestData; + +static TestData * +_commonSetup(void) +{ + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + + int error = socketpair(AF_UNIX, SOCK_STREAM, 0, data->api_fds); + assertFalse(error, "Error creating socket pair: (%d) %s", errno, strerror(errno)); + + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + + assertNotNull(data->framework, "rtaFramework_Create returned null"); + + CCNxStackConfig *stackConfig = ccnxStackConfig_Create(); + apiConnector_ProtocolStackConfig(stackConfig); + testingLower_ProtocolStackConfig(stackConfig); + protocolStack_ComponentsConfigArgs(stackConfig, apiConnector_GetName(), testingLower_GetName(), NULL); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + + int stackId = 1; + RtaCommandCreateProtocolStack *createStack = rtaCommandCreateProtocolStack_Create(stackId, stackConfig); + _rtaFramework_ExecuteCreateStack(data->framework, createStack); + rtaCommandCreateProtocolStack_Release(&createStack); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + data->stack = (rtaFramework_GetProtocolStackByStackId(data->framework, stackId))->stack; + + CCNxConnectionConfig *connConfig = ccnxConnectionConfig_Create(); + apiConnector_ConnectionConfig(connConfig); + + tlvCodec_ConnectionConfig(connConfig); + + testingLower_ConnectionConfig(connConfig); + + RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(stackId, + data->api_fds[PAIR_OTHER], + data->api_fds[PAIR_TRANSPORT], + ccnxConnectionConfig_GetJson(connConfig)); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + _rtaFramework_ExecuteOpenConnection(data->framework, openConnection); + rtaCommandOpenConnection_Release(&openConnection); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + data->connection = rtaConnectionTable_GetByApiFd(data->framework->connectionTable, data->api_fds[PAIR_OTHER]); + + ccnxConnectionConfig_Destroy(&connConfig); + ccnxStackConfig_Release(&stackConfig); + + return data; +} + +static void +_commonTeardown(TestData *data) +{ + rtaFramework_Teardown(data->framework); + + parcRingBuffer1x1_Release(&data->commandRingBuffer); + parcNotifier_Release(&data->commandNotifier); + rtaFramework_Destroy(&data->framework); + + close(data->api_fds[0]); + close(data->api_fds[1]); + parcMemory_Deallocate((void **) &data); +} + +LONGBOW_TEST_RUNNER(rta_Component) +{ + // The following Test Fixtures will run their corresponding Test Cases. + // Test Fixtures are run in the order specified, but all tests should be idempotent. + // Never rely on the execution order of tests or share state between them. + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Component) +{ + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Component) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ + LONGBOW_RUN_TEST_CASE(Global, rtaComponent_GetOutputQueue); + + LONGBOW_RUN_TEST_CASE(Global, rtaComponent_PutMessage_ClosedConnection); + LONGBOW_RUN_TEST_CASE(Global, rtaComponent_PutMessage_OpenConnection); +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Global, rtaComponent_GetOutputQueue) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + PARCEventQueue *queue = rtaComponent_GetOutputQueue(data->connection, API_CONNECTOR, RTA_DOWN); + assertNotNull(queue, "Got null queue for API_CONNECTOR DOWN queue"); +} + +LONGBOW_TEST_CASE(Global, rtaComponent_PutMessage_ClosedConnection) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + rtaConnection_SetState(data->connection, CONN_CLOSED); + + // Create the TransportMessage to put on the queue + TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryControl(data->connection, CCNxTlvDictionary_SchemaVersion_V1); + + // Send it down from the API connector to the Testing Lower component + PARCEventQueue *outputQueue = rtaComponent_GetOutputQueue(data->connection, API_CONNECTOR, RTA_DOWN); + + int success = rtaComponent_PutMessage(outputQueue, tm); + assertFalse(success, "Error putting message on API Connector's down queue"); + + // check that we got it + PARCEventQueue *inputQueue = rtaComponent_GetOutputQueue(data->connection, TESTING_LOWER, RTA_UP); + + TransportMessage *test_tm = rtaComponent_GetMessage(inputQueue); + assertNull(test_tm, "Should have returned NULL on a closed connection"); + + // The transport message was destroyed by PutMessage because the connection + // was closed. Don't need to destroy the transport message. + + // set state back to OPEN so the connection is properly disposed of + rtaConnection_SetState(data->connection, CONN_OPEN); +} + +LONGBOW_TEST_CASE(Global, rtaComponent_PutMessage_OpenConnection) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + // Create the TransportMessage to put on the queue + TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryControl(data->connection, CCNxTlvDictionary_SchemaVersion_V1); + + // Send it down from the API connector to the Testing Lower component + PARCEventQueue *outputQueue = rtaComponent_GetOutputQueue(data->connection, API_CONNECTOR, RTA_DOWN); + + int success = rtaComponent_PutMessage(outputQueue, tm); + assertTrue(success, "Error putting message on API Connector's down queue"); + + // check that we got it + PARCEventQueue *inputQueue = rtaComponent_GetOutputQueue(data->connection, TESTING_LOWER, RTA_UP); + + TransportMessage *test_tm = rtaComponent_GetMessage(inputQueue); + assertTrue(test_tm == tm, "Got wrong message, got %p expected %p", (void *) test_tm, (void *) tm); + + transportMessage_Destroy(&tm); +} + +LONGBOW_TEST_FIXTURE(Local) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Component); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ComponentStats.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ComponentStats.c new file mode 100644 index 00000000..1da0b4fc --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ComponentStats.c @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "../rta_ComponentStats.c" +#include <parc/algol/parc_SafeMemory.h> + +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/config/config_All.h> + +#include <inttypes.h> +#include <sys/socket.h> +#include <errno.h> + +#define PAIR_OTHER 0 +#define PAIR_TRANSPORT 1 + +#include <LongBow/unit-test.h> + +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + + int api_fds[2]; + RtaFramework *framework; + RtaProtocolStack *stack; +} TestData; + +static TestData * +_commonSetup(void) +{ + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + + int error = socketpair(AF_UNIX, SOCK_STREAM, 0, data->api_fds); + assertTrue(error == 0, "Error creating socket pair: (%d) %s", errno, strerror(errno)); + + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + assertNotNull(data->framework, "rtaFramework_Create returned null"); + + rtaFramework_Start(data->framework); + + CCNxStackConfig *stackConfig = ccnxStackConfig_Create(); + apiConnector_ProtocolStackConfig(stackConfig); + testingLower_ProtocolStackConfig(stackConfig); + protocolStack_ComponentsConfigArgs(stackConfig, apiConnector_GetName(), testingLower_GetName(), NULL); + data->stack = rtaProtocolStack_Create(data->framework, ccnxStackConfig_GetJson(stackConfig), 1); + + ccnxStackConfig_Release(&stackConfig); + return data; +} + +static void +_commonTeardown(TestData *data) +{ + rtaProtocolStack_Destroy(&data->stack); + + // blocks until done + rtaFramework_Shutdown(data->framework); + + parcRingBuffer1x1_Release(&data->commandRingBuffer); + parcNotifier_Release(&data->commandNotifier); + rtaFramework_Destroy(&data->framework); + + close(data->api_fds[0]); + close(data->api_fds[1]); + parcMemory_Deallocate((void **) &data); +} + +LONGBOW_TEST_RUNNER(rta_Stats) +{ + LONGBOW_RUN_TEST_FIXTURE(Global); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Stats) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Stats) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ + LONGBOW_RUN_TEST_CASE(Global, stats_Create_Destroy); + LONGBOW_RUN_TEST_CASE(Global, stats_Dump); + LONGBOW_RUN_TEST_CASE(Global, stats_Get); + LONGBOW_RUN_TEST_CASE(Global, stats_Increment); +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Global, stats_Create_Destroy) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaComponentStats *stats = rtaComponentStats_Create(data->stack, API_CONNECTOR); + + assertNotNull(stats, "Got null stats from rtaComponentStats_Create"); + assertTrue(stats->stack == data->stack, + "Bad stack pointer, got %p expected %p", + (void *) stats->stack, (void *) data->stack); + + rtaComponentStats_Destroy(&stats); +} + +LONGBOW_TEST_CASE(Global, stats_Dump) +{ + for (int i = 0; i < STATS_LAST; i++) { + char *test = rtaComponentStatType_ToString(i); + assertNotNull(test, "Got null string for stat type %d", i); + } +} + +LONGBOW_TEST_CASE(Global, stats_Get) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaComponentStats *stats = rtaComponentStats_Create(data->stack, API_CONNECTOR); + + for (int i = 0; i < STATS_LAST; i++) { + // set each stat to a value + uint64_t value = i + 5; + stats->stats[i] = value; + + uint64_t counter = stats->stats[i]; + assertTrue(counter == value, "Counter %d wrong value, got %" PRIu64 " expected %" PRIu64, i, counter, value); + } + + rtaComponentStats_Destroy(&stats); +} + +LONGBOW_TEST_CASE(Global, stats_Increment) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaComponentStats *stats = rtaComponentStats_Create(data->stack, API_CONNECTOR); + + for (int i = 0; i < STATS_LAST; i++) { + rtaComponentStats_Increment(stats, (RtaComponentStatType) i); + } + + // now make sure they are all "1" + for (int i = 0; i < STATS_LAST; i++) { + uint64_t counter = stats->stats[i]; + assertTrue(counter == 1, "Counter %d wrong value, got %" PRIu64 "expected 1", i, counter); + } + + rtaComponentStats_Destroy(&stats); +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Stats); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Connection.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Connection.c new file mode 100644 index 00000000..4b80d7e1 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Connection.c @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "../rta_Connection.c" +#include <parc/algol/parc_SafeMemory.h> +#include <LongBow/unit-test.h> + +LONGBOW_TEST_RUNNER(rta_Connection) +{ + // The following Test Fixtures will run their corresponding Test Cases. + // Test Fixtures are run in the order specified, but all tests should be idempotent. + // Never rely on the execution order of tests or share state between them. + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Connection) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Connection) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Local) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Connection); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ConnectionTable.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ConnectionTable.c new file mode 100644 index 00000000..d77f47c0 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ConnectionTable.c @@ -0,0 +1,309 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Include the file(s) containing the functions to be tested. +// This permits internal static functions to be visible to this Test Framework. +#include "../rta_ConnectionTable.c" +#include "../rta_ProtocolStack.c" +#include <LongBow/unit-test.h> + +#include <parc/algol/parc_SafeMemory.h> + +LONGBOW_TEST_RUNNER(rta_ConnectionTable) +{ + // The following Test Fixtures will run their corresponding Test Cases. + // Test Fixtures are run in the order specified, but all tests should be idempotent. + // Never rely on the execution order of tests or share state between them. + LONGBOW_RUN_TEST_FIXTURE(Global); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_ConnectionTable) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_ConnectionTable) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ + LONGBOW_RUN_TEST_CASE(Global, rtaConnectionTable_AddConnection); + LONGBOW_RUN_TEST_CASE(Global, rtaConnectionTable_AddConnection_TooMany); + LONGBOW_RUN_TEST_CASE(Global, rtaConnectionTable_Create_Destroy); + LONGBOW_RUN_TEST_CASE(Global, rtaConnectionTable_GetByApiFd); + LONGBOW_RUN_TEST_CASE(Global, rtaConnectionTable_GetByTransportFd); + LONGBOW_RUN_TEST_CASE(Global, rtaConnectionTable_Remove); + LONGBOW_RUN_TEST_CASE(Global, rtaConnectionTable_RemoveByStack); +} + +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + + RtaFramework *framework; + + // in some tests we use two protocol stacks + RtaProtocolStack *stack_a; + RtaProtocolStack *stack_b; +} TestData; + +static RtaConnection * +createConnection(RtaProtocolStack *stack, int api_fd, int transport_fd) +{ + // ------- + // Create a connection to use in the table + PARCJSON *params = parcJSON_ParseString("{}"); + RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(stack->stack_id, api_fd, transport_fd, params); + + // Create a connection that goes in the connection table + RtaConnection *conn = rtaConnection_Create(stack, openConnection); + assertNotNull(conn, "Got null connection from rtaConnection_Create"); + + rtaCommandOpenConnection_Release(&openConnection); + parcJSON_Release(¶ms); + return conn; +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + + // --------------------------- + // To test a connection table, we need to create a Framework and a Protocol stack + + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + + // fake out a protocol stack + data->stack_a = parcMemory_AllocateAndClear(sizeof(RtaProtocolStack)); + assertNotNull(data->stack_a, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaProtocolStack)); + data->stack_a->stack_id = 1; + data->stack_a->framework = data->framework; + + // fake out a protocol stack + data->stack_b = parcMemory_AllocateAndClear(sizeof(RtaProtocolStack)); + assertNotNull(data->stack_b, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaProtocolStack)); + data->stack_b->stack_id = 2; + data->stack_b->framework = data->framework; + + longBowTestCase_SetClipBoardData(testCase, data); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + // now cleanup everything + rtaFramework_Destroy(&data->framework); + parcNotifier_Release(&data->commandNotifier); + parcRingBuffer1x1_Release(&data->commandRingBuffer); + + parcMemory_Deallocate((void **) &(data->stack_a)); + parcMemory_Deallocate((void **) &(data->stack_b)); + parcMemory_Deallocate((void **) &data); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +/** + * Destroy the table before destroying the connection + */ +LONGBOW_TEST_CASE(Global, rtaConnectionTable_AddConnection) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaConnection *conn = createConnection(data->stack_a, 2, 3); + + // This is the part we want to test. + RtaConnectionTable *table = rtaConnectionTable_Create(1000, rtaConnection_Destroy); + rtaConnectionTable_AddConnection(table, conn); + + assertTrue(table->count_elements == 1, "Incorrect table size, expected %d got %zu", 1, table->count_elements); + rtaConnectionTable_Destroy(&table); +} + + +/** + * Create a connection table with just 1 connection and make sure table + * does the right thing on overflow + */ +LONGBOW_TEST_CASE(Global, rtaConnectionTable_AddConnection_TooMany) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaConnection *conn = createConnection(data->stack_a, 2, 3); + + int res; + + // create the table with size 1 + RtaConnectionTable *table = rtaConnectionTable_Create(1, rtaConnection_Destroy); + res = rtaConnectionTable_AddConnection(table, conn); + assertTrue(res == 0, "Got non-zero return %d", res); + assertTrue(table->count_elements == 1, "Incorrect table size, expected %d got %zu", 1, table->count_elements); + + // add the second connection, should return failure + res = rtaConnectionTable_AddConnection(table, conn); + assertTrue(res == -1, "Should have failed, expecting -1, got %d", res); + + rtaConnectionTable_Destroy(&table); +} + + +LONGBOW_TEST_CASE(Global, rtaConnectionTable_Create_Destroy) +{ + size_t beforeBalance = parcMemory_Outstanding(); + RtaConnectionTable *table = rtaConnectionTable_Create(1000, rtaConnection_Destroy); + assertTrue(table->max_elements == 1000, "Initialized with wrong number of elements"); + rtaConnectionTable_Destroy(&table); + size_t afterBalance = parcMemory_Outstanding(); + assertTrue(beforeBalance == afterBalance, "Memory imbalance after create/destroy"); +} + +LONGBOW_TEST_CASE(Global, rtaConnectionTable_GetByApiFd) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaConnection *conn = createConnection(data->stack_a, 2, 3); + + RtaConnectionTable *table = rtaConnectionTable_Create(1000, rtaConnection_Destroy); + rtaConnectionTable_AddConnection(table, conn); + + RtaConnection *test; + test = rtaConnectionTable_GetByApiFd(table, 2); + assertTrue(test == conn, "Got wrong connection, expecting %p got %p", (void *) conn, (void *) test); + + test = rtaConnectionTable_GetByApiFd(table, 3); + assertTrue(test == NULL, "Got wrong connection, expecting %p got %p", NULL, (void *) test); + + test = rtaConnectionTable_GetByApiFd(table, 4); + assertTrue(test == NULL, "Got wrong connection, expecting %p got %p", NULL, (void *) test); + + rtaConnectionTable_Destroy(&table); +} + +LONGBOW_TEST_CASE(Global, rtaConnectionTable_GetByTransportFd) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaConnection *conn = createConnection(data->stack_a, 2, 3); + + RtaConnectionTable *table = rtaConnectionTable_Create(1000, rtaConnection_Destroy); + rtaConnectionTable_AddConnection(table, conn); + + RtaConnection *test; + test = rtaConnectionTable_GetByTransportFd(table, 2); + assertTrue(test == NULL, "Got wrong connection, expecting %p got %p", NULL, (void *) test); + + test = rtaConnectionTable_GetByTransportFd(table, 3); + assertTrue(test == conn, "Got wrong connection, expecting %p got %p", (void *) conn, (void *) test); + + test = rtaConnectionTable_GetByTransportFd(table, 4); + assertTrue(test == NULL, "Got wrong connection, expecting %p got %p", NULL, (void *) test); + + + rtaConnectionTable_Destroy(&table); +} + +/** + * We create two connections and make sure that when we remove one the other + * is still in the table + */ +LONGBOW_TEST_CASE(Global, rtaConnectionTable_Remove) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int res; + int a_pair[2]; + int b_pair[2]; + + // we have to use actual socket pairs in this test because Remove will destroy + // the last copy of the connection and call close() on the sockets. + socketpair(PF_LOCAL, SOCK_STREAM, 0, a_pair); + socketpair(PF_LOCAL, SOCK_STREAM, 0, b_pair); + + RtaConnectionTable *table = rtaConnectionTable_Create(1000, rtaConnection_Destroy); + + RtaConnection *conn_a = createConnection(data->stack_a, a_pair[0], a_pair[1]); + rtaConnectionTable_AddConnection(table, conn_a); + + RtaConnection *conn_b = createConnection(data->stack_b, b_pair[0], b_pair[1]); + rtaConnectionTable_AddConnection(table, conn_b); + + assertTrue(table->count_elements == 2, "Wrong element count"); + + res = rtaConnectionTable_Remove(table, conn_b); + assertTrue(res == 0, "Got error from rtaConnectionTable_Remove: %d", res); + assertTrue(table->count_elements == 1, "Wrong element count"); + + RtaConnection *test = rtaConnectionTable_GetByApiFd(table, a_pair[0]); + assertNotNull(test, "Could not retrieve connection that was supposed to still be there"); + + rtaConnectionTable_Destroy(&table); +} + +/** + * Create two connections, they are in different protocol stacks. Remove one by + * stack id and make sure the other is still in the table + */ +LONGBOW_TEST_CASE(Global, rtaConnectionTable_RemoveByStack) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int res; + int a_pair[2]; + int b_pair[2]; + + // we have to use actual socket pairs in this test because Remove will destroy + // the last copy of the connection and call close() on the sockets. + socketpair(PF_LOCAL, SOCK_STREAM, 0, a_pair); + socketpair(PF_LOCAL, SOCK_STREAM, 0, b_pair); + + RtaConnectionTable *table = rtaConnectionTable_Create(1000, rtaConnection_Destroy); + + RtaConnection *conn_a = createConnection(data->stack_a, a_pair[0], a_pair[1]); + rtaConnectionTable_AddConnection(table, conn_a); + + RtaConnection *conn_b = createConnection(data->stack_b, b_pair[0], b_pair[1]); + rtaConnectionTable_AddConnection(table, conn_b); + + // now remove a connection by stack id + + res = rtaConnectionTable_RemoveByStack(table, data->stack_a->stack_id); + assertTrue(res == 0, "Got error from rtaConnectionTable_RemoveByStack: %d", res); + assertTrue(table->count_elements == 1, "Wrong element count"); + + RtaConnection *test = rtaConnectionTable_GetByApiFd(table, b_pair[0]); + assertNotNull(test, "Could not retrieve connection that was supposed to still be there"); + + rtaConnectionTable_Destroy(&table); +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_ConnectionTable); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework.c new file mode 100644 index 00000000..715908f3 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework.c @@ -0,0 +1,298 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Include the file(s) containing the functions to be tested. +// This permits internal static functions to be visible to this Test Framework. +#include "../rta_Framework.c" +#include <ccnx/transport/transport_rta/commands/rta_Command.h> + +#include <parc/algol/parc_SafeMemory.h> +#include <LongBow/unit-test.h> +#include <math.h> + +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + RtaFramework *framework; +} TestData; + + +static TestData * +_createTestData(void) +{ + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + rtaLogger_SetLogLevel(data->framework->logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug); + return data; +} + +static void +_destroyTestData(TestData *data) +{ + parcRingBuffer1x1_Release(&data->commandRingBuffer); + parcNotifier_Release(&data->commandNotifier); + rtaFramework_Destroy(&data->framework); + parcMemory_Deallocate((void **) &data); +} + +LONGBOW_TEST_RUNNER(rta_Framework) +{ + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Framework) +{ + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Framework) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// =================================================================== + +LONGBOW_TEST_FIXTURE(Global) +{ + LONGBOW_RUN_TEST_CASE(Global, rtaFramework_Create_Destroy); + LONGBOW_RUN_TEST_CASE(Global, rtaFramework_GetEventScheduler); + LONGBOW_RUN_TEST_CASE(Global, rtaFramework_GetNextConnectionId); + LONGBOW_RUN_TEST_CASE(Global, rtaFramework_GetStatus); + LONGBOW_RUN_TEST_CASE(Global, rtaFramework_Start_Shutdown); + LONGBOW_RUN_TEST_CASE(Global, tick_cb); +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + longBowTestCase_SetClipBoardData(testCase, _createTestData()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + _destroyTestData(longBowTestCase_GetClipBoardData(testCase)); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Global, rtaFramework_Create_Destroy) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + assertNotNull(data->framework, "rtaFramework_Create returned null"); + assertTrue(data->framework->commandRingBuffer == data->commandRingBuffer, "framework commandRingBuffer incorrect"); + assertTrue(data->framework->commandNotifier == data->commandNotifier, "framework commandNotifier incorrect"); + assertNotNull(data->framework->commandEvent, "framework commandEvent is null"); +} + +LONGBOW_TEST_CASE(Global, rtaFramework_GetEventScheduler) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + assertTrue(rtaFramework_GetEventScheduler(data->framework) == data->framework->base, "getEventScheduler broken"); +} + +LONGBOW_TEST_CASE(Global, rtaFramework_GetNextConnectionId) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + assertTrue(rtaFramework_GetNextConnectionId(data->framework) == 1, "GetNextConnetionId not starting at 1"); + assertTrue(rtaFramework_GetNextConnectionId(data->framework) == 2, "GetNextConnetionId first increment not 2"); +} + +LONGBOW_TEST_CASE(Global, rtaFramework_GetStatus) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + assertTrue(rtaFramework_GetStatus(data->framework) == FRAMEWORK_INIT, "Wrong initial status"); +} + +LONGBOW_TEST_CASE(Global, rtaFramework_Start_Shutdown) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + rtaFramework_Start(data->framework); + assertTrue(rtaFramework_WaitForStatus(data->framework, FRAMEWORK_RUNNING) == FRAMEWORK_RUNNING, "Status not RUNNING"); + + // blocks until done + rtaFramework_Shutdown(data->framework); +} + +LONGBOW_TEST_CASE(Global, tick_cb) +{ + ticks tic0, tic1; + struct timeval t0, t1; + double delta_tic, delta_t, delta_abs; + + TestData *data = longBowTestCase_GetClipBoardData(testCase); + rtaFramework_Start(data->framework); + assertTrue(rtaFramework_WaitForStatus(data->framework, FRAMEWORK_RUNNING) == FRAMEWORK_RUNNING, "Status not RUNNING"); + + gettimeofday(&t0, NULL); + tic0 = data->framework->clock_ticks; + sleep(2); + gettimeofday(&t1, NULL); + tic1 = data->framework->clock_ticks; + + delta_t = (t1.tv_sec + t1.tv_usec * 1E-6) - (t0.tv_sec + t0.tv_usec * 1E-6); + delta_tic = ((tic1 - tic0) * FC_USEC_PER_TICK) * 1E-6; + delta_abs = fabs(delta_tic - delta_t); + + printf("over 2 seconds, absolute clock error is %.6f seconds\n", delta_abs); + + + // blocks until done + rtaFramework_Shutdown(data->framework); +} + +// =================================================== + +LONGBOW_TEST_FIXTURE(Local) +{ + LONGBOW_RUN_TEST_CASE(Local, _setLogLevels_All); + LONGBOW_RUN_TEST_CASE(Local, _setLogLevels_All_Framework); + LONGBOW_RUN_TEST_CASE(Local, _setLogLevels_Framework); + LONGBOW_RUN_TEST_CASE(Local, _setLogLevels_ApiConnector); + LONGBOW_RUN_TEST_CASE(Local, _setLogLevels_FlowController); + LONGBOW_RUN_TEST_CASE(Local, _setLogLevels_Codec); + LONGBOW_RUN_TEST_CASE(Local, _setLogLevels_ForwarderConnector); +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + longBowTestCase_SetClipBoardData(testCase, _createTestData()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + _destroyTestData(longBowTestCase_GetClipBoardData(testCase)); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Local, _setLogLevels_All) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + setenv("RtaFacility_All", "Warning", 1); + _setLogLevels(data->framework); + + for (int i = 0; i < RtaLoggerFacility_END; i++) { + bool isLoggable = rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), i, PARCLogLevel_Warning); + assertTrue(isLoggable, "Facility %s not set to Warning", rtaLogger_FacilityString(i)); + } + + unsetenv("RtaFacility_All"); +} + +LONGBOW_TEST_CASE(Local, _setLogLevels_All_Framework) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + setenv("RtaFacility_All", "Info", 1); + setenv("RtaFacility_Framework", "Warning", 1); + _setLogLevels(data->framework); + + assertTrue(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_ApiConnector, PARCLogLevel_Info), "Api facility not Info"); + assertTrue(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_Framework, PARCLogLevel_Warning), "Framework not Warning"); + + unsetenv("RtaFacility_All"); + unsetenv("RtaFacility_Framework"); +} + +LONGBOW_TEST_CASE(Local, _setLogLevels_Framework) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + setenv("RtaFacility_Framework", "Warning", 1); + _setLogLevels(data->framework); + + assertFalse(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_Framework, PARCLogLevel_Info), "Info should not be loggable"); + assertTrue(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_Framework, PARCLogLevel_Warning), "Warning should be loggable"); + unsetenv("RtaFacility_Framework"); +} + +LONGBOW_TEST_CASE(Local, _setLogLevels_ApiConnector) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + setenv("RtaFacility_Api", "Warning", 1); + _setLogLevels(data->framework); + + assertFalse(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_ApiConnector, PARCLogLevel_Info), "Info should not be loggable"); + assertTrue(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_ApiConnector, PARCLogLevel_Warning), "Warning should be loggable"); + unsetenv("RtaFacility_Api"); +} + +LONGBOW_TEST_CASE(Local, _setLogLevels_FlowController) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + setenv("RtaFacility_Flowcontrol", "Warning", 1); + _setLogLevels(data->framework); + + assertFalse(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info), "Info should not be loggable"); + assertTrue(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning), "Warning should be loggable"); + unsetenv("RtaFacility_Flowcontrol"); +} + +LONGBOW_TEST_CASE(Local, _setLogLevels_Codec) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + setenv("RtaFacility_Codec", "Warning", 1); + _setLogLevels(data->framework); + + assertFalse(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_Codec, PARCLogLevel_Info), "Info should not be loggable"); + assertTrue(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_Codec, PARCLogLevel_Warning), "Warning should be loggable"); + unsetenv("RtaFacility_Codec"); +} + +LONGBOW_TEST_CASE(Local, _setLogLevels_ForwarderConnector) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + setenv("RtaFacility_Forwarder", "Warning", 1); + _setLogLevels(data->framework); + + assertFalse(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_ForwarderConnector, PARCLogLevel_Info), "Info should not be loggable"); + assertTrue(rtaLogger_IsLoggable(rtaFramework_GetLogger(data->framework), RtaLoggerFacility_ForwarderConnector, PARCLogLevel_Warning), "Warning should be loggable"); + unsetenv("RtaFacility_Forwarder"); +} + +// =================================================== + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Framework); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Commands.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Commands.c new file mode 100644 index 00000000..d19d680b --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Commands.c @@ -0,0 +1,449 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Creates a bentpipe forwarder and then creates and runs in non-threaded Transport in + * the commonSetup() function. The function commonTeardown() undoes all that. + * + */ + +#include "../rta_Framework_Commands.c" +#include <sys/param.h> + +#include <LongBow/unit-test.h> + +#include <parc/security/parc_Pkcs12KeyStore.h> +#include <parc/security/parc_Security.h> + +#include <ccnx/api/control/cpi_ControlMessage.h> + +#include <parc/algol/parc_SafeMemory.h> + +#include <ccnx/transport/transport_rta/config/config_All.h> +#include <ccnx/transport/transport_rta/rta_Transport.h> +#include <ccnx/transport/common/transport_private.h> +#include <ccnx/transport/test_tools/traffic_tools.h> + +#include <ccnx/transport/test_tools/bent_pipe.h> + +// ============================================== +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + RtaFramework *framework; + + char bentpipe_Directory[MAXPATHLEN]; + char bentpipe_LocalName[MAXPATHLEN]; + BentPipeState *bentpipe; + char keystoreName[MAXPATHLEN]; + char keystorePassword[MAXPATHLEN]; +} TestData; + +static CCNxTransportConfig * +_createParams(const char *local_name, const char *keystore_name, const char *keystore_passwd) +{ + assertNotNull(local_name, "Got null local name\n"); + assertNotNull(keystore_name, "Got null keystore name\n"); + assertNotNull(keystore_passwd, "Got null keystore passwd\n"); + + CCNxStackConfig *stackConfig = apiConnector_ProtocolStackConfig( + tlvCodec_ProtocolStackConfig( + localForwarder_ProtocolStackConfig( + protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(), + apiConnector_GetName(), + tlvCodec_GetName(), + localForwarder_GetName(), + NULL)))); + + CCNxConnectionConfig *connConfig = apiConnector_ConnectionConfig( + localForwarder_ConnectionConfig(ccnxConnectionConfig_Create(), local_name)); + + connConfig = tlvCodec_ConnectionConfig(connConfig); + + publicKeySigner_ConnectionConfig(connConfig, keystore_name, keystore_passwd); + + CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig); + ccnxStackConfig_Release(&stackConfig); + return result; +} + +static void +_runNonThreaded(TestData *data) +{ + rtaFramework_NonThreadedStepTimed(data->framework, &((struct timeval) { 0, 100000 })); +} + +static void +_stopThreaded(TestData *data) +{ + printf("Beginning shutdown pid %d\n", getpid()); + // blocks until done + rtaFramework_Shutdown(data->framework); + printf("Finished shutdown pid %d\n", getpid()); +} + +static void +_stopNonThreaded(TestData *data) +{ + printf("Beginning shutdown pid %d\n", getpid()); + rtaFramework_Teardown(data->framework); + printf("Finished shutdown pid %d\n", getpid()); +} + +static TestData * +_commonSetup(void) +{ + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + + snprintf(data->bentpipe_Directory, MAXPATHLEN, "/tmp/bentpipe_XXXXXX"); + char *p = mkdtemp(data->bentpipe_Directory); + assertNotNull(p, "Got null from mkdtemp(%s)", data->bentpipe_Directory); + snprintf(data->bentpipe_LocalName, MAXPATHLEN, "%s/bentpipe.sock", data->bentpipe_Directory); + + data->bentpipe = bentpipe_Create(data->bentpipe_LocalName); + bentpipe_SetChattyOutput(data->bentpipe, false); + + printf("Staring bent pipe pid %d\n", getpid()); + bentpipe_Start(data->bentpipe); + printf("Started bent pipe\n"); + + snprintf(data->keystoreName, MAXPATHLEN, "/tmp/keystore_p12_XXXXXX"); + int fd = mkstemp(data->keystoreName); + assertTrue(fd != -1, "Error from mkstemp(%s)", data->keystoreName); + + sprintf(data->keystorePassword, "23439429"); + + bool success = parcPkcs12KeyStore_CreateFile(data->keystoreName, data->keystorePassword, "user", 1024, 30); + assertTrue(success, "parcPublicKeySignerPkcs12Store_CreateFile() failed."); + close(fd); + + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + return data; +} + +static void +_commonTeardown(TestData *data) +{ + if (rtaFramework_GetStatus(data->framework) == FRAMEWORK_RUNNING) { + _stopThreaded(data); + } else { + _stopNonThreaded(data); + } + + parcRingBuffer1x1_Release(&data->commandRingBuffer); + parcNotifier_Release(&data->commandNotifier); + + printf("Destroying framework pid %d\n", getpid()); + rtaFramework_Destroy(&data->framework); + + bentpipe_Stop(data->bentpipe); + bentpipe_Destroy(&data->bentpipe); + unlink(data->keystoreName); + unlink(data->bentpipe_LocalName); + rmdir(data->bentpipe_Directory); + + parcMemory_Deallocate((void **) &data); +} + + +/** + * @function assertConnectionOpen + * @abstract Block on reading the 1st message out of the socket. It's the connection ready message. + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + */ +static void +_assertConnectionOpen(int fd) +{ + CCNxMetaMessage *firstMessage; + + rtaTransport_Recv(NULL, fd, &firstMessage, CCNxStackTimeout_Never); + + assertTrue(ccnxMetaMessage_IsControl(firstMessage), "not a control message"); + + CCNxControl *control = ccnxMetaMessage_GetControl(firstMessage); + + NotifyStatus *status = notifyStatus_ParseJSON(ccnxControl_GetJson(control)); + ccnxMetaMessage_Release(&firstMessage); + + assertTrue(notifyStatus_IsConnectionOpen(status), "Expected notifyStatus_IsConnectionOpen to be true"); + notifyStatus_Release(&status); +} + + +/** + * @function openConnection + * @abstract Opens a connection and fills in the socket pair + * @discussion + * uses rtaFramework_ExecuteOpen to directly create, does not go over the command pair + * + * @param <#param1#> + * @return <#return#> + */ +static void +_openConnection(RtaFramework *framework, CCNxTransportConfig *transportConfig, int stack_id, int socketPairOutput[]) +{ + socketpair(PF_LOCAL, SOCK_STREAM, 0, socketPairOutput); + + struct timeval timeout = { .tv_sec = 10, .tv_usec = 0 }; + + setsockopt(socketPairOutput[0], SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); + setsockopt(socketPairOutput[0], SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); + setsockopt(socketPairOutput[1], SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); + setsockopt(socketPairOutput[1], SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); + + RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(stack_id, socketPairOutput[1], socketPairOutput[0], + ccnxConnectionConfig_GetJson(ccnxTransportConfig_GetConnectionConfig(transportConfig))); + + _rtaFramework_ExecuteOpenConnection(framework, openConnection); + rtaCommandOpenConnection_Release(&openConnection); + + rtaFramework_NonThreadedStepCount(framework, 10); + _assertConnectionOpen(socketPairOutput[1]); +} + +static bool +_readAndCompareName(int fd, CCNxName *truthName) +{ + CCNxMetaMessage *test_msg; + + int res = rtaTransport_Recv(NULL, fd, &test_msg, CCNxStackTimeout_Never); + assertTrue(res == 0, "Got error receiving on bob's socket: %s (%d)", strerror(errno), errno); + + assertNotNull(test_msg, "Got null message from Bob"); + + assertTrue(ccnxMetaMessage_IsInterest(test_msg), "Got wrong type, expected Interest but got other"); + + CCNxInterest *interest = ccnxMetaMessage_GetInterest(test_msg); + + assertTrue(ccnxName_Compare(truthName, ccnxInterest_GetName(interest)) == 0, "Names did not compare") + { + ccnxName_Display(ccnxInterest_GetName(interest), 3); + ccnxName_Display(truthName, 3); + } + + ccnxMetaMessage_Release(&test_msg); + + return true; +} + +// ========================== + +LONGBOW_TEST_RUNNER(rta_Framework_Commands) +{ + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Framework_Commands) +{ + printf("\n********\n%s starting\n\n", __func__); + + srandom((int) time(NULL)); + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Framework_Commands) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Local) +{ + LONGBOW_RUN_TEST_CASE(Local, _rtaFramework_ExecuteCloseConnection); + LONGBOW_RUN_TEST_CASE(Local, _rtaFramework_ExecuteCreateStack); + LONGBOW_RUN_TEST_CASE(Local, _rtaFramework_ExecuteOpenConnection); +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + parcSecurity_Init(); + +#if __APPLE__ + pthread_setname_np(longBowTestCase_GetName(testCase)); +#else + pthread_setname_np(pthread_self(), longBowTestCase_GetName(testCase)); +#endif + + TestData *data = _commonSetup(); + _runNonThreaded(data); + + longBowTestCase_SetClipBoardData(testCase, data); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _commonTeardown(data); + parcSecurity_Fini(); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Local, _rtaFramework_ExecuteCloseConnection) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + int stack_id = 5; + + CCNxTransportConfig *params = _createParams(data->bentpipe_LocalName, data->keystoreName, data->keystorePassword); + + RtaCommandCreateProtocolStack *createStack = + rtaCommandCreateProtocolStack_Create(stack_id, ccnxTransportConfig_GetStackConfig(params)); + + _rtaFramework_ExecuteCreateStack(data->framework, createStack); + rtaCommandCreateProtocolStack_Release(&createStack); + + // now use three connections, then close 1 and make sure other 2 still ok + { + int alice_pair[2], bob_pair[2], charlie_pair[2]; + + _openConnection(data->framework, params, stack_id, alice_pair); + _openConnection(data->framework, params, stack_id, bob_pair); + _openConnection(data->framework, params, stack_id, charlie_pair); + + CCNxInterest *firstInterest = trafficTools_CreateInterest(); + + // send will consume the message, so copy out the name + CCNxName *truth_name = ccnxName_Copy(ccnxInterest_GetName(firstInterest)); + + CCNxMetaMessage *message = ccnxMetaMessage_CreateFromInterest(firstInterest); + bool success = rtaTransport_Send(NULL, alice_pair[1], message, CCNxStackTimeout_Never); + assertTrue(success, "Got error sending on alice's socket: %s (%d)", strerror(errno), errno); + ccnxMetaMessage_Release(&message); + + // *** Read bob + rtaFramework_NonThreadedStepCount(data->framework, 10); + _readAndCompareName(bob_pair[1], truth_name); + + // *** Read Charlie + rtaFramework_NonThreadedStepCount(data->framework, 10); + _readAndCompareName(charlie_pair[1], truth_name); + + // Close charlie and make sure alice + bob still happy + RtaCommandCloseConnection *closeConnection = rtaCommandCloseConnection_Create(charlie_pair[1]); + _rtaFramework_ExecuteCloseConnection(data->framework, closeConnection); + rtaCommandCloseConnection_Release(&closeConnection); + rtaFramework_NonThreadedStepCount(data->framework, 10); + + // send another interest + CCNxInterest *secondInterest = trafficTools_CreateInterest(); + message = ccnxMetaMessage_CreateFromInterest(secondInterest); + + success = rtaTransport_Send(NULL, alice_pair[1], message, CCNxStackTimeout_Never); + assertTrue(success, "Got error sending on alice's socket: %s (%d)", strerror(errno), errno); + ccnxMetaMessage_Release(&message); + + // make sure bob gets it + rtaFramework_NonThreadedStepCount(data->framework, 10); + _readAndCompareName(bob_pair[1], truth_name); + + ccnxName_Release(&truth_name); + ccnxInterest_Release(&firstInterest); + ccnxInterest_Release(&secondInterest); + } + + ccnxTransportConfig_Destroy(¶ms); +} + +LONGBOW_TEST_CASE(Local, _rtaFramework_ExecuteCreateStack) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int stack_id = 4; + CCNxTransportConfig *params = _createParams(data->bentpipe_LocalName, data->keystoreName, data->keystorePassword); + RtaCommandCreateProtocolStack *createStack = + rtaCommandCreateProtocolStack_Create(stack_id, ccnxTransportConfig_GetStackConfig(params)); + + + // this call skirts around threading + _rtaFramework_ExecuteCreateStack(data->framework, createStack); + + FrameworkProtocolHolder *holder; + holder = rtaFramework_GetProtocolStackByStackId(data->framework, stack_id); + assertNotNull(holder, "There is no protocol holder for this stack, not created?"); + + ccnxTransportConfig_Destroy(¶ms); + rtaCommandCreateProtocolStack_Release(&createStack); +} + +LONGBOW_TEST_CASE(Local, _rtaFramework_ExecuteOpenConnection) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int stack_id = 4; + CCNxTransportConfig *params = _createParams(data->bentpipe_LocalName, data->keystoreName, data->keystorePassword); + + RtaCommandCreateProtocolStack *createStack = + rtaCommandCreateProtocolStack_Create(stack_id, ccnxTransportConfig_GetStackConfig(params)); + _rtaFramework_ExecuteCreateStack(data->framework, createStack); + rtaCommandCreateProtocolStack_Release(&createStack); + + // now create two connections and make sure they work + { + // now create + int alice_pair[2], bob_pair[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, alice_pair); + socketpair(PF_LOCAL, SOCK_STREAM, 0, bob_pair); + + _openConnection(data->framework, params, stack_id, alice_pair); + _openConnection(data->framework, params, stack_id, bob_pair); + + CCNxInterest *interest = trafficTools_CreateInterest(); + + //ccnxInterest_Display(interest, 0); + + // send will consume the message, so copy out the name + CCNxName *truth_name = ccnxName_Copy(ccnxInterest_GetName(interest)); + + //ccnxName_Display(truth_name, 0); + + // now send it down the stack + CCNxMetaMessage *message = ccnxMetaMessage_CreateFromInterest(interest); + bool success = rtaTransport_Send(NULL, alice_pair[1], message, CCNxStackTimeout_Never); + assertTrue(success, "Got error sending on alice's socket: %s (%d)", strerror(errno), errno); + ccnxMetaMessage_Release(&message); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + _readAndCompareName(bob_pair[1], truth_name); + + ccnxName_Release(&truth_name); + ccnxInterest_Release(&interest); + } + + ccnxTransportConfig_Destroy(¶ms); +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Framework_Commands); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_NonThreaded.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_NonThreaded.c new file mode 100644 index 00000000..fb73e15c --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_NonThreaded.c @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "../rta_Framework_NonThreaded.c" +#include <parc/algol/parc_SafeMemory.h> + +#include <LongBow/unit-test.h> + +LONGBOW_TEST_RUNNER(rta_Framework_NonThreaded) +{ + // The following Test Fixtures will run their corresponding Test Cases. + // Test Fixtures are run in the order specified, but all tests should be idempotent. + // Never rely on the execution order of tests or share state between them. + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Framework_NonThreaded) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Framework_NonThreaded) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Local) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Framework_NonThreaded); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Services.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Services.c new file mode 100644 index 00000000..6bb51f0f --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Services.c @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "../rta_Framework_Services.c" +#include <parc/algol/parc_SafeMemory.h> + +#include <LongBow/unit-test.h> + +LONGBOW_TEST_RUNNER(rta_Framework_Services) +{ + // The following Test Fixtures will run their corresponding Test Cases. + // Test Fixtures are run in the order specified, but all tests should be idempotent. + // Never rely on the execution order of tests or share state between them. + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Framework_Services) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Framework_Services) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + + +LONGBOW_TEST_FIXTURE(Local) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Framework_Services); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Threaded.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Threaded.c new file mode 100644 index 00000000..74ea2f64 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Framework_Threaded.c @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "../rta_Framework_Threaded.c" +#include <parc/algol/parc_SafeMemory.h> + +#include <LongBow/unit-test.h> + +LONGBOW_TEST_RUNNER(rta_Framework_Threaded) +{ + // The following Test Fixtures will run their corresponding Test Cases. + // Test Fixtures are run in the order specified, but all tests should be idempotent. + // Never rely on the execution order of tests or share state between them. + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Framework_Threaded) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Framework_Threaded) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Local) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Framework_Threaded); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Logger.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Logger.c new file mode 100644 index 00000000..240293fc --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_Logger.c @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Include the file(s) containing the functions to be tested. +// This permits internal static functions to be visible to this Test Framework. +#include "../rta_Logger.c" +#include <stdio.h> +#include <LongBow/unit-test.h> +#include <parc/algol/parc_SafeMemory.h> + +LONGBOW_TEST_RUNNER(rta_Logger) +{ + LONGBOW_RUN_TEST_FIXTURE(Global); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_Logger) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_Logger) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// ========================================================== + +/* + * _testWritter will vsprintf to this buffer + */ +#define _logLength 1024 +static char _lastLogMessage[_logLength]; + +static int +_testWriter(const char *message) +{ + int written = 0; + written = snprintf(_lastLogMessage, _logLength, "%s", message); + return written; +} + +static PARCLogReporter * +_testWriter_Acquire(const PARCLogReporter *reporter) +{ + return parcObject_Acquire(reporter); +} + +static void +_testWriter_Release(PARCLogReporter **reporterPtr) +{ + parcObject_Release((void **) reporterPtr); +} + +static void +_testWriter_Report(PARCLogReporter *reporter, const PARCLogEntry *entry) +{ + char *string = parcLogEntry_ToString(entry); + _testWriter(string); + parcMemory_Deallocate((void **) &string); +} + +static PARCLogReporter * +_testWriter_Create(void) +{ + return parcLogReporter_Create(_testWriter_Acquire, _testWriter_Release, _testWriter_Report, NULL); +} + +// ========================================================== + +LONGBOW_TEST_FIXTURE(Global) +{ + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_FacilityString_Found); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_FacilityString_NotFound); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_Create); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_Acquire); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_SetLogLevel); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_IsLoggable_True); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_IsLoggable_False); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_Log_IsLoggable); + LONGBOW_RUN_TEST_CASE(Global, rtaLogger_Log_IsNotLoggable); +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Global, rtaLogger_FacilityString_Found) +{ + for (RtaLoggerFacility i = 0; i < RtaLoggerFacility_END; i++) { + const char *test = rtaLogger_FacilityString(i); + assertNotNull(test, "Got null string for facility %d", i); + } +} + +LONGBOW_TEST_CASE(Global, rtaLogger_FacilityString_NotFound) +{ + const char *test = rtaLogger_FacilityString(1000); + assertTrue(strcmp(test, "Unknown") == 0, "Got wrong string for unknown facility"); +} + +LONGBOW_TEST_CASE(Global, rtaLogger_Create) +{ + PARCLogReporter *reporter = _testWriter_Create(); + RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + parcLogReporter_Release(&reporter); + + rtaLogger_Release(&logger); +} + +LONGBOW_TEST_CASE(Global, rtaLogger_Acquire) +{ + PARCLogReporter *reporter = _testWriter_Create(); + RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + parcLogReporter_Release(&reporter); + + RtaLogger *copy = rtaLogger_Acquire(logger); + rtaLogger_Release(&logger); + rtaLogger_Release(©); +} + +LONGBOW_TEST_CASE(Global, rtaLogger_SetLogLevel) +{ + PARCLogReporter *reporter = _testWriter_Create(); + RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + parcLogReporter_Release(&reporter); + + rtaLogger_SetLogLevel(logger, RtaLoggerFacility_Framework, PARCLogLevel_Off); + + PARCLogLevel test = parcLog_GetLevel(logger->loggerArray[RtaLoggerFacility_Framework]); + assertTrue(test == PARCLogLevel_Off, "wrong log level, expected %d got %d", PARCLogLevel_Off, test); + rtaLogger_Release(&logger); +} + +LONGBOW_TEST_CASE(Global, rtaLogger_IsLoggable_True) +{ + PARCLogReporter *reporter = _testWriter_Create(); + RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + parcLogReporter_Release(&reporter); + + rtaLogger_SetLogLevel(logger, RtaLoggerFacility_Framework, PARCLogLevel_Warning); + bool isLoggable = rtaLogger_IsLoggable(logger, RtaLoggerFacility_Framework, PARCLogLevel_Warning); + assertTrue(isLoggable, "Did not get true for isLoggable when expecting true"); + rtaLogger_Release(&logger); +} + +LONGBOW_TEST_CASE(Global, rtaLogger_IsLoggable_False) +{ + PARCLogReporter *reporter = _testWriter_Create(); + RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + parcLogReporter_Release(&reporter); + + rtaLogger_SetLogLevel(logger, RtaLoggerFacility_Framework, PARCLogLevel_Warning); + bool isLoggable = rtaLogger_IsLoggable(logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug); + assertFalse(isLoggable, "Logging debug to warning facility should have been false"); + rtaLogger_Release(&logger); +} + +LONGBOW_TEST_CASE(Global, rtaLogger_Log_IsLoggable) +{ + PARCLogReporter *reporter = _testWriter_Create(); + RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + parcLogReporter_Release(&reporter); + + rtaLogger_SetLogLevel(logger, RtaLoggerFacility_Framework, PARCLogLevel_Warning); + memset(_lastLogMessage, 0, _logLength); + + rtaLogger_Log(logger, RtaLoggerFacility_Framework, PARCLogLevel_Warning, __func__, "hello"); + assertTrue(strlen(_lastLogMessage) > 0, "Did not write to log message"); + rtaLogger_Release(&logger); +} + +LONGBOW_TEST_CASE(Global, rtaLogger_Log_IsNotLoggable) +{ + PARCLogReporter *reporter = _testWriter_Create(); + RtaLogger *logger = rtaLogger_Create(reporter, parcClock_Wallclock()); + parcLogReporter_Release(&reporter); + + rtaLogger_SetLogLevel(logger, RtaLoggerFacility_Framework, PARCLogLevel_Warning); + memset(_lastLogMessage, 0, _logLength); + + rtaLogger_Log(logger, RtaLoggerFacility_Framework, PARCLogLevel_Debug, __func__, "hello"); + assertTrue(strlen(_lastLogMessage) == 0, "Should not have written to log message"); + rtaLogger_Release(&logger); +} + + +// ========================================================== + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_Logger); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} + diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ProtocolStack.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ProtocolStack.c new file mode 100644 index 00000000..c14d799a --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_ProtocolStack.c @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <stdio.h> + +#include <parc/algol/parc_SafeMemory.h> + +#include <LongBow/unit-test.h> + +LONGBOW_TEST_RUNNER(rta_ProtocolStack) +{ + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_ProtocolStack) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_ProtocolStack) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Local) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_ProtocolStack); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_WebService.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_WebService.c new file mode 100644 index 00000000..4b2fb015 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/test/test_rta_WebService.c @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Include the file(s) containing the functions to be tested. +// This permits internal static functions to be visible to this Test Framework. +#include "../rta_WebService.c" +#include <LongBow/unit-test.h> +#include <parc/algol/parc_SafeMemory.h> + +#include <signal.h> +#include <pthread.h> +#include <errno.h> +#include <arpa/inet.h> + +LONGBOW_TEST_RUNNER(rta_WebService) +{ + // The following Test Fixtures will run their corresponding Test Cases. + // Test Fixtures are run in the order specified, but all tests should be idempotent. + // Never rely on the execution order of tests or share state between them. + LONGBOW_RUN_TEST_FIXTURE(Global); + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_WebService) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_WebService) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ + LONGBOW_RUN_TEST_CASE(Global, rtaWebService_Create_Destroy); +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) { + printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding()); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +struct sigaction save_sigchld; +struct sigaction save_sigpipe; + +static void +blockSigChild() +{ + struct sigaction ignore_action; + ignore_action.sa_handler = SIG_IGN; + sigemptyset(&ignore_action.sa_mask); + ignore_action.sa_flags = 0; + + sigaction(SIGCHLD, NULL, &save_sigchld); + sigaction(SIGPIPE, NULL, &save_sigpipe); + + sigaction(SIGCHLD, &ignore_action, NULL); + sigaction(SIGPIPE, &ignore_action, NULL); +} + +static void +unblockSigChild() +{ + sigaction(SIGCHLD, &save_sigchld, NULL); + sigaction(SIGPIPE, &save_sigpipe, NULL); +} + +LONGBOW_TEST_CASE(Global, rtaWebService_Create_Destroy) +{ + int fds[2]; + int failure = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds); + assertFalse(failure, "error on socketpair: (%d) %s", errno, strerror(errno)); + + RtaFramework *framework = rtaFramework_Create(fds[1]); + + // we should be runing on port 9090, so the string popen() gets + // will look like this: + // tcp4 0 0 127.0.0.1.9090 *.* LISTEN + + blockSigChild(); + FILE *fp = popen("netstat -an -p tcp", "r"); + assertNotNull(fp, "Got null opening netstat for reading"); + + char str[1035]; + bool found = false; + while (fgets(str, sizeof(str) - 1, fp) != NULL) { + if (strstr(str, "127.0.0.1.9090") != NULL) { + found = true; + break; + } + + if (strstr(str, "127.0.0.1:9090") != NULL) { + found = true; + break; + } + } + + pclose(fp); + + rtaFramework_Destroy(&framework); + + close(fds[0]); + close(fds[1]); + unblockSigChild(); + + assertTrue(found, "Did not find 127.0.0.1.9090 in netstat output"); +} + + +LONGBOW_TEST_FIXTURE(Local) +{ + LONGBOW_RUN_TEST_CASE(Local, rtaWebService_ProcessHelloRequest); + LONGBOW_RUN_TEST_CASE(Local, rtaWebService_ProcessRequest); +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) { + printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding()); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Local, rtaWebService_ProcessHelloRequest) +{ +#ifndef __APPLE__ + testSkip("Test broken on non-darwin"); +#endif + + blockSigChild(); + int fds[2]; + int failure = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds); + assertFalse(failure, "error on socketpair: (%d) %s", errno, strerror(errno)); + + RtaFramework *framework = rtaFramework_Create(fds[0]); + rtaFramework_Start(framework); + rtaFramework_WaitForStatus(framework, FRAMEWORK_RUNNING); + + int fd = socket(AF_INET, SOCK_STREAM, 0); + + struct sockaddr_in sin; + sin.sin_addr.s_addr = inet_addr("127.0.0.1"); + sin.sin_port = htons(9090); + + failure = connect(fd, (struct sockaddr *) &sin, sizeof(sin)); + assertFalse(failure, "error on connect: (%d) %s", errno, strerror(errno)); + + char request[] = "GET /hello HTTP/1.1\r\n\r\n"; + ssize_t write_length = write(fd, request, sizeof(request)); + assertFalse(write_length < 0, "Error writing: (%d) %s", errno, strerror(errno)); + + + struct truth_s { + char *line; + } truth[] = { + { .line = "HTTP/1.1 200 OK\r\n" }, + { .line = "" }, // do not care line for Date + { .line = "Content-Length: 18\r\n" }, + { .line = "Content-Type: text/html; charset=ISO-8859-1\r\n" }, + { .line = "\r\n" }, + { .line = "Requested: /hello\n" }, + { .line = NULL } + }; + + // read response line by line + FILE *fh = fdopen(fd, "r"); + int count = 0; + while (!feof(fh) && truth[count].line != NULL) { + assertNotNull(truth[count].line, "read too many lines: %d", count); + + char response[16384]; + fgets(response, sizeof(response), fh); + if (truth[count].line[0] != '\0') { + bool result = strcmp(truth[count].line, response) == 0; + + if (!result) { + // we need to cleanup the server or the next test will fail + rtaFramework_Shutdown(framework, fds[1]); + rtaFramework_Destroy(&framework); + close(fds[0]); + close(fds[1]); + unblockSigChild(); + assertTrue(result, "mismatched lines, expected '%s' got '%s'", truth[count].line, response); + } + } + count++; + } + fclose(fh); + + rtaFramework_Shutdown(framework, fds[1]); + rtaFramework_Destroy(&framework); + close(fds[0]); + close(fds[1]); + + unblockSigChild(); +} + +LONGBOW_TEST_CASE(Local, rtaWebService_ProcessRequest) +{ +#ifndef __APPLE__ + testSkip("Test broken on non-darwin"); +#endif + + blockSigChild(); + int fds[2]; + int failure = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds); + assertFalse(failure, "error on socketpair: (%d) %s", errno, strerror(errno)); + + RtaFramework *framework = rtaFramework_Create(fds[0]); + rtaFramework_Start(framework); + rtaFramework_WaitForStatus(framework, FRAMEWORK_RUNNING); + + int fd = socket(AF_INET, SOCK_STREAM, 0); + + struct sockaddr_in sin; + sin.sin_addr.s_addr = inet_addr("127.0.0.1"); + sin.sin_port = htons(9090); + + failure = connect(fd, (struct sockaddr *) &sin, sizeof(sin)); + assertFalse(failure, "error on connect: (%d) %s", errno, strerror(errno)); + + char request[] = "GET /foo HTTP/1.1\r\n\r\n"; + write(fd, request, sizeof(request)); + + struct truth_s { + char *line; + } truth[] = { + { .line = "HTTP/1.1 404 Document was not found\r\n" }, + { .line = "Content-Type: text/html\r\n" }, + { .line = "Connection: close\r\n" }, + { .line = "" }, // do not care line for Date + { .line = "Content-Length: 116\r\n" }, + { .line = "\r\n" }, + { .line = "<HTML><HEAD>\n" }, + { .line = "<TITLE>404 Document was not found</TITLE>\n" }, + { .line = "</HEAD><BODY>\n" }, + { .line = "<H1>Document was not found</H1>\n" }, + { .line = "</BODY></HTML>\n" }, + { .line = NULL } + }; + + // read response line by line + FILE *fh = fdopen(fd, "r"); + int count = 0; + while (!feof(fh) && truth[count].line != NULL) { + assertNotNull(truth[count].line, "read too many lines: %d", count); + + char response[16384]; + fgets(response, sizeof(response), fh); + if (truth[count].line[0] != '\0') { + assertTrue(strcmp(truth[count].line, response) == 0, "mismatched lines, expected '%s' got '%s'", truth[count].line, response); + } + count++; + } + fclose(fh); + + rtaFramework_Shutdown(framework, fds[1]); + rtaFramework_Destroy(&framework); + close(fds[0]); + close(fds[1]); + unblockSigChild(); +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_WebService); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} |