diff options
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/connectors')
12 files changed, 5251 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.c new file mode 100644 index 00000000..7b810adc --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.c @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Interface between the event dispatcher and component callbacks to + * the RtaApiConnection. The API connector, per se, is implemented in rta_ApiConnection. This + * module is the scaffolding to work within the RTA component framework. + * + */ + +#include <config.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <pthread.h> +#include <sys/socket.h> +#include <errno.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> + +#include <ccnx/transport/transport_rta/connectors/rta_ApiConnection.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> +#include <ccnx/transport/transport_rta/connectors/connector_Api.h> + +#include <ccnx/api/control/controlPlaneInterface.h> + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +static int connector_Api_Init(RtaProtocolStack *stack); +static int connector_Api_Opener(RtaConnection *conn); +static void connector_Api_Upcall_Read(PARCEventQueue *, PARCEventType, void *conn); +static int connector_Api_Closer(RtaConnection *conn); +static int connector_Api_Release(RtaProtocolStack *stack); +static void connector_Api_StateChange(RtaConnection *conn); + +RtaComponentOperations api_ops = +{ + .init = connector_Api_Init, + .open = connector_Api_Opener, + .upcallRead = connector_Api_Upcall_Read, + .upcallEvent = NULL, + .downcallRead = NULL, + .downcallEvent = NULL, + .close = connector_Api_Closer, + .release = connector_Api_Release, + .stateChange = connector_Api_StateChange +}; + +// ======================== + +static int +connector_Api_Init(RtaProtocolStack *stack) +{ + // nothing to do here + if (DEBUG_OUTPUT) { + printf("%s init stack %p\n", + __func__, + (void *) stack); + } + return 0; +} + +/* + * Api_Open will put the RtaConnection as the callback parameter in the UpcallRead, + * because its a per-connection descriptor. + * + * Returns 0 on success, -1 on error + */ +static int +connector_Api_Opener(RtaConnection *connection) +{ + RtaComponentStats *stats; + RtaApiConnection *apiConnection = rtaApiConnection_Create(connection); + + rtaConnection_SetPrivateData(connection, API_CONNECTOR, apiConnection); + + stats = rtaConnection_GetStats(connection, API_CONNECTOR); + assertNotNull(stats, "%s returned null stats\n", __func__); + rtaComponentStats_Increment(stats, STATS_OPENS); + + rtaConnection_SetState(connection, CONN_OPEN); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s opened transport_fd %d\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(connection))), + __func__, + rtaConnection_GetTransportFd(connection)); + + printf("%9" PRIu64 " %s open conn %p state %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(connection))), + __func__, + (void *) connection, + (void *) apiConnection); + } + + return 0; +} + +/* + * Read a message from below in stack + * Write a message up to the API + */ +static void +connector_Api_Upcall_Read(PARCEventQueue *eventBuffer, PARCEventType type, void *protocolStackVoid) +{ + TransportMessage *tm; + + assertNotNull(protocolStackVoid, "%s called with null ProtocolStack\n", __func__); + + while ((tm = rtaComponent_GetMessage(eventBuffer)) != NULL) { + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + assertNotNull(conn, "got null connection from transport message\n"); + + RtaComponentStats *stats = rtaConnection_GetStats(conn, API_CONNECTOR); + assertNotNull(stats, "returned null stats\n"); + + rtaComponentStats_Increment(stats, STATS_UPCALL_IN); + + RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR); + assertNotNull(apiConnection, "got null apiConnection\n"); + + // If we are blocked, only pass control messages + if (!rtaConnection_BlockedUp(conn) || transportMessage_IsControl(tm)) { + if (!rtaApiConnection_SendToApi(apiConnection, tm, stats)) { + // memory is freed at bottom of function + } + } else { + // closed connection, just destroy the message + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p destroying transport message %p due to closed connection\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn, + (void *) tm); + } + } + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p total upcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn, + rtaComponentStats_Get(stats, STATS_UPCALL_IN), + rtaComponentStats_Get(stats, STATS_UPCALL_OUT)); + } + + // This is the end of life for the transport message. If the inner TlvDictionary + // was put in a CCNxMessage and sent up the stack, then we made another reference to it + // so this destroy will not destroy that part. + transportMessage_Destroy(&tm); + } +} + +/* + * The higher layer should no longer be writing to this + * socketpair, so we can drain it then close it. + */ +static int +connector_Api_Closer(RtaConnection *conn) +{ + RtaComponentStats *stats; + RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s starting close conn %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + + stats = rtaConnection_GetStats(conn, API_CONNECTOR); + assertNotNull(stats, "%s returned null stats\n", __func__); + rtaComponentStats_Increment(stats, STATS_CLOSES); + + // This will prevent any new data going in to queues for the connection + // Existing messages will be destroyed + rtaConnection_SetState(conn, CONN_CLOSED); + + rtaApiConnection_Destroy(&apiConnection); + rtaConnection_SetPrivateData(conn, API_CONNECTOR, NULL); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s close conn %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + + return 0; +} + +static int +connector_Api_Release(RtaProtocolStack *stack) +{ + // nothing to do here, there's no ProtocolStack state + if (DEBUG_OUTPUT) { + printf("%s release stack %p\n", + __func__, + (void *) stack); + } + + return 0; +} + +/** + * Respond to events for the connection + * + * Typcially, the forwarder connector will block and unblock the DOWN direction. We need + * to stop putting new data in the down directon if its blocked. + * + * The API connector (us) is generally the thing blocking the UP direction, so we don't need + * to respond to those (our own) events. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * ComponentOperations api_ops = { + * // [other settings] + * .stateChange = connector_Api_StateChange + * }; + * } + * @endcode + */ +static void +connector_Api_StateChange(RtaConnection *conn) +{ + RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR); + + // we do not test the rtaConnection_BlockedUp() because we are the one setting those + + // If we are blocked in the DOWN direction, disable events on the read queue + if (rtaConnection_BlockedDown(conn)) { + rtaApiConnection_BlockDown(apiConnection); + } else { + rtaApiConnection_UnblockDown(apiConnection); + } +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.h b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.h new file mode 100644 index 00000000..6299116b --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.h @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef Libccnx_connector_api_h +#define Libccnx_connector_api_h + +// Function structs for component variations +extern RtaComponentOperations api_ops; +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder.h b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder.h new file mode 100644 index 00000000..64a3c6e9 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// connector_Forwarder.h +// Libccnx +// +// + +#ifndef Libccnx_connector_fwd_h +#define Libccnx_connector_fwd_h + +// Function structs for component variations +extern RtaComponentOperations fwd_flan_ops; +extern RtaComponentOperations fwd_local_ops; +extern RtaComponentOperations fwd_tlvrtr_ops; +extern RtaComponentOperations fwd_metis_ops; +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c new file mode 100644 index 00000000..a434600a --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c @@ -0,0 +1,552 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * PF_LOCAL forwarder glue, mostly for testing. This uses a + * STREAM socket with a user specified coding. Each message + * on the stream is of this format: + * + * uint32_t process pid + * uint32_t user_socket_fd + * uint32_t message bytes that follow + * uint8_t[] message encoded with user specified codec + * + * The user_socket_fd will be the same number that the API was assigned + * in transportRta_Socket->api_socket_pair[PAIR_OTHER]. + * + */ + +#include <config.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <pthread.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <parc/algol/parc_EventBuffer.h> + +#include <LongBow/runtime.h> +#include <LongBow/debugging.h> + +#include <parc/algol/parc_Memory.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> +#include <ccnx/transport/transport_rta/connectors/connector_Forwarder.h> + +#include <ccnx/transport/transport_rta/config/config_Forwarder_Local.h> +#include <ccnx/api/control/controlPlaneInterface.h> +#include <ccnx/api/control/cpi_ControlFacade.h> + +#include <ccnx/common/ccnx_WireFormatMessage.h> + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +static int connector_Fwd_Local_Init(RtaProtocolStack *stack); +static int connector_Fwd_Local_Opener(RtaConnection *conn); +static void connector_Fwd_Local_Upcall_Read(PARCEventQueue *, PARCEventType, void *conn); +static void connector_Fwd_Local_Upcall_Event(PARCEventQueue *, PARCEventQueueEventType, void *stack); +static void connector_Fwd_Local_Downcall_Read(PARCEventQueue *, PARCEventType, void *conn); +static int connector_Fwd_Local_Closer(RtaConnection *conn); +static int connector_Fwd_Local_Release(RtaProtocolStack *stack); +static void connector_Fwd_Local_StateChange(RtaConnection *conn); + +RtaComponentOperations fwd_local_ops = { + .init = connector_Fwd_Local_Init, + .open = connector_Fwd_Local_Opener, + .upcallRead = connector_Fwd_Local_Upcall_Read, + .upcallEvent = connector_Fwd_Local_Upcall_Event, + .downcallRead = connector_Fwd_Local_Downcall_Read, + .downcallEvent = NULL, + .close = connector_Fwd_Local_Closer, + .release = connector_Fwd_Local_Release, + .stateChange = connector_Fwd_Local_StateChange +}; + +struct fwd_local_state { + int fd; + PARCEventQueue *bev_local; + int connected; +}; + +typedef struct { + uint32_t pid; + uint32_t fd; + uint32_t length; + uint32_t pad; // make it 16 bytes +} __attribute__ ((packed)) localhdr; + +// ================================ +// NULL + +static int +connector_Fwd_Local_Init(RtaProtocolStack *stack) +{ + // no stack-wide initialization + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s init stack %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)), + __func__, + (void *) stack); + } + return 0; +} + +/* + * Create a PF_LOCAL socket + * Set it non-blocking + * Wrap it in a buffer event + * Set Read and Event callbacks + * connect to LOCAL_NAME + * + * Return 0 success, -1 failure + */ +static int +connector_Fwd_Local_Opener(RtaConnection *conn) +{ + PARCEventScheduler *base; + RtaProtocolStack *stack; + const char *sock_name; + + stack = rtaConnection_GetStack(conn); + base = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack)); + + sock_name = localForwarder_GetPath(rtaConnection_GetParameters(conn)); + assertNotNull(sock_name, "connector_Fwd_Local_Opener called without setting LOCAL_NAME"); + + if (sock_name == NULL) { + return -1; + } + + struct fwd_local_state *fwd_state = parcMemory_Allocate(sizeof(struct fwd_local_state)); + assertNotNull(fwd_state, "parcMemory_Allocate(%zu) returned NULL", sizeof(struct fwd_local_state)); + + rtaConnection_SetPrivateData(conn, FWD_LOCAL, fwd_state); + + fwd_state->fd = socket(PF_LOCAL, SOCK_STREAM, 0); + if (fwd_state->fd < 0) { + perror("socket PF_LOCAL"); + } + assertFalse(fwd_state->fd < 0, "socket PF_LOCAL error"); + + struct sockaddr_un addr_unix; + memset(&addr_unix, 0, sizeof(struct sockaddr_un)); + addr_unix.sun_family = AF_UNIX; + + trapIllegalValueIf(sizeof(addr_unix.sun_path) <= strlen(sock_name), "sock_name too long, maximum length %zu", sizeof(addr_unix.sun_path) - 1); + strcpy(addr_unix.sun_path, sock_name); + + // Setup the socket as non-blocking then wrap in a parcEventQueue. + + int flags = fcntl(fwd_state->fd, F_GETFL, NULL); + assertFalse(flags < 0, "fcntl failed to obtain file descriptor flags (%d)\n", errno); + + int failure = fcntl(fwd_state->fd, F_SETFL, flags | O_NONBLOCK); + assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno); + + assertTrue(failure == 0, "could not make socket non-blocking"); + if (failure < 0) { + rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL); + close(fwd_state->fd); + parcMemory_Deallocate((void **) &fwd_state); + return -1; + } + + fwd_state->bev_local = parcEventQueue_Create(base, fwd_state->fd, PARCEventQueueOption_CloseOnFree); + + assertNotNull(fwd_state->bev_local, "Null buffer event for local socket."); + + parcEventQueue_SetCallbacks(fwd_state->bev_local, + connector_Fwd_Local_Upcall_Read, + NULL, + connector_Fwd_Local_Upcall_Event, + conn); + + parcEventQueue_Enable(fwd_state->bev_local, PARCEventType_Read); + + memset(&addr_unix, 0, sizeof(addr_unix)); + addr_unix.sun_family = AF_UNIX; + + trapIllegalValueIf(sizeof(addr_unix.sun_path) <= strlen(sock_name), "sock_name too long, maximum length %zu", sizeof(addr_unix.sun_path) - 1); + strcpy(addr_unix.sun_path, sock_name); + + // This will deliver a PARCEventQueue_Connected on connect success + if (parcEventQueue_ConnectSocket(fwd_state->bev_local, + (struct sockaddr*) &addr_unix, + (socklen_t) sizeof(addr_unix)) < 0) { + perror("connect PF_LOCAL"); + assertTrue(0, "connect PF_LOCAL"); + rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL); + close(fwd_state->fd); + parcMemory_Deallocate((void **) &fwd_state); + return -1; + } + + // Socket will be ready for use once we get PARCEventQueueEventType_Connected + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s open conn %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + + return 0; +} + +/* + * Read from bev_local. We are passed the connection on the ptr. + */ +static void +connector_Fwd_Local_Upcall_Read(PARCEventQueue *bev, PARCEventType type, void *ptr) +{ + RtaConnection *conn = (RtaConnection *) ptr; + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(bev); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_LOCAL, RTA_UP); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_LOCAL); + TransportMessage *tm; + + unsigned char *mem; + int res; + + // only move forward if enough bytes available + + while (parcEventBuffer_GetLength(in) >= sizeof(localhdr)) { + size_t msg_length; + + mem = parcEventBuffer_Pullup(in, sizeof(localhdr)); + if (mem == NULL) { + // not enough bytes + parcEventBuffer_Destroy(&in); + return; + } + + msg_length = ((localhdr *) mem)->length; + if (parcEventBuffer_GetLength(in) < msg_length + sizeof(localhdr)) { + // not enough bytes + parcEventBuffer_Destroy(&in); + return; + } + + PARCBuffer *wireFormat = parcBuffer_Allocate(msg_length); + assertNotNull(wireFormat, "parcBuffer_Allocate(%zu) returned NULL", msg_length); + + rtaComponentStats_Increment(stats, STATS_UPCALL_IN); + + // we can read a whole message. Read it directly in to a buffer + // Skip the FWD_LOCAL header + res = parcEventBuffer_Read(in, NULL, sizeof(localhdr)); + assertTrue(res == 0, "Got error draining header from buffer"); + + uint8_t *overlay = parcBuffer_Overlay(wireFormat, msg_length); + res = parcEventBuffer_Read(in, overlay, msg_length); + overlay = NULL; + + assertTrue(res == msg_length, + "parcEventBuffer_Read returned wrong size, expected %zu got %d", + msg_length, res); + + parcBuffer_Flip(wireFormat); + + if (rtaConnection_GetState(conn) == CONN_OPEN) { + CCNxWireFormatMessage *wireFormatMessage = ccnxWireFormatMessage_Create(wireFormat); + CCNxTlvDictionary *dictionary = ccnxWireFormatMessage_GetDictionary(wireFormatMessage); + if (dictionary != NULL) { + // wrap it for transport module + tm = transportMessage_CreateFromDictionary(dictionary); + + // add the connection info to the transport message before sending up stack + transportMessage_SetInfo(tm, rtaConnection_Copy(conn), rtaConnection_FreeFunc); + + // send it up the stack + if (rtaComponent_PutMessage(out, tm)) { + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } + + // Now release our hold on the wireFormatMessage (aka dictionary) + ccnxWireFormatMessage_Release(&wireFormatMessage); + } else { + printf("Failed to create CCNxTlvDictionary from wireformat\n"); + parcBuffer_Display(wireFormat, 3); + } + } else { + //drop packets + } + + parcBuffer_Release(&wireFormat); + } + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s total upcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_UPCALL_IN), + rtaComponentStats_Get(stats, STATS_UPCALL_OUT)); + } + parcEventBuffer_Destroy(&in); +} + +/* + * Event on connection to forwarder. + * Passed the RtaConnection in the pointer + */ +static void +connector_Fwd_Local_Upcall_Event(PARCEventQueue *queue, PARCEventQueueEventType events, void *ptr) +{ + RtaConnection *conn = (RtaConnection *) ptr; + + struct fwd_local_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_LOCAL); + + if (events & PARCEventQueueEventType_Connected) { + if (DEBUG_OUTPUT) { + struct timeval tv; + gettimeofday(&tv, NULL); + printf("%6lu.%06ld %s (pid %d) connected socket %d\n", + tv.tv_sec, (long) tv.tv_usec, + __func__, + getpid(), + rtaConnection_GetTransportFd(conn)); + } + + fwd_state->connected = 1; + rtaConnection_SendStatus(conn, FWD_LOCAL, RTA_UP, notifyStatusCode_CONNECTION_OPEN, NULL, NULL); + } else if (events & PARCEventQueueEventType_Error) { + struct timeval tv; + gettimeofday(&tv, NULL); + + longBowRuntime_StackTrace(1); + + if (events & PARCEventQueueEventType_Reading) { + printf("%6lu.%06ld %s (pid %d) Got read error on PF_LOCAL, transport socket %d: (%d) %s\n", + tv.tv_sec, (long) tv.tv_usec, + __func__, + getpid(), + rtaConnection_GetTransportFd(conn), + errno, + strerror(errno)); + } else if (events & PARCEventQueueEventType_Writing) { + printf("%6lu.%06ld %s (pid %d) Got write error on PF_LOCAL, transport socket %d: (%d) %s\n", + tv.tv_sec, (long) tv.tv_usec, + __func__, + getpid(), + rtaConnection_GetTransportFd(conn), + errno, + strerror(errno)); + } else { + printf("%6lu.%06ld %s (pid %d) Got error on PF_LOCAL, transport socket %d: (%d) %s\n", + tv.tv_sec, (long) tv.tv_usec, + __func__, + getpid(), + rtaConnection_GetTransportFd(conn), + errno, + strerror(errno)); + } + + /* An error occured while connecting. */ + rtaConnection_SendStatus(conn, FWD_LOCAL, RTA_UP, notifyStatusCode_FORWARDER_NOT_AVAILABLE, NULL, NULL); + } +} + +static void +_ackRequest(RtaConnection *conn, PARCJSON *request) +{ + PARCJSON *response = cpiAcks_CreateAck(request); + CCNxTlvDictionary *ackDict = ccnxControlFacade_CreateCPI(response); + + TransportMessage *tm_ack = transportMessage_CreateFromDictionary(ackDict); + ccnxTlvDictionary_Release(&ackDict); + parcJSON_Release(&response); + + transportMessage_SetInfo(tm_ack, rtaConnection_Copy(conn), rtaConnection_FreeFunc); + + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_LOCAL, RTA_UP); + if (rtaComponent_PutMessage(out, tm_ack)) { + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_LOCAL); + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } +} + +static void +connector_Fwd_Local_ProcessControl(RtaConnection *conn, TransportMessage *tm) +{ + CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(tm); + + if (ccnxControlFacade_IsCPI(controlDictionary)) { + PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary); + if (controlPlaneInterface_GetCPIMessageType(json) == CPI_REQUEST) { + if (cpi_getCPIOperation2(json) == CPI_PAUSE) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p recieved PAUSE\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + _ackRequest(conn, json); + } else if (cpi_getCPIOperation2(json) == CPI_FLUSH) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p recieved FLUSH\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + _ackRequest(conn, json); + } else { + // some other message. We just ACK everything in the local connector. + _ackRequest(conn, json); + } + } + } +} + +static void +connector_Fwd_Local_WriteIovec(struct fwd_local_state *fwdConnState, RtaConnection *conn, CCNxCodecNetworkBufferIoVec *vec, RtaComponentStats *stats) +{ + localhdr lh; + + memset(&lh, 0, sizeof(localhdr)); + lh.pid = getpid(); + lh.fd = rtaConnection_GetTransportFd(conn); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s total downcall reads %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN)); + } + + int iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(vec); + const struct iovec *array = ccnxCodecNetworkBufferIoVec_GetArray(vec); + + lh.length = 0; + for (int i = 0; i < iovcnt; i++) { + lh.length += array[i].iov_len; + } + + if (parcEventQueue_Write(fwdConnState->bev_local, &lh, sizeof(lh)) < 0) { + trapUnrecoverableState("%s error writing to bev_local", __func__); + } + + for (int i = 0; i < iovcnt; i++) { + if (parcEventQueue_Write(fwdConnState->bev_local, array[i].iov_base, array[i].iov_len) < 0) { + trapUnrecoverableState("%s error writing iovec to bev_local", __func__); + } + } +} + +/* send raw packet from codec to forwarder */ +static void +connector_Fwd_Local_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *ptr) +{ + TransportMessage *tm; + + while ((tm = rtaComponent_GetMessage(in)) != NULL) { + RtaConnection *conn; + struct fwd_local_state *fwdConnState; + RtaComponentStats *stats; + + CCNxTlvDictionary *messageDictionary = transportMessage_GetDictionary(tm); + + conn = rtaConnection_GetFromTransport(tm); + fwdConnState = rtaConnection_GetPrivateData(conn, FWD_LOCAL); + stats = rtaConnection_GetStats(conn, FWD_LOCAL); + rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN); + + // ignore configuration messages for the send + if (ccnxTlvDictionary_IsControl(messageDictionary)) { + connector_Fwd_Local_ProcessControl(conn, tm); + } else { + CCNxCodecNetworkBufferIoVec *vec = ccnxWireFormatMessage_GetIoVec(messageDictionary); + assertNotNull(vec, "%s got null wire format\n", __func__); + + connector_Fwd_Local_WriteIovec(fwdConnState, conn, vec, stats); + + rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT); + } + + // we can release everything here. connector_Fwd_Local_WriteIovec made its own references + // to the wire format if it needed them. + transportMessage_Destroy(&tm); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s total downcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN), + rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT)); + } + } +} + +static int +connector_Fwd_Local_Closer(RtaConnection *conn) +{ + struct fwd_local_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_LOCAL); + RtaComponentStats *stats; + + assertNotNull(fwd_state, "invalid state"); + assertNotNull(fwd_state->bev_local, "invalid PARCEventQueue pointer"); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s called on fwd_state %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state); + } + + stats = rtaConnection_GetStats(conn, FWD_LOCAL); + + // this will close too + parcEventQueue_Destroy(&(fwd_state->bev_local)); + memset(fwd_state, 0, sizeof(struct fwd_local_state)); + parcMemory_Deallocate((void **) &fwd_state); + + rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL); + rtaComponentStats_Increment(stats, STATS_CLOSES); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s closed fwd_state %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state); + } + + return 0; +} + +static int +connector_Fwd_Local_Release(RtaProtocolStack *stack) +{ + // no stack-wide initialization + if (DEBUG_OUTPUT) { + printf("%s release stack %p\n", + __func__, + (void *) stack); + } + + return 0; +} + +static void +connector_Fwd_Local_StateChange(RtaConnection *conn) +{ + //not implemented +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c new file mode 100644 index 00000000..59ad1dcb --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c @@ -0,0 +1,1712 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The metis connector does the following per connection: + * - Opens a TCP socket to Metis + * - Creates an "event" for the socket, does not use the buffer to avoid doing extra copy. + * - On read events, uses direct socket operations to read in data + * + * - DOES NOT HANDLE FRAMING ERRORS. If somehow metis and the connector get + * out of whack (technical term), there is no recovery. + * + * - The connection to metis is started in the Opener, but may not complete by the time + * the user sends data down in the Downcall_Read. We should not process the Downcall_Read + * until we get the Upcall_Event of connected. When we finally get the connected event, + * we should make the Downcall_Read pending again (or just call it) to flush the pending + * user data out to metis. + * + * - Because of how we get scheduled, there might be a large batch of messages waiting at the + * forwarder. We don't want to put a giant blob up the stack. So, we keep a deque of TransportMessage + * and only feed a few at a time up. + * + * - Accepts both a PARCBuffer or a CCNxCodecNetworkBufferIoVec as the wire format in the DOWN direction. + * - The UP direction is always a PARCBuffer right now + * + */ + +#include <config.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <pthread.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> +#include <arpa/inet.h> +#include <signal.h> +#include <sys/socket.h> +#include <sys/ioctl.h> +#include <netdb.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_Deque.h> +#include <parc/algol/parc_EventBuffer.h> +#include <parc/algol/parc_EventTimer.h> +#include <parc/algol/parc_Network.h> + +#include <ccnx/transport/common/transport_Message.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> + +#include "connector_Forwarder.h" + +#include <ccnx/transport/transport_rta/config/config_Forwarder_Metis.h> + +#include <ccnx/api/control/controlPlaneInterface.h> +#include <ccnx/api/control/cpi_ControlFacade.h> + +#include <ccnx/common/codec/ccnxCodec_TlvEncoder.h> +#include <ccnx/common/codec/ccnxCodec_TlvDecoder.h> +#include <ccnx/common/internal/ccnx_TlvDictionary.h> + +#include <ccnx/common/codec/ccnxCodec_TlvPacket.h> +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_FixedHeader.h> +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_Types.h> + +#include <ccnx/common/ccnx_WireFormatMessage.h> + +#define MINIMUM_READ_LENGTH 8 + +// The message type for a Metis control packet +#define METIS_CONTROL_TYPE 0xA4 + +// at most 10MB, this is used as the output buffer down to metis +#define METIS_OUTPUT_QUEUE_BYTES (10 * 1024 * 1024) + +// How big should we try to make the output socket size? +#define METIS_SEND_SOCKET_BUFFER 65536 + +// Maximum input backlog in messages, not bytes +#define METIS_INPUT_QUEUE_MESSAGES 100 + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +static int connector_Fwd_Metis_Init(RtaProtocolStack *stack); +static int connector_Fwd_Metis_Opener(RtaConnection *conn); + +static void _eventCallback(int fd, PARCEventType what, void *connectionVoid); +static void connector_Fwd_Metis_Dequeue(int fd, PARCEventType which_event, void *metisStateVoid); + +static void connector_Fwd_Metis_Downcall_Read(PARCEventQueue *, PARCEventType, void *conn); +static int connector_Fwd_Metis_Closer(RtaConnection *conn); +static int connector_Fwd_Metis_Release(RtaProtocolStack *stack); +static void connector_Fwd_Metis_StateChange(RtaConnection *conn); + +RtaComponentOperations fwd_metis_ops = { + .init = connector_Fwd_Metis_Init, + .open = connector_Fwd_Metis_Opener, + .upcallRead = NULL, + .upcallEvent = NULL, + .downcallRead = connector_Fwd_Metis_Downcall_Read, + .downcallEvent = NULL, + .close = connector_Fwd_Metis_Closer, + .release = connector_Fwd_Metis_Release, + .stateChange = connector_Fwd_Metis_StateChange +}; + +typedef enum { + PacketType_Interest, + PacketType_ContentObject, + PacketType_Control, + PacketType_InterestReturn, + PacketType_Unknown +} _PacketType; + +typedef struct metis_connector_stats { + unsigned countUpcallReads; + unsigned countUpcallWriteDataOk; + unsigned countUpcallWriteDataError; + unsigned countUpcallWriteDataBlocked; + unsigned countUpcallWriteDataQueueFull; + + unsigned countUpcallWriteControlOk; + unsigned countUpcallWriteControlError; + + unsigned countDowncallReads; + unsigned countDowncallWrites; + unsigned countDowncallControl; +} _MetisConnectorStats; + +/** + * This structure holds the read-ahead data for the next message being read based + * on its fixed header + */ +typedef struct next_message_header { + // this is how we frame received messages on a stream connection. We + // wait until we read a complete fixed header, then we can set the length + // of that message and keep waiting until we receive at least that many bytes. + size_t length; + + // at the time when we parse out the message length from the fixed header, + // we also parse out the TLV message type from the fixed header + _PacketType packetType; + uint8_t version; + + // we will read bytes into this structure + union _hdr { + CCNxCodecSchemaV1FixedHeader v1; + uint8_t buffer[MINIMUM_READ_LENGTH]; + } fixedHeader; + + uint8_t *readLocation; + size_t remainingReadLength; + + // The whole message + PARCBuffer *packet; +} NextMessage; + +typedef struct fwd_metis_state { + uint16_t port; + int fd; + + // separate events for read and write on fd so we can individually enable them + PARCEvent *readEvent; + PARCEvent *writeEvent; + + bool isConnected; + + // This is our read-ahead of the next message fixed header + NextMessage nextMessage; + + // the transportMessageQueueEvent is used to dequeue from the queue. + // we make sure its scheduled so long as there's messages in the queue, even if there's + // nothing else being read + PARCDeque *transportMessageQueue; + PARCEventTimer *transportMessageQueueEvent; + + // This buffer is the queue of stuff we need to send to the network + PARCEventBuffer *metisOutputQueue; + + _MetisConnectorStats stats; +} FwdMetisState; + +/** + * @typedef PacketData + * @brief Used to pass a record between reading a packet and sending it up the stack + * @discussion Used internally to pass data between functions + */ +typedef struct packet_data { + FwdMetisState *fwd_state; + RtaConnection *conn; + PARCEventQueue *out; + RtaComponentStats *stats; +} PacketData; + + +// for debugging +static unsigned fwd_metis_references_queued = 0; +static unsigned fwd_metis_references_dequeued = 0; +static unsigned fwd_metis_references_notqueued = 0; + + +typedef enum { + ReadReturnCode_Finished, // read all needed bytes + ReadReturnCode_PartialRead, // still need some bytes + ReadReturnCode_Closed, // the socket is closed + ReadReturnCode_Error, // An error on the socket +} ReadReturnCode; + +// ================================ + +static void +_nextMessage_Display(const NextMessage *next, unsigned indent) +{ + printf("NextMessage %p length %zu type %d version %u readLocation %p remaining %zu\n", + (void *) next, next->length, next->packetType, next->version, (void *) next->readLocation, next->remainingReadLength); + + printf("fixedHeader\n"); + longBowDebug_MemoryDump((const char *) next->fixedHeader.buffer, MINIMUM_READ_LENGTH); + + if (next->packet) { + parcBuffer_Display(next->packet, 3); + } +} + +static int +connector_Fwd_Metis_Init(RtaProtocolStack *stack) +{ + struct sigaction ignore_action; + ignore_action.sa_handler = SIG_IGN; + sigemptyset(&ignore_action.sa_mask); + ignore_action.sa_flags = 0; + sigaction(SIGPIPE, &ignore_action, NULL); + + return 0; +} + + +/** + * Setup the NextMessage structure to begin reading a fixed header + * + * All fields are zeroed and the readLocation is set to the first byte of the fixedHeader. + * The remainingReadLength is set to the size of the fixedHeader. + * + * @param [in] next An allocated NextMessage to initialize + * + * Example: + * @code + * { + * NextMessage nextMessage; + * _initializeNextMessage(&nextMessage); + * } + * @endcode + */ +static void +_initializeNextMessage(NextMessage *next) +{ + memset(next, 0, sizeof(NextMessage)); + next->version = 0xFF; + next->packetType = PacketType_Unknown; + next->readLocation = next->fixedHeader.buffer; + next->remainingReadLength = MINIMUM_READ_LENGTH; +} + +static FwdMetisState * +connector_Fwd_Metis_CreateConnectionState(PARCEventScheduler *scheduler) +{ + FwdMetisState *fwd_state = parcMemory_Allocate(sizeof(FwdMetisState)); + assertNotNull(fwd_state, "parcMemory_Allocate(%zu) returned NULL", sizeof(FwdMetisState)); + + memset(fwd_state, 0, sizeof(FwdMetisState)); + _initializeNextMessage(&fwd_state->nextMessage); + + fwd_state->fd = 0; + fwd_state->readEvent = NULL; + fwd_state->writeEvent = NULL; + fwd_state->transportMessageQueue = parcDeque_Create(); + fwd_state->transportMessageQueueEvent = parcEventTimer_Create(scheduler, 0, connector_Fwd_Metis_Dequeue, fwd_state); + fwd_state->isConnected = false; + fwd_state->metisOutputQueue = parcEventBuffer_Create(); + + return fwd_state; +} + +static bool +_openSocket(FwdMetisState *fwd_state, uint16_t port) +{ + fwd_state->port = port; + fwd_state->fd = socket(PF_INET, SOCK_STREAM, 0); + + if (fwd_state->fd < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to open PF_INET SOCK_STREAM socket: (%d) %s\n", + ' ', __func__, errno, strerror(errno)); + } + return false; + } + + if (DEBUG_OUTPUT) { + printf("%9c %s create socket %d port %u\n", + ' ', __func__, fwd_state->fd, fwd_state->port); + } + + return true; +} + +/** + * @function connector_Fwd_Metis_SetupSocket + * @abstract Creates the socket and sets the port, but does not call connect + * @discussion + * Creates and sets up the socket descriptor. makes it non-blocking. + * Sets the port in FwdMetisState. + * + * This is a full PF_INET socket, not forced to PF_LOCAL. + * + * The sendbuffer size is set to METIS_OUTPUT_QUEUE_BYTES + * + * precondition: called _openSocket + * + * @param <#param1#> + * @return <#return#> + */ +static bool +_setupSocket(FwdMetisState *fwd_state) +{ + trapUnexpectedStateIf(fwd_state->fd < 1, "Invalid socket %d", fwd_state->fd); + + // Set non-blocking flag + int flags = fcntl(fwd_state->fd, F_GETFL, NULL); + assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno); + int res = fcntl(fwd_state->fd, F_SETFL, flags | O_NONBLOCK); + + if (res < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to make socket non-blocking: (%d) %s\n", + ' ', __func__, errno, strerror(errno)); + } + + close(fwd_state->fd); + return false; + } + + const int sendBufferSize = METIS_SEND_SOCKET_BUFFER; + res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_SNDBUF, &sendBufferSize, sizeof(int)); + if (res < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to set SO_SNDBUF to %d: (%d) %s\n", + ' ', __func__, sendBufferSize, errno, strerror(errno)); + } + // This is a non-fatal error + } + +#if defined(SO_NOSIGPIPE) + // turn off SIGPIPE, return EPIPE + const int on = 1; + res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)); + if (res < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to set SO_NOSIGPIPE to %d: (%d) %s\n", + ' ', __func__, sendBufferSize, errno, strerror(errno)); + } + // this is not a fatal error, so keep going + } +#endif + + return true; +} + +/** + * @function connector_Fwd_Metis_SetupConnectionBuffer + * @abstract Creates the connection buffer and adds it to libevent + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + */ +static bool +_setupSocketEvents(FwdMetisState *fwd_state, RtaConnection *conn) +{ + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventScheduler *scheduler = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack)); + + // the connect() call will be asynchrnous because the socket is non-blocking, so we + // need ET_WRITE to trigger a callback when the socket becomes writable (i.e. connected). + // If there's an error on connect it will be an ET_READ | ET_WRITE event with an error on the socket. + fwd_state->readEvent = parcEvent_Create(scheduler, fwd_state->fd, PARCEventType_Read | PARCEventType_Persist | PARCEventType_EdgeTriggered, _eventCallback, conn); + assertNotNull(fwd_state->readEvent, "Got a null readEvent for socket %d", fwd_state->fd); + + fwd_state->writeEvent = parcEvent_Create(scheduler, fwd_state->fd, PARCEventType_Write | PARCEventType_Persist | PARCEventType_EdgeTriggered, _eventCallback, conn); + assertNotNull(fwd_state->writeEvent, "Got a null readEvent for socket %d", fwd_state->fd); + + // Start the write event. It will be signaled on a connect error or when we are connected. + // The read event is not enabled until after connect. + + int failure = parcEvent_Start(fwd_state->writeEvent); + assertFalse(failure < 0, "Error starting writeEvent event %p: (%d) %s", (void *) fwd_state->writeEvent, errno, strerror(errno)); + + return true; +} + +/** + * The connection to the forwarder succeeded, step the state machine + * + * Change the state of the connection to connected and notify the user that it's ready. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_connectionSucceeded(FwdMetisState *fwd_state, RtaConnection *conn) +{ + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s Connection %p connected fd %d\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn, fwd_state->fd); + } + + fwd_state->isConnected = true; + + // enable read events + parcEvent_Start(fwd_state->readEvent); + + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_OPEN, NULL, NULL); +} + +static void +_readInEnvironmentConnectionSpecification(struct sockaddr_in *addr_in) +{ + char *forwarderIpEnv = getenv(FORWARDER_CONNECTION_ENV); + if (forwarderIpEnv == NULL) { + return; + } + + char forwarderIpAddress[NI_MAXHOST] = { 0 }; + in_port_t forwarderIpPort = 0; + + // Currently, we only support tcp control connections to the forwarder + sscanf(forwarderIpEnv, "tcp://%[^:]:%hu", forwarderIpAddress, &forwarderIpPort); + + // If provided, use the specified address in a canonical form + if (forwarderIpAddress[0] != '\0') { + // Normalize the provided hostname + struct sockaddr_in *addr = (struct sockaddr_in *) parcNetwork_SockAddress(forwarderIpAddress, forwarderIpPort); + char *ipAddress = inet_ntoa(addr->sin_addr); + parcMemory_Deallocate(&addr); + if (ipAddress) { + addr_in->sin_addr.s_addr = inet_addr(ipAddress); + } else { + addr_in->sin_addr.s_addr = inet_addr(forwarderIpAddress); + } + } + + // If provided, use the specified port + if (forwarderIpPort != 0) { + addr_in->sin_port = htons(forwarderIpPort); + } +} + +/** + * @function connector_Fwd_Metis_BeginConnect + * @abstract Begins the non-blocking connect() call to 127.0.0.1 on the port in FwdMetisState + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + */ +static bool +connector_Fwd_Metis_BeginConnect(FwdMetisState *fwd_state, RtaConnection *conn) +{ + bool success = false; + + struct sockaddr_in addr_in; + memset(&addr_in, 0, sizeof(addr_in)); + addr_in.sin_port = htons(fwd_state->port); + addr_in.sin_family = AF_INET; + addr_in.sin_addr.s_addr = inet_addr("127.0.0.1"); + + // Override defaults if specified + _readInEnvironmentConnectionSpecification(&addr_in); + + if (DEBUG_OUTPUT) { + char inetAddress[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(addr_in.sin_addr), inetAddress, INET_ADDRSTRLEN); + printf("%9" PRIu64 " %s beginning connect socket %d to port %d on %s\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + fwd_state->fd, + fwd_state->port, + inetAddress); + } + + // This will deliver a PARCEventType_Write event on connect success + int res = connect(fwd_state->fd, (struct sockaddr*) &addr_in, (socklen_t) sizeof(addr_in)); + + if (res == 0) { + // connect succeded immediately + _connectionSucceeded(fwd_state, conn); + success = true; + } else if (errno == EINPROGRESS) { + // connection is deferred + success = true; + } else { + // a hard error + printf("Error connecting: (%d) %s\n", errno, strerror(errno)); + } + + return success; +} + +/** + * We maintain an input queue going up the stack and only dequeue a small number of packets + * with each call from the dispatch loop. THis is to avoid bursting a bunch of packets up the stack. + */ +static void +connector_Fwd_Metis_Dequeue(int fd, PARCEventType which_event, void *metisStateVoid) +{ + FwdMetisState *fwd_state = (FwdMetisState *) metisStateVoid; + + // random small number. What is right value for this? + unsigned max_loops = 6; + + if (DEBUG_OUTPUT) { + printf("%9d %s deque size %zu\n", + 0, + __func__, + parcDeque_Size(fwd_state->transportMessageQueue)); + } + + while (max_loops > 0 && !parcDeque_IsEmpty(fwd_state->transportMessageQueue)) { + max_loops--; + TransportMessage *tm = parcDeque_RemoveFirst(fwd_state->transportMessageQueue); + + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + + if (rtaComponent_PutMessage(out, tm)) { + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } + } + + // If there are still messages in there, re-schedule + if (!parcDeque_IsEmpty(fwd_state->transportMessageQueue)) { + if (DEBUG_OUTPUT) { + printf("%9d %s rescheduling output queue timer %p\n", + 0, + __func__, + (void *) fwd_state->transportMessageQueueEvent); + } + + struct timeval immediateTimeout = { 0, 0 }; + parcEventTimer_Start(fwd_state->transportMessageQueueEvent, &immediateTimeout); + } +} + +/** + * Create a TCP socket + * Set it non-blocking + * Wrap it in a buffer event + * Set Read and Event callbacks + * + * Return 0 success, -1 failure + */ +static int +connector_Fwd_Metis_Opener(RtaConnection *conn) +{ + bool success = false; + + uint16_t port = metisForwarder_GetPortFromConfig(rtaConnection_GetParameters(conn)); + + PARCEventScheduler *scheduler = rtaFramework_GetEventScheduler(rtaConnection_GetFramework(conn)); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + + if (_openSocket(fwd_state, port)) { + if (_setupSocket(fwd_state)) { + if (_setupSocketEvents(fwd_state, conn)) { + if (connector_Fwd_Metis_BeginConnect(fwd_state, conn)) { + // stash it away in the per-connection cubby hole + rtaConnection_SetPrivateData(conn, FWD_METIS, fwd_state); + success = true; + } + } + } + } + + if (!success) { + if (fwd_state->fd) { + close(fwd_state->fd); + } + if (fwd_state->readEvent) { + parcEvent_Destroy(&(fwd_state->readEvent)); + } + if (fwd_state->writeEvent) { + parcEvent_Destroy(&(fwd_state->writeEvent)); + } + parcMemory_Deallocate((void **) &fwd_state); + return -1; + } + + // Socket will be ready for use once we get PARCEventQueue_Connected + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s open conn %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + + return 0; +} + +/** + * We received a Metis control packet. Translate it to a control packet and send it up the stack. + */ +static void +receiveControlMessage(PacketData *data) +{ + CCNxTlvDictionary *packetDictionary = + ccnxWireFormatMessage_FromControlPacketType(data->fwd_state->nextMessage.version, data->fwd_state->nextMessage.packet); + + bool success = ccnxCodecTlvPacket_BufferDecode(data->fwd_state->nextMessage.packet, packetDictionary); + + if (success) { + TransportMessage *tm = transportMessage_CreateFromDictionary(packetDictionary); + transportMessage_SetInfo(tm, rtaConnection_Copy(data->conn), rtaConnection_FreeFunc); + + // send it up the stack + if (rtaComponent_PutMessage(data->out, tm)) { + rtaComponentStats_Increment(data->stats, STATS_UPCALL_OUT); + data->fwd_state->stats.countUpcallWriteControlOk++; + } else { + data->fwd_state->stats.countUpcallWriteControlError++; + } + } else { + assertTrue(success, "Error decoding a Metis control packet\n") + { + parcBuffer_Display(data->fwd_state->nextMessage.packet, 3); + } + } + + // we are now done with our references + ccnxTlvDictionary_Release(&packetDictionary); +} + + +static void +_queueNonControl(PacketData *data) +{ + CCNxTlvDictionary *packetDictionary = ccnxWireFormatMessage_Create(data->fwd_state->nextMessage.packet); + + assertNotNull(packetDictionary, "Got a null packet decode") + { + parcBuffer_Display(data->fwd_state->nextMessage.packet, 3); + } + + TransportMessage *tm = transportMessage_CreateFromDictionary(packetDictionary); + + // add the connection info to the transport message before sending up stack + transportMessage_SetInfo(tm, rtaConnection_Copy(data->conn), rtaConnection_FreeFunc); + + parcDeque_Append(data->fwd_state->transportMessageQueue, tm); + + // start if went from emtpy to 1 + if (parcDeque_Size(data->fwd_state->transportMessageQueue) == 1) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u schedule dequeue event %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))), + __func__, + rtaConnection_GetConnectionId(data->conn), + (void *) data->fwd_state->transportMessageQueueEvent); + } + + struct timeval immediateTimeout = { 0, 0 }; + parcEventTimer_Start(data->fwd_state->transportMessageQueueEvent, &immediateTimeout); + } + + // we are now done with our references + ccnxTlvDictionary_Release(&packetDictionary); +} + +/** + * Receive a non-control packet + * + * Non-control messages may be dropped due to lack of input buffer space. + * If the connection has state Block Up or the up queue's length is + * too many messages deep, the non-control message will be dropped. + * + * precondition: the caller knows the message is not a control message + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_receiveNonControl(PacketData *data) +{ + if (rtaConnection_BlockedUp(data->conn)) { + data->fwd_state->stats.countUpcallWriteDataBlocked++; + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u blocked up, drop wireFormat %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))), + __func__, + rtaConnection_GetConnectionId(data->conn), + (void *) data->fwd_state->nextMessage.packet); + } + } else { + if (parcDeque_Size(data->fwd_state->transportMessageQueue) < METIS_INPUT_QUEUE_MESSAGES) { + _queueNonControl(data); + data->fwd_state->stats.countUpcallWriteDataOk++; + } else { + data->fwd_state->stats.countUpcallWriteDataQueueFull++; + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u input buffer full, drop wireFormat %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))), + __func__, + rtaConnection_GetConnectionId(data->conn), + (void *) data->fwd_state->nextMessage.packet); + } + } + } +} + +/** + * We received an entire packet, send it up the stack in a Transport message. + * + * If its a control message, we make it a CCNxControlMessage here for symmetry with us + * encoding the control messages at this level + */ +static void +connector_Fwd_Metis_SendUpStack(PacketData *data) +{ + // Always send control messages up the stack + if (data->fwd_state->nextMessage.packetType == PacketType_Control) { + receiveControlMessage(data); + } else { + _receiveNonControl(data); + } +} + +/** + * Return the SO_ERROR value for the given socket + * + * If getsockopt returns an error, the return code could be the error from getsockopt. + * + * Typically you will get ECONNREFUSED when you cannot connect and one of the many getsockopt + * errors if there's a problem with the actual socket. + * + * @param [in] fd The socket + * + * @return errno An errno value + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static int +_getSocketError(int fd) +{ + int value; + socklen_t valueLength = sizeof(value); + int res = getsockopt(fd, SOL_SOCKET, SO_ERROR, &value, &valueLength); + if (res < 0) { + value = res; + } + return value; +} + +/** + * Received an event on a socket we have marked as not yet connected + * + * Ether it's ready to go or there's an error. We will receive a PARCEventType_Read and the socket + * will have an SO_ERROR of 0 if it's now connected. If the SO_ERROR is non-zero, there + * was an error on connect. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_disconnectedEventHandler(FwdMetisState *fwd_state, RtaConnection *conn, PARCEventType what) +{ + if (what & PARCEventType_Read) { + int socketError = _getSocketError(fwd_state->fd); + if (socketError == 0) { + // I don't think these happen, they will be write events + _connectionSucceeded(fwd_state, conn); + } else { + // error on connect + printf("%9" PRIu64 " %s Connection %p got error on SOCK_STREAM, fd %d: %s\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn, + fwd_state->fd, + strerror(errno)); + + // make the event non-pending + parcEvent_Stop(fwd_state->readEvent); + parcEvent_Stop(fwd_state->writeEvent); + + rtaConnection_SetBlockedDown(conn); + + // at least tell the API whats going on + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_FORWARDER_NOT_AVAILABLE, NULL, NULL); + } + } + + if (what & PARCEventType_Write) { + int socketError = _getSocketError(fwd_state->fd); + if (socketError == 0) { + _connectionSucceeded(fwd_state, conn); + } + } +} + +static void +_setupNextPacketV1(FwdMetisState *fwd_state) +{ + switch (fwd_state->nextMessage.fixedHeader.v1.packetType) { + case CCNxCodecSchemaV1Types_PacketType_Interest: + fwd_state->nextMessage.packetType = PacketType_Interest; + break; + case CCNxCodecSchemaV1Types_PacketType_ContentObject: + fwd_state->nextMessage.packetType = PacketType_ContentObject; + break; + case CCNxCodecSchemaV1Types_PacketType_Control: + fwd_state->nextMessage.packetType = PacketType_Control; + break; + case CCNxCodecSchemaV1Types_PacketType_InterestReturn: + fwd_state->nextMessage.packetType = PacketType_InterestReturn; + break; + default: + fwd_state->nextMessage.packetType = PacketType_Unknown; + break; + } + + size_t fixedHeaderLength = sizeof(CCNxCodecSchemaV1FixedHeader); + fwd_state->nextMessage.length = htons(fwd_state->nextMessage.fixedHeader.v1.packetLength); + + fwd_state->nextMessage.packet = parcBuffer_Allocate(fwd_state->nextMessage.length); + assertNotNull(fwd_state->nextMessage.packet, "Could not allocate packet of size %zu", fwd_state->nextMessage.length); + + // finally copy in the fixed header as we have already read that in + parcBuffer_PutArray(fwd_state->nextMessage.packet, fixedHeaderLength, fwd_state->nextMessage.fixedHeader.buffer); +} + +/** + * Called after reading whole FixedHeader, will setup the packet buffer + * + * After reading the fixed header, we need to allocate a PARCBuffer for the packet. Setup that + * buffer and copy the FixedHeader in to it. Remaining reads will go in to this buffer. + * + * After this function completes, the parsed version, packetType, and length of the nextMessage will + * be filled in, the packet buffer allocated and the fixedHeader copied to that packet buffer. + * + * precondition: forwarder->nextMessage.remainingReadLength == 0 && fwd_state->nextMessage.packet == NULL + * + * @param [in] fwd_state An allocated forwarder connection state that has read in the fixed header + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_setupNextPacket(FwdMetisState *fwd_state) +{ + trapUnexpectedStateIf(fwd_state->nextMessage.packet != NULL, "Calling _setupNextPacket but the packet field is not NULL"); + + fwd_state->nextMessage.version = fwd_state->nextMessage.fixedHeader.buffer[0]; + + switch (fwd_state->nextMessage.version) { + case 1: + _setupNextPacketV1(fwd_state); + break; + + default: + trapUnexpectedState("Illegal packet version %d", fwd_state->nextMessage.version) + { + _nextMessage_Display(&fwd_state->nextMessage, 0); + } + break; + } +} + +/** + * Reads the FixedHeader. If full read will setup the next packet buffer. + * + * Reads up to FixedHeader length bytes. If read whole header will allocate the next packet + * buffer to right size and copy the Fixed Header in to the buffer. + * + * preconditions: + * - fwd_state->nextMessage.packet should be NULL + * - fwd_state->nextMessage.remainingReadLength should be the remaining bytes to read of the Fixed Header + * - fwd_state->nextMessage.readLocation should point to the location in the FixedHeader to start reading + * + * postconditions: + * - fwd_state->nextMessage.remainingReadLength will be decremented by the amount read + * - If remainingReadLength is decremented to 0, will allocate fwd_state->nextMessage.packet and copy in the FixedHeader + * - The fields in fwd_state->nextMessage (length, packetType, version) will be set based on the fixed header + * + * @param [in] fwd_state An allocated forwarder connection state + * + * @retval ReadReturnCode_Finished one entire packet is ready in the buffer + * @retval ReadReturnCode_PartialRead need more bytes + * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error) + * @retval ReadReturnCode_Error An error occured on the socket to metis + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static ReadReturnCode +_readPacketHeader(FwdMetisState *fwd_state) +{ + ReadReturnCode returnCode = ReadReturnCode_Error; + + // This could be switched to MSG_PEEK instead of copying later, but I don't think it makes any significant change. + ssize_t nread = recv(fwd_state->fd, fwd_state->nextMessage.readLocation, fwd_state->nextMessage.remainingReadLength, 0); + if (nread > 0) { + // recv will always runturn at most fwd_state->nextMessage.remainingReadLength, so this won't wrap around to negative. + fwd_state->nextMessage.remainingReadLength -= nread; + + if (fwd_state->nextMessage.remainingReadLength == 0) { + returnCode = ReadReturnCode_Finished; + _setupNextPacket(fwd_state); + } else { + fwd_state->nextMessage.readLocation += nread; + returnCode = ReadReturnCode_PartialRead; + } + } else if (nread == 0) { + // the connection is closed + returnCode = ReadReturnCode_Closed; + } else { + switch (errno) { + case EAGAIN: + // call would block. These can happen becasue _readMessage is in a while loop and we detect + // the end of the loop because we cannot read another fixed header. + returnCode = ReadReturnCode_PartialRead; + break; + + default: + // an error. I think all errors will be hard errors and we close the connection + if (DEBUG_OUTPUT) { + printf("%9c %s socket %d recv error: (%d) %s\n", + ' ', __func__, fwd_state->fd, errno, strerror(errno)); + } + returnCode = ReadReturnCode_Error; + break; + } + } + + return returnCode; +} + + +/** + * We have finished reading the fixed header, reading the message body + * + * Will modify the nextMessage.packet buffer. When the buffer has 0 remaining, the whole packet has been read + * + * precondition: _readHeaderFromMetis read the header and allocated the packet buffer + * + * @param [in] fwd_state An allocated forwarder connection state + * + * @retval ReadReturnCode_Finished one entire packet is ready in the buffer + * @retval ReadReturnCode_PartialRead need more bytes + * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error) + * @retval ReadReturnCode_Error An error occured on the socket to metis + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static ReadReturnCode +_readPacketBody(FwdMetisState *fwd_state) +{ + ReadReturnCode returnCode = ReadReturnCode_Error; + + trapUnexpectedStateIf(fwd_state->nextMessage.packet == NULL, "Trying to read a message with a null packet buffer"); + + size_t remaining = parcBuffer_Remaining(fwd_state->nextMessage.packet); + + if (DEBUG_OUTPUT) { + printf("%9c %s socket %d read up to %zu bytes\n", + ' ', __func__, fwd_state->fd, remaining); + } + + void *overlay = parcBuffer_Overlay(fwd_state->nextMessage.packet, 0); + ssize_t nread = recv(fwd_state->fd, overlay, remaining, 0); + + if (nread > 0) { + // good read + parcBuffer_SetPosition(fwd_state->nextMessage.packet, parcBuffer_Position(fwd_state->nextMessage.packet) + nread); + + if (nread == remaining) { + returnCode = ReadReturnCode_Finished; + } else { + returnCode = ReadReturnCode_PartialRead; + } + } else if (nread == 0) { + // connection closed + returnCode = ReadReturnCode_Closed; + } else { + switch (errno) { + case EAGAIN: + // call would block. These can happen becasue _readMessage is in a while loop and we detect + // the end of the loop because we cannot read the entire message body. + returnCode = ReadReturnCode_PartialRead; + break; + + default: + // an error. I think all errors will be hard errors and we close the connection + if (DEBUG_OUTPUT) { + printf("%9c %s socket %d recv error: (%d) %s\n", + ' ', __func__, fwd_state->fd, errno, strerror(errno)); + } + returnCode = ReadReturnCode_Error; + } + } + + + if (DEBUG_OUTPUT) { + printf("%9c %s socket %u msg_length %zu read_length %zd remaining %zu\n", + ' ', + __func__, + fwd_state->fd, + fwd_state->nextMessage.length, + nread, + parcBuffer_Remaining(fwd_state->nextMessage.packet)); + } + + return returnCode; +} + +/** + * Read packet from metis + * + * Reads the fixed heder. Once fixed header is done, begins reading the packet body. Keeps + * all the incremental state to do partial reads. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @retval ReadReturnCode_Finished one entire packet is ready in the buffer + * @retval ReadReturnCode_PartialRead need more bytes + * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error) + * @retval ReadReturnCode_Error An error occured on the socket to metis + * + * Example: + * @code + * <#example#> + * @endcode + */ +static ReadReturnCode +_readPacket(FwdMetisState *fwd_state) +{ + ReadReturnCode returnCode = ReadReturnCode_PartialRead; + + // are we still reading the header? + if (fwd_state->nextMessage.remainingReadLength > 0) { + returnCode = _readPacketHeader(fwd_state); + } else { + returnCode = ReadReturnCode_Finished; + } + + // After reading the header, it may be possible to read the body too + if (returnCode == ReadReturnCode_Finished && fwd_state->nextMessage.remainingReadLength == 0) { + returnCode = _readPacketBody(fwd_state); + } + + return returnCode; +} + +/** + * Read as many packets as we can from Metis + * + * Will read the stream socket from metis until we get a PartialRead return code from + * either the attempt to read the header or the body. + * + * On read error, will send a notification message the connection is closed up to + * the API and will disable read and write events. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_readFromMetis(FwdMetisState *fwd_state, RtaConnection *conn) +{ + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + + ReadReturnCode readCode; + while ((readCode = _readPacket(fwd_state)) == ReadReturnCode_Finished) { + rtaComponentStats_Increment(stats, STATS_UPCALL_IN); + fwd_state->stats.countUpcallReads++; + + // setup the buffer for reading + parcBuffer_Flip(fwd_state->nextMessage.packet); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s sending packet buffer %p up stack length %zu\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state->nextMessage.packet, + parcBuffer_Remaining(fwd_state->nextMessage.packet)); + } + + // this is just to make the signature of connector_Fwd_Metis_SendUpStack tractable, PacketData + // is not exposed outside this scope. + + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP); + PacketData data = { + .fwd_state = fwd_state, + .conn = conn, + .out = out, + .stats = stats, + }; + + connector_Fwd_Metis_SendUpStack(&data); + + // done with the packet buffer. Release our hold on it. If it was sent up the stack + // another reference count was made. + parcBuffer_Release(&fwd_state->nextMessage.packet); + + // now setup for next packet + _initializeNextMessage(&fwd_state->nextMessage); + } + + if (readCode == ReadReturnCode_Closed) { + fwd_state->isConnected = false; + parcEvent_Stop(fwd_state->readEvent); + parcEvent_Stop(fwd_state->writeEvent); + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_CLOSED, NULL, "Socket operation returned closed by remote"); + } else if (readCode == ReadReturnCode_Error) { + fwd_state->isConnected = false; + parcEvent_Stop(fwd_state->readEvent); + parcEvent_Stop(fwd_state->writeEvent); + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_CLOSED, NULL, "Socket operation returned error"); + } + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s total upcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_UPCALL_IN), + rtaComponentStats_Get(stats, STATS_UPCALL_OUT)); + } +} + +/** + * Append a vector to the buffer + * + * @param [in] wireFormat The wire format packet, assumes current position is start of packet + * @param [in] fwd_output The libevent buffer to add the memory reference to + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_queueIoVecMessageToMetis(CCNxCodecNetworkBufferIoVec *vec, PARCEventBuffer *fwd_output) +{ + fwd_metis_references_queued++; + + int iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(vec); + const struct iovec *array = ccnxCodecNetworkBufferIoVec_GetArray(vec); + + for (int i = 0; i < iovcnt; i++) { + if (parcEventBuffer_Append(fwd_output, array[i].iov_base, array[i].iov_len) < 0) { + trapUnrecoverableState("%s error writing to bev_local", __func__); + } + } +} + +/** + * Append to the buffer + * + * @param [in] wireFormat The wire format packet, assumes current position is start of packet + * @param [in] fwd_output The libevent buffer to add the memory reference to + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_queueBufferMessageToMetis(PARCBuffer *wireFormat, PARCEventBuffer *fwd_output) +{ + fwd_metis_references_queued++; + + void *overlay = parcBuffer_Overlay(wireFormat, 0); + size_t length = parcBuffer_Remaining(wireFormat); + + if (parcEventBuffer_Append(fwd_output, overlay, length) < 0) { + trapUnrecoverableState("%s error writing to bev_local", __func__); + } +} + +/** + * Write as much as possible from the output buffer to metis + * + * Write as much as we can to metis. If there is nothing left, deactivate the write event. + * If there is still bytes left in the output buffer, activate the write event. + * + * postconditions: + * - Write as many bytes as possible from the output buffer to metis + * - If there are still bytes remaining, enable the write event + * - If there are no bytes remaining, disable the write event. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_dequeueMessagesToMetis(FwdMetisState *fwdConnState) +{ + // if we try to write a 0 length buffer, write will return -1 like an error + if (parcEventBuffer_GetLength(fwdConnState->metisOutputQueue) > 0) { + fwdConnState->stats.countDowncallWrites++; + int nwritten = parcEventBuffer_WriteToFileDescriptor(fwdConnState->metisOutputQueue, fwdConnState->fd, -1); + if (nwritten < 0) { + // an error + trapNotImplemented("Bugzid: 2194"); + } + + if (DEBUG_OUTPUT) { + printf("%9c %s wrote %d bytes to socket %d, %zu bytes remaining\n", + ' ', + __func__, + nwritten, + fwdConnState->fd, + parcEventBuffer_GetLength(fwdConnState->metisOutputQueue)); + } + + // if we could not write the whole buffer, make sure we have a write event pending + if (parcEventBuffer_GetLength(fwdConnState->metisOutputQueue) > 0) { + parcEvent_Start(fwdConnState->writeEvent); + if (DEBUG_OUTPUT) { + printf("%9c %s enabled write event\n", ' ', __func__); + } + } else { + parcEvent_Stop(fwdConnState->writeEvent); + if (DEBUG_OUTPUT) { + printf("%9c %s disabled write event\n", ' ', __func__); + } + } + } +} + + +/** + * Called when we get an event on a socket we believe is connected + * + * libevent will call this with an PARCEventType_Read on connection close too (the read length will be 0). + * + * @param [in] fwd_state An allocated forwarder connection state + * @param [in] conn The corresponding RTA connection + * @param [in] what The Libevent set of events + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_connectedEventHandler(FwdMetisState *fwd_state, RtaConnection *conn, short what) +{ + if (what & PARCEventType_Read) { + _readFromMetis(fwd_state, conn); + } + + if (what & PARCEventType_Write) { + _dequeueMessagesToMetis(fwd_state); + } +} + +/** + * Called for any activity on the socket. Maybe in either connected or disconnected state. + */ +static void +_eventCallback(int fd, PARCEventType what, void *connectionVoid) +{ + RtaConnection *conn = (RtaConnection *) connectionVoid; + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + + if (!fwd_state->isConnected) { + _disconnectedEventHandler(fwd_state, conn, what); + + // once we connect, we should try a read immediately too + } + + if (fwd_state->isConnected) { + _connectedEventHandler(fwd_state, conn, what); + } +} + +/** + * Updates the connections's Blocked Down state + * + * If the bytes in our output buffer are greater than METIS_OUTPUT_QUEUE_BYTES, then + * we will set the Blocked Down condition on the connection. This will prevent the + * API connector from accepting more messages. + * + * Messages already in the connection queue will still be processed. + * + * @param [in] fwd_output The libevent buffer to check the backlog + * @param [in] conn The RtaConnection the set or clear the blocked down condition + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_updateBlockedDownState(PARCEventBuffer *fwd_output, RtaConnection *conn) +{ + size_t queue_bytes = parcEventBuffer_GetLength(fwd_output); + if (queue_bytes > METIS_OUTPUT_QUEUE_BYTES) { + // block down + + if (!rtaConnection_BlockedDown(conn)) { + rtaConnection_SetBlockedDown(conn); + } + + // note that we continue execution and put the packet we have in hand on the queue + // setting the blocked down state only affects the API connector. Packets already in the system + // will keep flowing down to us + } else { + // if it is blocked, unblock it + if (rtaConnection_BlockedDown(conn)) { + rtaConnection_ClearBlockedDown(conn); + } + } +} + +static void +connector_Fwd_Metis_Downcall_HandleConnected(FwdMetisState *fwdConnState, TransportMessage *tm, RtaConnection *conn, RtaComponentStats *stats) +{ + _updateBlockedDownState(fwdConnState->metisOutputQueue, conn); + + CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm); + + bool queued = false; + + CCNxCodecNetworkBufferIoVec *vec = ccnxWireFormatMessage_GetIoVec(dictionary); + if (vec != NULL) { + _queueIoVecMessageToMetis(vec, fwdConnState->metisOutputQueue); + queued = true; + } else { + PARCBuffer *wireFormat = ccnxWireFormatMessage_GetWireFormatBuffer(dictionary); + if (wireFormat != NULL) { + _queueBufferMessageToMetis(wireFormat, fwdConnState->metisOutputQueue); + queued = true; + } + } + + if (queued) { + rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT); + + if (DEBUG_OUTPUT) { + struct timeval delay = transportMessage_GetDelay(tm); + printf("%9" PRIu64 " %s total downcall reads %" PRIu64 " references queued %u dequeued %u not queued %u last delay %.6f\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN), + fwd_metis_references_queued, + fwd_metis_references_dequeued, + fwd_metis_references_notqueued, + delay.tv_sec + delay.tv_usec * 1E-6); + } + } else { + fwd_metis_references_notqueued++; + } + + // The transport message is destroyed in connector_Fwd_Metis_Downcall_Read() +} + +static void +_ackRequest(RtaConnection *conn, PARCJSON *request) +{ + PARCJSON *response = cpiAcks_CreateAck(request); + CCNxTlvDictionary *ackDict = ccnxControlFacade_CreateCPI(response); + + TransportMessage *tm_ack = transportMessage_CreateFromDictionary(ackDict); + ccnxTlvDictionary_Release(&ackDict); + parcJSON_Release(&response); + + transportMessage_SetInfo(tm_ack, rtaConnection_Copy(conn), rtaConnection_FreeFunc); + + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP); + if (rtaComponent_PutMessage(out, tm_ack)) { + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } +} + +static bool +_handleDownControl(FwdMetisState *fwdConnState, RtaConnection *conn, TransportMessage *tm) +{ + bool consumedMessage = false; + + CCNxTlvDictionary *dict = transportMessage_GetDictionary(tm); + if (ccnxTlvDictionary_IsControl(dict)) { + if (ccnxControlFacade_IsCPI(dict)) { + PARCJSON *json = ccnxControlFacade_GetJson(dict); + if (controlPlaneInterface_GetCPIMessageType(json) == CPI_REQUEST) { + if (cpi_getCPIOperation2(json) == CPI_PAUSE) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p recieved PAUSE\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + _ackRequest(conn, json); + consumedMessage = true; + } + + if (cpi_getCPIOperation2(json) == CPI_FLUSH) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p recieved FLUSH\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + _ackRequest(conn, json); + consumedMessage = true; + } + } + } + } + + if (consumedMessage) { + fwdConnState->stats.countDowncallControl++; + } + + return consumedMessage; +} + +/** + * send raw packet from codec to forwarder. We are passed the ProtocolStack on the ptr. + */ +static void +connector_Fwd_Metis_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *ptr) +{ + TransportMessage *tm; + + while ((tm = rtaComponent_GetMessage(in)) != NULL) { + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + FwdMetisState *fwdConnState = rtaConnection_GetPrivateData(conn, FWD_METIS); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN); + fwdConnState->stats.countDowncallReads++; + + bool consumedControl = _handleDownControl(fwdConnState, conn, tm); + if (!consumedControl) { + // we did not consume the message as a control packet for the metis connector + + if (fwdConnState->isConnected) { + // If the socket is connected, this will "do the right thing" and consume the transport message. + connector_Fwd_Metis_Downcall_HandleConnected(fwdConnState, tm, conn, stats); + } else { + // Oops, got a packet before we're connected. + printf("\nConnection %p transport message %p on fd %d that's not open\n", (void *) conn, (void *) tm, fwdConnState->fd); + } + + // now attempt to write to the network + _dequeueMessagesToMetis(fwdConnState); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s total downcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN), + rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT)); + } + } + + transportMessage_Destroy(&tm); + } +} + +/** + * Destroy the FwdMetisState object. + * + * Destroys any packets waiting in queue, frees the libevent structures used by the connection to Metis. + * Frees the FwdMetisState object and will NULL *fwdStatePtr. + * + * @param [in,out] fwdStatePtr Double pointer to the allocated state. Will be NULL'd on output. + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_fwdMetisState_Release(FwdMetisState **fwdStatePtr) +{ + FwdMetisState *fwd_state = *fwdStatePtr; + + while (!parcDeque_IsEmpty(fwd_state->transportMessageQueue)) { + TransportMessage *tm = parcDeque_RemoveFirst(fwd_state->transportMessageQueue); + transportMessage_Destroy(&tm); + } + + parcDeque_Release(&fwd_state->transportMessageQueue); + + if (fwd_state->readEvent) { + parcEvent_Destroy(&(fwd_state->readEvent)); + } + + if (fwd_state->writeEvent) { + parcEvent_Destroy(&(fwd_state->writeEvent)); + } + + parcEventTimer_Destroy(&(fwd_state->transportMessageQueueEvent)); + + if (fwd_state->metisOutputQueue) { + parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue)); + } + + if (fwd_state->nextMessage.packet) { + parcBuffer_Release(&fwd_state->nextMessage.packet); + } + + close(fwd_state->fd); + + parcMemory_Deallocate((void **) &fwd_state); + *fwdStatePtr = NULL; +} + +static int +connector_Fwd_Metis_Closer(RtaConnection *conn) +{ + FwdMetisState *fwd_state = rtaConnection_GetPrivateData(conn, FWD_METIS); + rtaConnection_SetPrivateData(conn, FWD_METIS, NULL); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s called on fwd_state %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state); + } + + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + rtaComponentStats_Increment(stats, STATS_CLOSES); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s closed fwd_state %p deque length %zu\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state, + parcDeque_Size(fwd_state->transportMessageQueue)); + + printf("%9" PRIu64 " %s closed fwd_state %p stats: up { reads %u wok %u werr %u wblk %u wfull %u wctrlok %u wctrlerr %u }\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state, + fwd_state->stats.countUpcallReads, fwd_state->stats.countUpcallWriteDataOk, fwd_state->stats.countUpcallWriteDataError, + fwd_state->stats.countUpcallWriteDataBlocked, fwd_state->stats.countUpcallWriteDataQueueFull, + fwd_state->stats.countUpcallWriteControlOk, fwd_state->stats.countUpcallWriteControlError); + + printf("%9" PRIu64 " %s closed fwd_state %p stats: dn { reads %u wok %u wctrlok %u }\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state, + fwd_state->stats.countDowncallReads, fwd_state->stats.countDowncallWrites, fwd_state->stats.countDowncallControl); + } + + _fwdMetisState_Release(&fwd_state); + + return 0; +} + +static int +connector_Fwd_Metis_Release(RtaProtocolStack *stack) +{ + return 0; +} + +/** + * Enable to disable the read event based on the Blocked Up state + * + * If we receive a Blocked Up state change and the read event is pending, make it + * not pending. If we receive a not blocked up state change and the read event is not + * pending, make it pending. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +connector_Fwd_Metis_StateChange(RtaConnection *conn) +{ + struct fwd_metis_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_METIS); + + int isReadPending = parcEvent_Poll(fwd_state->readEvent, PARCEventType_Read); + + + // If we are blocked in the UP direction, disable events on the read queue + if (rtaConnection_BlockedUp(conn)) { + // we only disable it and log it if it was active + if (isReadPending) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u blocked up, disable PARCEventType_Read\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaConnection_GetConnectionId(conn)); + } + + parcEvent_Stop(fwd_state->readEvent); + } + } else { + if ((!isReadPending) && fwd_state->isConnected) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u unblocked up, enable PARCEventType_Read\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaConnection_GetConnectionId(conn)); + } + parcEvent_Start(fwd_state->readEvent); + } + } + + // We do not need to do anything with DOWN direction, becasue we're the component sending + // those block down messages. +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c new file mode 100644 index 00000000..4e5ea48f --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c @@ -0,0 +1,634 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implements the API connector. The API connector is a event based component to manage the socket + * to the API. + * + * The API Connector's job is to manage the socket to the API between the RTA Framework and the + * API. It does this by using an event directly to manage that socket. It uses the same + * event scheduler base as the RTA framework, so its all part of the same event dispatcher. + * + * The RTA Transport now only speaks CCNxTlvDictionary messages. If we receive old timey Interest, + * ContentObject, etc., we translate them to the Dictionary format. The TransportMessage and CCNxMessage + * will both go away. + * + */ + +#include <config.h> +#include <stdio.h> +#include <fcntl.h> +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <errno.h> + +#include <parc/algol/parc_EventBuffer.h> + +#include <parc/algol/parc_Memory.h> +#include <LongBow/runtime.h> + +#include <ccnx/transport/transport_rta/connectors/rta_ApiConnection.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> +#include <ccnx/api/control/controlPlaneInterface.h> +#include <ccnx/api/control/cpi_ControlFacade.h> + +#include <ccnx/transport/transport_rta/config/config_Codec_Tlv.h> + +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_TlvDictionary.h> + + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +#define PAIR_TRANSPORT 0 +#define PAIR_OTHER 1 + +// we are only putting an 8-byte pointer on the queue, so +// this should be 50 messages +#define MAX_API_QUEUE_BYTES 400 + + +unsigned api_upcall_writes = 0; +unsigned api_downcall_reads = 0; +extern unsigned rta_transport_reads; + +// per connection state +struct rta_api_connection { + // A reference to our connection + RtaConnection *connection; + + // event queue for socketpair to API + PARCEventQueue *bev_api; + + // these are assingned to us by the Transport + int api_fd; + int transport_fd; +}; + +// ========================================================================================== +// STATIC PROTOTYPES and their headerdoc + +/** + * PARCEvent calls this when the API queue falls below the watermark + * + * We watermark the write queue at MAX_API_QUEUE_BYTES bytes. When a write takes + * the queue backlog below that amount, PARCEvent calls this. + * + * @param [in] connVoid Void pointer to the RtaConnection + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void rtaApiConnection_WriteCallback(PARCEventQueue *queue, PARCEventType type, void *conn); + +/** + * PARCEvent calls this when there's a message from the API + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void rtaApiConnection_Downcall_Read(PARCEventQueue *bev, PARCEventType type, void *conn); + +/** + * PARCEvent calls this when there's a non-read/write event on the API's socket + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void rtaApiConnection_Downcall_Event(PARCEventQueue *, PARCEventQueueEventType events, void *conn); + + +/** + * Drains the input queue and output queue of a connection to the API + * + * The input queue and output queue contain pointers to CCNxMessages. On close, + * we need to drain these queues and release all the messages. + * + * The API Connector is responsible for only draining its input queue. The output + * queue up to the API is drained by the RTA Framework. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void rtaApiConnection_DrainApiConnection(RtaApiConnection *apiConnection); + +/** + * Writes a message to the API + * + * Takes ownership of the message which is passed up to the API + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void rtaApiConnection_WriteMessageToApi(RtaApiConnection *apiConnection, CCNxMetaMessage *msg); + +// ========================================================================================== +// Public API + +static void +rtaApiConnection_SetupSocket(RtaApiConnection *apiConnection, RtaConnection *connection) +{ + RtaProtocolStack *stack = rtaConnection_GetStack(connection); + PARCEventScheduler *base = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack)); + int error; + + // Set non-blocking flag + int flags = fcntl(apiConnection->transport_fd, F_GETFL, NULL); + assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno); + int failure = fcntl(apiConnection->transport_fd, F_SETFL, flags | O_NONBLOCK); + assertFalse(failure, "fcntl failed to set socket non-blocking(%d) %s\n", errno, strerror(errno)); + + apiConnection->bev_api = parcEventQueue_Create(base, apiConnection->transport_fd, 0); + assertNotNull(apiConnection->bev_api, "Got null result from parcEventQueue_Create"); + + // Set buffer size + int sendbuff = 1000 * 8; + + error = setsockopt(rtaConnection_GetTransportFd(connection), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + assertTrue(error == 0, "Got error setting SO_SNDBUF: %s", strerror(errno)); + + parcEventQueue_SetWatermark(apiConnection->bev_api, PARCEventType_Write, MAX_API_QUEUE_BYTES, 0); + parcEventQueue_SetCallbacks(apiConnection->bev_api, + rtaApiConnection_Downcall_Read, + rtaApiConnection_WriteCallback, + rtaApiConnection_Downcall_Event, + (void *) connection); + + parcEventQueue_Enable(apiConnection->bev_api, PARCEventType_Read | PARCEventType_Write); +} + +RtaApiConnection * +rtaApiConnection_Create(RtaConnection *connection) +{ + RtaApiConnection *apiConnection = parcMemory_AllocateAndClear(sizeof(RtaApiConnection)); + assertNotNull(apiConnection, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaApiConnection)); + + apiConnection->connection = rtaConnection_Copy(connection); + apiConnection->api_fd = rtaConnection_GetApiFd(connection); + apiConnection->transport_fd = rtaConnection_GetTransportFd(connection); + rtaApiConnection_SetupSocket(apiConnection, connection); + + return apiConnection; +} + +void +rtaApiConnection_Destroy(RtaApiConnection **apiConnectionPtr) +{ + assertNotNull(apiConnectionPtr, "Parameter apiConnecitonPtr must be non-null"); + assertNotNull(*apiConnectionPtr, "Parameter apiConnecitonPtr must dereference to non-null"); + RtaApiConnection *apiConnection = *apiConnectionPtr; + + + // Send all the outbound messages up to the API. This at least gets them out + // of our output queue on to the API's socket. + parcEventQueue_Finished(apiConnection->bev_api, PARCEventType_Write); + rtaApiConnection_DrainApiConnection(apiConnection); + + parcEventQueue_Destroy(&(apiConnection->bev_api)); + + rtaConnection_Destroy(&apiConnection->connection); + + parcMemory_Deallocate((void **) &apiConnection); + + *apiConnectionPtr = NULL; +} + +static void +rtaApiConnection_SendToApiAsDictionary(RtaApiConnection *apiConnection, TransportMessage *tm) +{ + CCNxMetaMessage *msg = ccnxMetaMessage_Acquire(transportMessage_GetDictionary(tm)); + rtaApiConnection_WriteMessageToApi(apiConnection, msg); +} + +static CCNxName * +rtaApiConnection_GetNameFromTransportMessage(TransportMessage *tm) +{ + CCNxName *name = NULL; + CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm); + switch (ccnxTlvDictionary_GetSchemaVersion(dictionary)) { + case CCNxTlvDictionary_SchemaVersion_V1: + name = ccnxTlvDictionary_GetName(dictionary, CCNxCodecSchemaV1TlvDictionary_MessageFastArray_NAME); + break; + + default: + break; + } + return name; +} + +/** + * Writes the CCNxMessage inside the transport message up to the API. + * Its possible that if there's no space in the socket the write will block + * and return an error. + * + * @return true if written to API, false if not (most likely would block) + */ +bool +rtaApiConnection_SendToApi(RtaApiConnection *apiConnection, TransportMessage *tm, RtaComponentStats *stats) +{ + assertNotNull(apiConnection, "Parameter apiConnection must be non-null"); + + if (DEBUG_OUTPUT) { + CCNxName *name = rtaApiConnection_GetNameFromTransportMessage(tm); + char *nameString = NULL; + if (name) { + nameString = ccnxName_ToString(name); + } + + struct timeval delay = transportMessage_GetDelay(tm); + printf("%9" PRIu64 " %s putting transport msg %p to user fd %d delay %.6f name %s\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))), + __func__, + (void *) tm, + apiConnection->api_fd, + delay.tv_sec + delay.tv_usec * 1E-6, + nameString); + + if (nameString) { + parcMemory_Deallocate((void **) &nameString); + } + } + + rtaApiConnection_SendToApiAsDictionary(apiConnection, tm); + + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p fd_out %d state %p upcalls %u reads %u\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))), + __func__, + (void *) apiConnection->connection, + apiConnection->transport_fd, + (void *) apiConnection, + api_upcall_writes, + rta_transport_reads); + } + + return true; +} + +void +rtaApiConnection_BlockDown(RtaApiConnection *apiConnection) +{ + assertNotNull(apiConnection, "Parameter apiConnection must be non-null"); + PARCEventType enabled_events = parcEventQueue_GetEnabled(apiConnection->bev_api); + + // we only disable it and log it if it was active + if (enabled_events & PARCEventType_Read) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u blocked down, disable PARCEventType_Read\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))), + __func__, + rtaConnection_GetConnectionId(apiConnection->connection)); + } + + parcEventQueue_Disable(apiConnection->bev_api, PARCEventType_Read); + } +} + +void +rtaApiConnection_UnblockDown(RtaApiConnection *apiConnection) +{ + assertNotNull(apiConnection, "Parameter apiConnection must be non-null"); + PARCEventType enabled_events = parcEventQueue_GetEnabled(apiConnection->bev_api); + + if (!(enabled_events & PARCEventType_Read)) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u unblocked down, enable PARCEventType_Read\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))), + __func__, + rtaConnection_GetConnectionId(apiConnection->connection)); + } + parcEventQueue_Enable(apiConnection->bev_api, PARCEventType_Read); + } +} + +// ========================================================================================== +// Internal implementation + +static void +rtaApiConnection_WriteMessageToApi(RtaApiConnection *apiConnection, CCNxMetaMessage *msg) +{ + assertNotNull(msg, "Parameter msg must be non-null"); + + int error = parcEventQueue_Write(apiConnection->bev_api, &msg, sizeof(&msg)); + assertTrue(error == 0, + "write to transport_fd %d write error: (%d) %s", + apiConnection->transport_fd, errno, strerror(errno)); + + // debugging tracking + api_upcall_writes++; +} + +static void +_rtaAPIConnection_ProcessCPIRequest(RtaConnection *conn, PARCJSON *json) +{ + // Is it a request type we know about? + + switch (cpi_getCPIOperation2(json)) { + case CPI_PAUSE: { + RtaConnectionStateType oldstate = rtaConnection_GetState(conn); + if (oldstate == CONN_OPEN) { + rtaConnection_SetState(conn, CONN_PAUSED); + } + break; + } + + default: + // do nothing, don't know about this message type + break; + } +} + +static void +connector_Api_ProcessCpiMessage(RtaConnection *conn, CCNxTlvDictionary *controlDictionary) +{ + if (ccnxControlFacade_IsCPI(controlDictionary)) { + PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary); + switch (controlPlaneInterface_GetCPIMessageType(json)) { + case CPI_REQUEST: { + _rtaAPIConnection_ProcessCPIRequest(conn, json); + break; + } + + case CPI_RESPONSE: + break; + + case CPI_ACK: + break; + + default: + assertTrue(0, "Got unknown CPI message type: %d", controlPlaneInterface_GetCPIMessageType(json)); + } + } +} + +static void +rtaApiConnection_ProcessControlFromApi(RtaApiConnection *apiConnection, RtaProtocolStack *stack, CCNxTlvDictionary *controlDictionary) +{ + if (ccnxControlFacade_IsCPI(controlDictionary)) { + connector_Api_ProcessCpiMessage(apiConnection->connection, controlDictionary); + } +} + +static void +rtaApiConnection_Downcall_ProcessDictionary(RtaApiConnection *apiConnection, RtaProtocolStack *stack, + PARCEventQueue *queue_out, RtaComponentStats *stats, CCNxTlvDictionary *messageDictionary) +{ + // Look at the control message before checking for the connection closed + if (ccnxTlvDictionary_IsControl(messageDictionary)) { + rtaApiConnection_ProcessControlFromApi(apiConnection, stack, messageDictionary); + } + + // In paused or closed state, we only pass control messages + if ((rtaConnection_GetState(apiConnection->connection) == CONN_OPEN) || (ccnxTlvDictionary_IsControl(messageDictionary))) { + TransportMessage *tm = transportMessage_CreateFromDictionary(messageDictionary); + + // Set the auxiliary information to the message's connection + transportMessage_SetInfo(tm, rtaConnection_Copy(apiConnection->connection), rtaConnection_FreeFunc); + + if (DEBUG_OUTPUT) { + CCNxName *name = NULL; + if (ccnxTlvDictionary_IsInterest(messageDictionary)) { + name = ccnxInterest_GetName(messageDictionary); + } else if (ccnxTlvDictionary_IsContentObject(messageDictionary)) { + name = ccnxContentObject_GetName(messageDictionary); + } + + char *noname = "NONAME"; + char *nameString = noname; + if (name) { + nameString = ccnxName_ToString(name); + } + + printf("%9" PRIu64 " %s putting transport msg %p from user fd %d: %s\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))), + __func__, + (void *) tm, apiConnection->api_fd, + nameString); + + if (nameString != noname) { + parcMemory_Deallocate((void **) &nameString); + } + + //ccnxTlvDictionary_Display(0, messageDictionary); + } + + // send down the stack. If it fails, it destroys the message. + if (rtaComponent_PutMessage(queue_out, tm)) { + rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT); + } + } +} + +static void +rtaApiConnection_Downcall_ProcessMessage(RtaApiConnection *apiConnection, RtaProtocolStack *stack, PARCEventBuffer *eb_in, + PARCEventQueue *queue_out, RtaComponentStats *stats) +{ + api_downcall_reads++; + CCNxMetaMessage *msg; + + int bytesRemoved = parcEventBuffer_Read(eb_in, &msg, sizeof(CCNxMetaMessage *)); + assertTrue(bytesRemoved == sizeof(CCNxMetaMessage *), + "Error, did not remove an entire pointer, expected %zu got %d", + sizeof(CCNxMetaMessage *), + bytesRemoved); + + rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN); + + // This will save its own reference to the messageDictionary + rtaApiConnection_Downcall_ProcessDictionary(apiConnection, stack, queue_out, stats, msg); + + // At this point, the CCNxMetaMessage passed in by the application thread has been + // acquired rtaApiConnection_Downcall_ProcessDictionary(), so we can Release the reference we + // acquired in rtaTransport_Send(). + ccnxMetaMessage_Release(&msg); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p total downcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))), + __func__, + (void *) apiConnection->connection, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN), + rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT)); + } +} + + +/* + * Called by PARCEvent when there's a message to read from the API + * Read a message from the API. + * rtaConnectionVoid is the RtaConnection associated with the api descriptor + */ +static void +rtaApiConnection_Downcall_Read(PARCEventQueue *bev, PARCEventType type, void *rtaConnectionVoid) +{ + RtaConnection *conn = (RtaConnection *) rtaConnectionVoid; + + assertNotNull(rtaConnectionVoid, "Parameter must be a non-null void *"); + + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + assertNotNull(stack, "rtaConnection_GetStack returned null"); + + RtaComponentStats *stats = rtaConnection_GetStats(conn, API_CONNECTOR); + assertNotNull(stats, "rtaConnection_GetStats returned null"); + + RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR); + assertNotNull(apiConnection, "rtaConnection_GetPrivateData got null"); + + PARCEventBuffer *eb_in = parcEventBuffer_GetQueueBufferInput(bev); + + PARCEventQueue *queue_out = rtaComponent_GetOutputQueue(conn, API_CONNECTOR, RTA_DOWN); + assertNotNull(queue_out, "component_GetOutputQueue returned null"); + + while (parcEventBuffer_GetLength(eb_in) >= sizeof(TransportMessage *)) { + rtaApiConnection_Downcall_ProcessMessage(apiConnection, stack, eb_in, queue_out, stats); + } + parcEventBuffer_Destroy(&eb_in); +} + +/* + * This is used on the connection to the API out of the transport box + */ +static void +rtaApiConnection_Downcall_Event(PARCEventQueue *bev, PARCEventQueueEventType events, void *ptr) +{ +} + +/** + * Drains all the CCNxMessages off an event buffer and destroys them + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +drainBuffer(PARCEventBuffer *buffer, RtaConnection *conn) +{ + size_t length; + + while ((length = parcEventBuffer_GetLength(buffer)) > 0) { + CCNxMetaMessage *msg; + ssize_t len; + + len = parcEventBuffer_Read(buffer, &msg, sizeof(CCNxMetaMessage *)); + assertTrue(len == sizeof(CCNxMetaMessage *), + "Removed incorrect length, expected %zu got %zd: (%d) %s", + sizeof(CCNxMetaMessage *), + len, + errno, + strerror(errno)); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p drained message %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn, + (void *) msg); + } + ccnxMetaMessage_Release(&msg); + } +} + +/** + * Called on Destroy to clear our input buffer. This does not + * drain the output (to API) buffer, that is done by the RTA Framework + */ +static void +rtaApiConnection_DrainApiConnection(RtaApiConnection *apiConnection) +{ + // drain and free the transport_fd + parcEventQueue_Disable(apiConnection->bev_api, PARCEventType_Read); + + PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(apiConnection->bev_api); + drainBuffer(in, apiConnection->connection); + parcEventBuffer_Destroy(&in); + + // There may be some messages in the output buffer that + // have not actually been written to the kernel socket. + // Drain those too, as the API will never see them + + if (DEBUG_OUTPUT) { + PARCEventBuffer *out = parcEventBuffer_GetQueueBufferOutput(apiConnection->bev_api); + printf("%9" PRIu64 " %s conn %p output buffer has %zu bytes\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))), + __func__, + (void *) apiConnection->connection, + parcEventBuffer_GetLength(out)); + parcEventBuffer_Destroy(&out); + } +} + +/** + * Called by PARCEvent when we cross below the write watermark + */ +static void +rtaApiConnection_WriteCallback(PARCEventQueue *queue, PARCEventType type, void *connVoid) +{ + // we dropped below the write watermark, unblock the connection in the UP direction + RtaConnection *conn = (RtaConnection *) connVoid; + if (rtaConnection_BlockedUp(conn)) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u output fell below watermark, unblocking UP\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaConnection_GetConnectionId(conn)); + } + + rtaConnection_ClearBlockedUp(conn); + } +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.h b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.h new file mode 100644 index 00000000..799c45e6 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rta_ApiConnection.h + * @brief Implementation of the API connection + * + * <#Detailed Description#> + * + */ + +#ifndef TransportRTA_rta_ApiConnection_h +#define TransportRTA_rta_ApiConnection_h + +struct rta_api_connection; +typedef struct rta_api_connection RtaApiConnection; + +#include <ccnx/transport/transport_rta/core/rta_Connection.h> + +RtaApiConnection *rtaApiConnection_Create(RtaConnection *connection); +void rtaApiConnection_Destroy(RtaApiConnection **rtaApiConnectionPtr); + +/** + * Sends a TransportMessage up to the API + * + * Decapsulates the ccnx message and sends it up to the API. It will destroy the TransportMessage wrapper. + * + * @param [in] apiConnection The API connection to write to + * @param [in] tm The transport message to send + * @param [in] stats The statistics counter to increment on success + * + * @return true Transport message written + * @return false Transport message not written (but still destroyed) + * + * Example: + * @code + * <#example#> + * @endcode + */ +bool rtaApiConnection_SendToApi(RtaApiConnection *apiConnection, TransportMessage *tm, RtaComponentStats *stats); + +/** + * Block data flow in the DOWN direction + * + * To block in the DOWN direction, we disable READ events on the API's buffer + * + * @param [in] apiConnection The API Connector's connection state + * @param [in] conn The RTA Connection + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaApiConnection_BlockDown(RtaApiConnection *apiConnection); + +/** + * Unblock data flow in the DOWN direction + * + * To unblock in the DOWN direction, we enable READ events on the API's buffer + * + * @param [in] apiConnection The API Connector's connection state + * @param [in] conn The RTA Connection + * + * Example: + * @code + * <#example#> + * @endcode + */ +void rtaApiConnection_UnblockDown(RtaApiConnection *apiConnection); +#endif diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/CMakeLists.txt b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/CMakeLists.txt new file mode 100644 index 00000000..85e4812f --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/CMakeLists.txt @@ -0,0 +1,16 @@ +# Enable gcov output for the tests +add_definitions(--coverage) +set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage") + +set(TestsExpectedToPass + test_connector_Api + test_rta_ApiConnection + test_connector_Forwarder_Local + test_connector_Forwarder_Metis +) + + +foreach(test ${TestsExpectedToPass}) + AddTest(${test}) +endforeach() + diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Api.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Api.c new file mode 100644 index 00000000..409e075d --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Api.c @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Include the file(s) containing the functions to be tested. +// This permits internal static functions to be visible to this Test Framework. +#include "../connector_Api.c" + +#include <LongBow/unit-test.h> + +LONGBOW_TEST_RUNNER(connector_Api) +{ + LONGBOW_RUN_TEST_FIXTURE(Global); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(connector_Api) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(connector_Api) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(connector_Api); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Local.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Local.c new file mode 100644 index 00000000..014f4bbe --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Local.c @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#define DEBUG_OUTPUT 1 +#include "../connector_Forwarder_Local.c" +#include <LongBow/unit-test.h> + +#include <parc/algol/parc_SafeMemory.h> +#include <parc/security/parc_Security.h> +#include <parc/security/parc_Pkcs12KeyStore.h> + +#include <ccnx/api/control/cpi_ControlMessage.h> +#include <ccnx/api/control/controlPlaneInterface.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.c> +#include <ccnx/transport/transport_rta/core/rta_Framework_private.h> +#include <ccnx/transport/transport_rta/config/config_All.h> +#include <ccnx/transport/test_tools/bent_pipe.h> + +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + RtaFramework *framework; + + int api_fds[2]; + int listen_fd; + int rnd_fd; + + int stackId; + RtaConnection *connectionUnderTest; + + char bentpipe_LocalName[1024]; + BentPipeState *bentpipe; + char keystoreName[1024]; + char keystorePassword[1024]; +} TestData; + +static CCNxTransportConfig * +_createParams(const char *local_name, const char *keystore_name, const char *keystore_passwd) +{ + assertNotNull(local_name, "Got null keystore name\n"); + assertNotNull(keystore_name, "Got null keystore name\n"); + assertNotNull(keystore_passwd, "Got null keystore passwd\n"); + + CCNxStackConfig *stackConfig = apiConnector_ProtocolStackConfig( + testingUpper_ProtocolStackConfig( + localForwarder_ProtocolStackConfig( + protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(), + apiConnector_GetName(), + testingUpper_GetName(), + localForwarder_GetName(), NULL)))); + + CCNxConnectionConfig *connConfig = apiConnector_ConnectionConfig( + testingUpper_ConnectionConfig( + tlvCodec_ConnectionConfig( + localForwarder_ConnectionConfig( + ccnxConnectionConfig_Create(), local_name)))); + + publicKeySigner_ConnectionConfig(connConfig, keystore_name, keystore_passwd); + + CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig); + ccnxStackConfig_Release(&stackConfig); + return result; +} + +static TestData * +_commonSetup(void) +{ + parcSecurity_Init(); + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + + sprintf(data->bentpipe_LocalName, "/tmp/bentpipe_%d.sock", getpid()); + data->bentpipe = bentpipe_Create(data->bentpipe_LocalName); + bentpipe_Start(data->bentpipe); + + sprintf(data->keystoreName, "/tmp/keystore_%d.p12", getpid()); + sprintf(data->keystorePassword, "23439429"); + + unlink(data->keystoreName); + + bool success = parcPkcs12KeyStore_CreateFile(data->keystoreName, data->keystorePassword, "user", 1024, 30); + assertTrue(success, "parcPkcs12KeyStore_CreateFile() failed."); + + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + + // Create a protocol stack and a connection to use + CCNxTransportConfig *params = _createParams(data->bentpipe_LocalName, data->keystoreName, data->keystorePassword); + + data->stackId = 1; + + RtaCommandCreateProtocolStack *createStack = + rtaCommandCreateProtocolStack_Create(data->stackId, ccnxTransportConfig_GetStackConfig(params)); + + _rtaFramework_ExecuteCreateStack(data->framework, createStack); + rtaCommandCreateProtocolStack_Release(&createStack); + + socketpair(PF_LOCAL, SOCK_STREAM, 0, data->api_fds); + RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(data->stackId, data->api_fds[0], data->api_fds[1], + ccnxConnectionConfig_GetJson(ccnxTransportConfig_GetConnectionConfig(params))); + _rtaFramework_ExecuteOpenConnection(data->framework, openConnection); + rtaCommandOpenConnection_Release(&openConnection); + + ccnxTransportConfig_Destroy(¶ms); + + // now poke in to the connection table to get the connection to test + data->connectionUnderTest = rtaConnectionTable_GetByApiFd(data->framework->connectionTable, data->api_fds[0]); + + return data; +} + +static void +_commonTeardown(TestData *data) +{ + rtaFramework_Teardown(data->framework); + + parcRingBuffer1x1_Release(&data->commandRingBuffer); + parcNotifier_Release(&data->commandNotifier); + rtaFramework_Destroy(&data->framework); + + bentpipe_Stop(data->bentpipe); + bentpipe_Destroy(&data->bentpipe); + unlink(data->keystoreName); + parcMemory_Deallocate((void **) &data); + parcSecurity_Fini(); +} + +// ============================================================= + +LONGBOW_TEST_RUNNER(connector_Forwarder_Local) +{ + LONGBOW_RUN_TEST_FIXTURE(Local); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(connector_Forwarder_Local) +{ + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(connector_Forwarder_Local) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + + +// ====================================================== +LONGBOW_TEST_FIXTURE(Local) +{ + LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Local_Init_Release); + LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Local_Cpi_Pause); +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) { + printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding()); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +// ==================================================================== + +LONGBOW_TEST_CASE(Local, connector_Fwd_Local_Init_Release) +{ + // nothing to do, just checking that memory is in balance in teardown +} + +/** + * Send a PAUSE CPI message to the forwarder. It should reflect + * back a CPI ACK + */ +LONGBOW_TEST_CASE(Local, connector_Fwd_Local_Cpi_Pause) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + PARCJSON *controlPause = cpi_CreatePauseInputRequest(); + + CCNxTlvDictionary *controlDictionary = ccnxControlFacade_CreateCPI(controlPause); + TransportMessage *tm_in = transportMessage_CreateFromDictionary(controlDictionary); + + uint64_t pause_seqnum = controlPlaneInterface_GetSequenceNumber(controlPause); + parcJSON_Release(&controlPause); + ccnxTlvDictionary_Release(&controlDictionary); + + transportMessage_SetInfo(tm_in, rtaConnection_Copy(data->connectionUnderTest), (void (*)(void **))rtaConnection_Destroy); + + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(data->connectionUnderTest), TESTING_UPPER, RTA_DOWN); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(data->connectionUnderTest), TESTING_UPPER, RTA_DOWN); + + rtaComponent_PutMessage(in, tm_in); + + // this will crank it though the forwarder and reflect back up to us + rtaFramework_NonThreadedStepCount(data->framework, 4); + + // The first message out should be a CONNECTION_OPEN + TransportMessage *tm_out = rtaComponent_GetMessage(out); + transportMessage_Destroy(&tm_out); + + tm_out = rtaComponent_GetMessage(out); + + assertTrue(transportMessage_IsControl(tm_out), "got wrong type, not a control message"); + + CCNxControl *control = ccnxMetaMessage_GetControl(transportMessage_GetDictionary(tm_out)); + + assertTrue(ccnxControl_IsACK(control), "Expected ccnxControl_IsACK to be true."); + + uint64_t _ack_original_seqnum = ccnxControl_GetAckOriginalSequenceNumber(control); + + assertTrue(_ack_original_seqnum == pause_seqnum, + "Got wrong original message seqnum, expected %" PRIu64 " got %" PRIu64, pause_seqnum, _ack_original_seqnum); + + transportMessage_Destroy(&tm_out); +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(connector_Forwarder_Local); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Metis.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Metis.c new file mode 100644 index 00000000..6fe9e3d2 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Metis.c @@ -0,0 +1,1350 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + * This test will setup a server socket so the Metis connector can connect to it. We can + * then see the packets the connector thinks it is sending to Metis. + */ + +#define DEBUG 1 +#include "../connector_Forwarder_Metis.c" + +#include <parc/algol/parc_SafeMemory.h> +#include <parc/security/parc_Pkcs12KeyStore.h> +#include <parc/security/parc_Security.h> + +#include <ccnx/api/control/cpi_ControlFacade.h> +#include <ccnx/api/control/controlPlaneInterface.h> +#include <ccnx/api/control/cpi_Forwarding.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.c> +#include <ccnx/transport/transport_rta/core/rta_Framework_private.h> +#include <ccnx/transport/transport_rta/config/config_All.h> + +#include <ccnx/common/codec/ccnxCodec_TlvPacket.h> +#include <ccnx/transport/test_tools/traffic_tools.h> + +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_FixedHeader.h> +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_PacketEncoder.h> +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_Types.h> +#include <ccnx/common/codec/schema_v1/testdata/v1_interest_nameA.h> +#include <ccnx/common/codec/schema_v1/testdata/v1_content_nameA_crc32c.h> +#include <ccnx/common/codec/schema_v1/testdata/v1_cpi_add_route_crc32c.h> + +#include <ccnx/common/ccnx_WireFormatMessage.h> + +// inet_pton +#include <arpa/inet.h> + +#include <LongBow/unit-test.h> + +static char keystorename[1024]; +static const char keystorepass[] = "2398472983479234"; + +#ifndef INPORT_ANY +#define INPORT_ANY 0 +#endif + +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + + // we will bind to a random port, this is what we end up binding to + // Its in host byte order + uint16_t metis_port; + + // server_socket is a socket we listen to like the Metis forwarder, so + // we can see all the traffic that comes out the bottom of the connector. + int server_socket; + + // when we accept a client on the server socket, this is his socket + int client_socket; + + RtaFramework *framework; + CCNxTransportConfig *params; + + char keystoreName[1024]; + char keystorePassword[1024]; +} TestData; + +/* + * @function setup_server + * @abstract Bind to 127.0.0.1 on a random port, returns the socket and port + * @discussion + * <#Discussion#> + * + * @param portOutput is the port bound to in host byte order + * @return <#return#> + */ +static int +_setup_server(uint16_t *portOutput) +{ + struct sockaddr_in address; + + /* listen on 127.0.0.1 random port */ + address.sin_family = PF_INET; + address.sin_port = INPORT_ANY; + inet_pton(AF_INET, "127.0.0.1", &(address.sin_addr)); + + int fd = socket(PF_INET, SOCK_STREAM, 0); + assertFalse(fd < 0, "error on bind: (%d) %s", errno, strerror(errno)); + + // Set non-blocking flag + int flags = fcntl(fd, F_GETFL, NULL); + assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno); + int failure = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno); + + failure = bind(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_in)); + assertFalse(failure, "error on bind: (%d) %s", errno, strerror(errno)); + + failure = listen(fd, 16); + assertFalse(failure, "error on listen: (%d) %s", errno, strerror(errno)); + + socklen_t x = sizeof(address); + failure = getsockname(fd, (struct sockaddr *) &address, &x); + assertFalse(failure, "error on getsockname: (%d) %s", errno, strerror(errno)); + + *portOutput = htons(address.sin_port); + + printf("test server setup on port %d\n", *portOutput); + return fd; +} + +static int +_accept_client(int server_socket) +{ + socklen_t addrlen; + struct sockaddr_in address; + int client_socket; + + addrlen = sizeof(struct sockaddr_in); + client_socket = accept(server_socket, (struct sockaddr *) &address, &addrlen); + assertFalse(client_socket < 0, "accept error: %s", strerror(errno)); + + printf("%s accepted client on socket %d\n", __func__, client_socket); + return client_socket; +} + +static RtaConnection * +_openConnection(TestData *data, int stack_id, int fds[2]) +{ + RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(stack_id, fds[0], fds[1], + ccnxConnectionConfig_GetJson(ccnxTransportConfig_GetConnectionConfig(data->params))); + _rtaFramework_ExecuteOpenConnection(data->framework, openConnection); + rtaCommandOpenConnection_Release(&openConnection); + + return rtaConnectionTable_GetByApiFd(data->framework->connectionTable, fds[0]); +} + +static void +_createStack(TestData *data, int stack_id) +{ + RtaCommandCreateProtocolStack *createStack = + rtaCommandCreateProtocolStack_Create(stack_id, ccnxTransportConfig_GetStackConfig(data->params)); + _rtaFramework_ExecuteCreateStack(data->framework, createStack); + rtaCommandCreateProtocolStack_Release(&createStack); +} + +static CCNxTransportConfig * +_createParams(int port, const char *keystore_name, const char *keystore_passwd) +{ + CCNxStackConfig *stackConfig; + CCNxConnectionConfig *connConfig; + + assertNotNull(keystore_name, "Got null keystore name\n"); + assertNotNull(keystore_passwd, "Got null keystore passwd\n"); + + stackConfig = apiConnector_ProtocolStackConfig( + testingUpper_ProtocolStackConfig( + metisForwarder_ProtocolStackConfig( + protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(), + apiConnector_GetName(), + testingUpper_GetName(), + metisForwarder_GetName(), NULL)))); + + connConfig = apiConnector_ConnectionConfig( + testingUpper_ConnectionConfig( + metisForwarder_ConnectionConfig( + tlvCodec_ConnectionConfig( + ccnxConnectionConfig_Create()), port))); + + publicKeySigner_ConnectionConfig(connConfig, keystore_name, keystore_passwd); + + CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig); + ccnxStackConfig_Release(&stackConfig); + return result; +} + +static TestData * +_commonSetup(void) +{ + parcSecurity_Init(); + + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + memset(data, 0, sizeof(TestData)); + + data->server_socket = _setup_server(&data->metis_port); + + // printf("%s listening on port %u\n", __func__, data->metis_port); + + sprintf(data->keystoreName, "%s", keystorename); + sprintf(data->keystorePassword, keystorepass); + + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + + data->params = _createParams(data->metis_port, data->keystoreName, keystorepass); + // we will always create stack #1 as the default stack + _createStack(data, 1); + return data; +} + +static void +_commonTeardown(TestData *data) +{ + if (data != NULL) { + if (data->server_socket > 0) { + close(data->server_socket); + } + + if (data->client_socket > 0) { + close(data->client_socket); + } + + ccnxTransportConfig_Destroy(&data->params); + rtaFramework_Teardown(data->framework); + + parcRingBuffer1x1_Release(&data->commandRingBuffer); + parcNotifier_Release(&data->commandNotifier); + rtaFramework_Destroy(&data->framework); + parcMemory_Deallocate((void **) &data); + } + parcSecurity_Fini(); +} + +// ====================================================== +// helper functions + +/** + * Wait for a READ event on the specifid socket. Has a 1 second timeout. + * + * @return true if READ event + * @return false otherwise + */ +static bool +_waitForSelect(int fd) +{ + fd_set readset; + FD_ZERO(&readset); + FD_SET(fd, &readset); + int result = select(fd + 1, &readset, NULL, NULL, &(struct timeval) { 1, 0 }); + assertFalse(result < 0, "Error on select: (%d) %s", errno, strerror(errno)); + assertFalse(result == 0, "Timeout waiting for connection attempt"); + assertTrue(FD_ISSET(fd, &readset), "server_socket was not set by select"); + + return true; +} + + + +static size_t +_sendPacketToConnectorV1(int fd, size_t payloadLength) +{ + // Setup the header + uint8_t headerLength = 13; + uint16_t packetLength = payloadLength + headerLength; + uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest; + + CCNxCodecSchemaV1FixedHeader hdr = { .version = 1, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength }; + + // put header in packet and write the packet + uint8_t packet[1024]; + memcpy(packet, &hdr, sizeof(hdr)); + + // write out exactly the number of bytes we need + size_t writeSize = packetLength; + + ssize_t nwritten = write(fd, packet, writeSize); + assertTrue(nwritten == writeSize, "Wrong write size, expected %zu got %zd", writeSize, nwritten); + return writeSize; +} + +static RtaConnection * +setupConnectionAndClientSocket(TestData *data, int *apiSocketOuptut, int *clientSocketOutput) +{ + // Open a listener and accept the forwarders connection + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + RtaConnection *conn = _openConnection(data, 1, fds); + assertNotNull(conn, "Got null connection opening on stack 1"); + + rtaFramework_NonThreadedStepCount(data->framework, 2); + + // we should now see a connection request + _waitForSelect(data->server_socket); + + // accept the client and set a 1 second read timeout on the socket + int client_fd = _accept_client(data->server_socket); + struct timeval readTimeout = { 1, 0 }; + setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &readTimeout, sizeof(readTimeout)); + + *apiSocketOuptut = fds[0]; + *clientSocketOutput = client_fd; + return conn; +} + +// throw away the first control message +static void +_throwAwayControlMessage(PARCEventQueue *out) +{ + TransportMessage *control_tm = rtaComponent_GetMessage(out); + assertNotNull(control_tm, "Did not receive a transport message out of the top of the connector"); + assertTrue(transportMessage_IsControl(control_tm), + "transport message is not a control message") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(control_tm), 0); + } + transportMessage_Destroy(&control_tm); +} + +static void +_testReadPacketV1(size_t extraBytes) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + + // this replaces "_openSocket" + fwd_state->fd = fds[STACK]; + + _setupSocket(fwd_state); + + // Setup the header + uint16_t packetLength = 24; + uint8_t headerLength = 13; + uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest; + + CCNxCodecSchemaV1FixedHeader hdr = { .version = 1, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength }; + + // put header in packet and write the packet + uint8_t packet[1024]; + memcpy(packet, &hdr, sizeof(hdr)); + + // write out exactly the number of bytes we need + size_t firstWrite = packetLength; + + ssize_t nwritten = write(fds[REMOTE], packet, firstWrite + extraBytes); + assertTrue(nwritten == firstWrite + extraBytes, "Wrong write size, expected %zu got %zd", + firstWrite + extraBytes, nwritten); + + ReadReturnCode readCode = _readPacket(fwd_state); + + assertTrue(readCode == ReadReturnCode_Finished, "readCode should be %d got %d", ReadReturnCode_Finished, readCode); + + // should indicate there's nothing left to read of the header + assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength); + + // we should be at position "firstWrite" in the packet buffer + assertNotNull(fwd_state->nextMessage.packet, "Packet buffer is null"); + assertTrue(parcBuffer_Position(fwd_state->nextMessage.packet) == firstWrite, + "Wrong position, expected %zu got %zu", firstWrite, parcBuffer_Position(fwd_state->nextMessage.packet)); + + // cleanup + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + close(fds[REMOTE]); +} + + + +static void +_testReadFromMetisFromArray(TestData *data, size_t length, uint8_t buffer[length]) +{ + // create our connection. This will become part of the RTA framework, so will be + // cleaned up in the teardown + int api_fd; + int client_fd; + RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); + + ssize_t nwritten = write(client_fd, buffer, length); + assertTrue(nwritten == length, "Wrong write size, expected %zu got %zd", length, nwritten); + + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + + _readFromMetis(fwd_state, conn); + + // now crank the handle to pop those messages up the stack + rtaFramework_NonThreadedStepCount(data->framework, 5); + + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN); + _throwAwayControlMessage(out); + + // verify the wire format is what we wrote + TransportMessage *test_tm = rtaComponent_GetMessage(out); + assertNotNull(test_tm, "Did not receive a transport message out of the top of the connector"); + + CCNxTlvDictionary *testDictionary = transportMessage_GetDictionary(test_tm); + PARCBuffer *writeFormat = ccnxWireFormatMessage_GetWireFormatBuffer(testDictionary); + assertNotNull(writeFormat, + "transport message does not have a wire format"); + + PARCBuffer *truth = parcBuffer_Wrap(buffer, length, 0, length); + assertTrue(parcBuffer_Equals(truth, writeFormat), "Wire format does not match expected") + { + printf("Expected:\n"); + parcBuffer_Display(truth, 3); + printf("Received:\n"); + parcBuffer_Display(writeFormat, 3); + } + + parcBuffer_Release(&truth); + transportMessage_Destroy(&test_tm); +} + + +// ====================================================== + +LONGBOW_TEST_RUNNER(connector_Forwarder_Metis) +{ + LONGBOW_RUN_TEST_FIXTURE(Local); + + LONGBOW_RUN_TEST_FIXTURE(UpDirectionV1); + LONGBOW_RUN_TEST_FIXTURE(DownDirectionV1); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(connector_Forwarder_Metis) +{ + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + + snprintf(keystorename, 1024, "/tmp/keystore_%d.p12", getpid()); + + // init + fini here so there's no memory imbalance + parcSecurity_Init(); + parcPkcs12KeyStore_CreateFile(keystorename, keystorepass, "ccnxuser", 1024, 365); + parcSecurity_Fini(); + + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(connector_Forwarder_Metis) +{ + unlink(keystorename); + return LONGBOW_STATUS_SUCCEEDED; +} + +// ====================================================== +LONGBOW_TEST_FIXTURE(Local) +{ + LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Metis_Init_Release); + LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Metis_Opener_GoodPort); + LONGBOW_RUN_TEST_CASE(Local, _fwdMetisState_Release); + LONGBOW_RUN_TEST_CASE(Local, _readInEnvironmentConnectionSpecification); +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) { + printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding()); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +// ==================================================================== + +LONGBOW_TEST_CASE(Local, connector_Fwd_Metis_Init_Release) +{ + // nothing to do, just checking that memory is in balance in teardown +} + +/** + * Call the opener with the right port. We should see a connection attempt on + * the server socket and be able to accept it. + */ +LONGBOW_TEST_CASE(Local, connector_Fwd_Metis_Opener_GoodPort) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + RtaConnection *conn = _openConnection(data, 1, fds); + assertNotNull(conn, "Got null connection opening on stack 1"); + + rtaFramework_NonThreadedStepCount(data->framework, 2); + + // we should now see a connection request + _waitForSelect(data->server_socket); + + close(fds[1]); +} + +/** + * Make sure everything is released and file descriptor is closed + */ +LONGBOW_TEST_CASE(Local, _fwdMetisState_Release) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + + // this replaces "_openSocket" + fwd_state->fd = fds[STACK]; + + _setupSocket(fwd_state); + + + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + + // ensure that fds[STACK] is closed by _fwdMetisState_Release + uint8_t buffer[16]; + ssize_t nread = recv(fds[STACK], buffer, 16, 0); + assertTrue(nread == -1 && errno == EBADF, + "read from closed socket %d should be EBADF, got return %zd and errno (%d) %s", + fds[STACK], nread, errno, strerror(errno)); + + close(fds[REMOTE]); +} + +LONGBOW_TEST_CASE(Local, _readInEnvironmentConnectionSpecification) +{ + char *oldEnv = getenv(FORWARDER_CONNECTION_ENV); + setenv(FORWARDER_CONNECTION_ENV, "tcp://127.0.0.1:9999", 1); + struct sockaddr_in addr_in; + _readInEnvironmentConnectionSpecification(&addr_in); + assertTrue(addr_in.sin_port == htons(9999), "Port specification incorrectly parsed"); + assertTrue(addr_in.sin_addr.s_addr == inet_addr("127.0.0.1"), "Address specification incorrectly parsed");; + if (oldEnv) { + setenv(FORWARDER_CONNECTION_ENV, oldEnv, 1); + } else { + unsetenv(FORWARDER_CONNECTION_ENV); + } +} + + + +// ==================================================================== + + +LONGBOW_TEST_FIXTURE(UpDirectionV1) +{ + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_ExactFit); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_TwoReads); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _setupNextPacket); + + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacket_PartialMessage); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacket_ExactlyOneMessage); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacket_MoreThanOneMessage); + + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_ThreeMessages); + + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_InterestV1); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_ContentObjectV1); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_ControlV1); + + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_Error); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketBody_Error); + + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_Closed); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketBody_Closed); + LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_Closed); +} + +LONGBOW_TEST_FIXTURE_SETUP(UpDirectionV1) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(UpDirectionV1) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) { + printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding()); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +/** + * Put in exactly 8 bytes. + * This should return NULL, but will set nextMessageLength to be the right thing. + * Does not drain the buffer + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_ExactFit) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + + // this replaces "_openSocket" + fwd_state->fd = fds[STACK]; + + _setupSocket(fwd_state); + + // Setup the header + uint16_t packetLength = 24; + uint8_t headerLength = 13; + uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest; + + CCNxCodecSchemaV1FixedHeader hdr = { .version = 1, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength }; + + // put header in packet and write the packet + uint8_t packet[1024]; + size_t bufferReadLength = sizeof(hdr); + memcpy(packet, &hdr, bufferReadLength); + + // write out exactly the number of bytes we need + ssize_t nwritten = write(fds[REMOTE], packet, sizeof(CCNxCodecSchemaV1FixedHeader)); + assertTrue(nwritten == sizeof(CCNxCodecSchemaV1FixedHeader), "Wrong write size, expected %zu got %zd", + sizeof(CCNxCodecSchemaV1FixedHeader), nwritten); + + // test the function + ReadReturnCode readCode = _readPacketHeader(fwd_state); + assertTrue(readCode == ReadReturnCode_Finished, "readCode should be %d got %d", ReadReturnCode_Finished, readCode); + assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength); + + // other properties are tested as part of _setupNextPacket + + // cleanup + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + close(fds[REMOTE]); +} + +/* + * Write the fixed header in two 4 byte writes + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_TwoReads) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + + // this replaces "_openSocket" + fwd_state->fd = fds[STACK]; + + _setupSocket(fwd_state); + + // Setup the header + uint16_t packetLength = 24; + uint8_t headerLength = 13; + uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest; + + CCNxCodecSchemaV1FixedHeader hdr = { + .version = 1, + .packetType = packetType, + .packetLength = htons(packetLength), + .headerLength = headerLength + }; + + // put header in packet and write the packet + uint8_t packet[1024]; + size_t bufferReadLength = sizeof(hdr); + memcpy(packet, &hdr, bufferReadLength); + + // write out exactly the number of bytes we need + size_t firstWrite = 4; + size_t secondWrite = sizeof(CCNxCodecSchemaV1FixedHeader) - firstWrite; + + ssize_t nwritten = write(fds[REMOTE], packet, firstWrite); + assertTrue(nwritten == firstWrite, "Wrong write size, expected %zu got %zd", firstWrite, nwritten); + + ReadReturnCode readCode = _readPacketHeader(fwd_state); + assertTrue(readCode == ReadReturnCode_PartialRead, "readCode should be %d got %d", ReadReturnCode_PartialRead, readCode); + + nwritten = write(fds[REMOTE], packet + firstWrite, secondWrite); + assertTrue(nwritten == secondWrite, "Wrong write size, expected %zu got %zd", secondWrite, nwritten); + + readCode = _readPacketHeader(fwd_state); + assertTrue(readCode == ReadReturnCode_Finished, "readCode should be %d got %d", ReadReturnCode_Finished, readCode); + + assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength); + + // other properties are tested as part of _setupNextPacket + + // cleanup + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + close(fds[REMOTE]); +} + +LONGBOW_TEST_CASE(UpDirectionV1, _setupNextPacket) +{ + uint16_t packetLength = 24; + uint8_t headerLength = 13; + uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest; + uint8_t version = 1; + CCNxCodecSchemaV1FixedHeader hdr = { .version = version, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength }; + + // setup fwd_state->nextMessage like we just read a header + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + fwd_state->nextMessage.remainingReadLength = 0; + memcpy(&fwd_state->nextMessage.fixedHeader, &hdr, sizeof(hdr)); + + // this is the truth we will test against + size_t nextMessageLength = packetLength; + + _setupNextPacket(fwd_state); + + size_t allocatedLength = parcBuffer_Capacity(fwd_state->nextMessage.packet); + size_t position = parcBuffer_Position(fwd_state->nextMessage.packet); + parcBuffer_Flip(fwd_state->nextMessage.packet); + void *buffer = parcBuffer_Overlay(fwd_state->nextMessage.packet, 0); + + assertTrue(fwd_state->nextMessage.length == nextMessageLength, "Wrong packet length, expected %zu got %zu", nextMessageLength, fwd_state->nextMessage.length); + assertTrue(fwd_state->nextMessage.packetType == packetType, "Wrong packetType, expected %u got %u", packetType, fwd_state->nextMessage.packetType); + assertTrue(fwd_state->nextMessage.version == version, "Wrong version, expected %u got %u", version, fwd_state->nextMessage.version); + assertTrue(allocatedLength == nextMessageLength, "Wrong packet buffer length, expected %zu got %zu", nextMessageLength, allocatedLength); + + // and make sure the beginning of the buffer is the fixed header + assertTrue(position == sizeof(hdr), "Wrong write position, expected %zu got %zu", sizeof(hdr), position); + assertTrue(memcmp(buffer, &hdr, sizeof(hdr)) == 0, "Beginning of buffer not the fixed header"); + + // TODO: Finish me + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); +} + +/** + * Write the fixed header plus part of the message body. + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacket_PartialMessage) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + + // this replaces "_openSocket" + fwd_state->fd = fds[STACK]; + + _setupSocket(fwd_state); + + // Setup the header + uint16_t packetLength = 160; + uint8_t headerLength = 13; + uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest; + uint8_t version = 1; + CCNxCodecSchemaV1FixedHeader hdr = { .version = version, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength }; + + // put header in packet and write the packet + uint8_t packet[1024]; + memcpy(packet, &hdr, sizeof(hdr)); + + // write out exactly the number of bytes we need + size_t firstWrite = 100; + + ssize_t nwritten = write(fds[REMOTE], packet, firstWrite); + assertTrue(nwritten == firstWrite, "Wrong write size, expected %zu got %zd", firstWrite, nwritten); + + ReadReturnCode readCode = _readPacket(fwd_state); + + assertTrue(readCode == ReadReturnCode_PartialRead, "return value should be %d got %d", ReadReturnCode_PartialRead, readCode); + + // should indicate there's nothing left to read of the header + assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength); + + // we should be at position "firstWrite" in the packet buffer + assertNotNull(fwd_state->nextMessage.packet, "Packet buffer is null"); + assertTrue(parcBuffer_Position(fwd_state->nextMessage.packet) == firstWrite, + "Wrong position, expected %zu got %zu", firstWrite, parcBuffer_Position(fwd_state->nextMessage.packet)); + + // cleanup + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + close(fds[REMOTE]); +} + +/** + * Write exactly one message + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacket_ExactlyOneMessage) +{ + _testReadPacketV1(0); +} + +/** + * Write more than one message. + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacket_MoreThanOneMessage) +{ + _testReadPacketV1(100); +} + +/** + * Make 3 messages pending on the read socket and make sure _readFromMetis delivers all + * 3 up the stack. _readFromMetis requires an RtaConnection, so we need a mock framework. + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_ThreeMessages) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + // create our connection. This will become part of the RTA framework, so will be + // cleaned up in the teardown + int api_fd; + int client_fd; + RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); + + // Write three wire format packets up the bottom of the connector + const int loopCount = 3; + size_t writeSizes[loopCount]; + + for (int i = 0; i < loopCount; i++) { + writeSizes[i] = _sendPacketToConnectorV1(client_fd, (i + 1) * 100); + } + + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + + _readFromMetis(fwd_state, conn); + + // now crank the handle to pop those messages up the stack + rtaFramework_NonThreadedStepCount(data->framework, 5); + + // now read the message out of the test component + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN); + + // throw away the first control message + _throwAwayControlMessage(out); + + // Now read the actual messages we want to test + for (int i = 0; i < loopCount; i++) { + TransportMessage *test_tm = rtaComponent_GetMessage(out); + assertNotNull(test_tm, "Did not receive a transport message %d out of %d out of the top of the connector", i + 1, loopCount); + + assertTrue(transportMessage_IsInterest(test_tm), + "second transport message is not an interest") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0); + } + + // Make sure the transport message has the right properties + CCNxTlvDictionary *testDictionary = transportMessage_GetDictionary(test_tm); + PARCBuffer *writeFormat = ccnxWireFormatMessage_GetWireFormatBuffer(testDictionary); + assertNotNull(writeFormat, + "transport message does not have a wire format"); + + assertTrue(parcBuffer_Remaining(writeFormat) == writeSizes[i], + "Raw format message wrong length, expected %zu got %zu", + writeSizes[i], + parcBuffer_Remaining(writeFormat)); + + // cleanup + transportMessage_Destroy(&test_tm); + } + + // no extra cleanup, done in teardown +} + +LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_InterestV1) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _testReadFromMetisFromArray(data, sizeof(v1_interest_nameA), v1_interest_nameA); +} + +LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_ContentObjectV1) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _testReadFromMetisFromArray(data, sizeof(v1_content_nameA_crc32c), v1_content_nameA_crc32c); +} + +LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_ControlV1) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _testReadFromMetisFromArray(data, sizeof(v1_cpi_add_route_crc32c), v1_cpi_add_route_crc32c); +} + +/* + * read from a closed socket + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_Closed) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + fwd_state->fd = fds[STACK]; + _setupSocket(fwd_state); + + // close remote side then try to write to it + close(fds[REMOTE]); + + ReadReturnCode readCode = _readPacketHeader(fwd_state); + + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + + assertTrue(readCode == ReadReturnCode_Closed, "Wrong return code, expected %d got %d", ReadReturnCode_Closed, readCode); +} + +LONGBOW_TEST_CASE(UpDirectionV1, _readPacketBody_Closed) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + fwd_state->fd = fds[STACK]; + _setupSocket(fwd_state); + + ssize_t nwritten = write(fds[REMOTE], v1_interest_nameA, 8); + assertTrue(nwritten == 8, "Wrong write size, expected 8 got %zd", nwritten); + + // read the header to setup the read of the body + ReadReturnCode readCode; + + readCode = _readPacketHeader(fwd_state); + assertTrue(readCode == ReadReturnCode_Finished, "Did not read entire header"); + + // close remote side then try to write to it + close(fds[REMOTE]); + + // now try 2nd read + readCode = _readPacketBody(fwd_state); + + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + + assertTrue(readCode == ReadReturnCode_Closed, "Wrong return code, expected %d got %d", ReadReturnCode_Closed, readCode); +} + +/* + * Set the socket to -1 to cause and error + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_Error) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + fwd_state->fd = fds[STACK]; + _setupSocket(fwd_state); + + fwd_state->fd = -1; + + // close remote side then try to write to it + + ReadReturnCode readCode = _readPacketHeader(fwd_state); + + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + close(fds[STACK]); + close(fds[REMOTE]); + + assertTrue(readCode == ReadReturnCode_Error, "Wrong return code, expected %d got %d", ReadReturnCode_Error, readCode); +} + +/* + * Set the socket to -1 to cause and error + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readPacketBody_Error) +{ + const int REMOTE = 0; + const int STACK = 1; + int fds[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, fds); + + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + fwd_state->fd = fds[STACK]; + _setupSocket(fwd_state); + + ssize_t nwritten = write(fds[REMOTE], v1_interest_nameA, 8); + assertTrue(nwritten == 8, "Wrong write size, expected 8 got %zd", nwritten); + + // read the header to setup the read of the body + ReadReturnCode readCode; + + readCode = _readPacketHeader(fwd_state); + assertTrue(readCode == ReadReturnCode_Finished, "Did not read entire header"); + + // invalidate to cause an error + fwd_state->fd = -1; + + // now try 2nd read + readCode = _readPacketBody(fwd_state); + + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); + close(fds[STACK]); + close(fds[REMOTE]); + + assertTrue(readCode == ReadReturnCode_Error, "Wrong return code, expected %d got %d", ReadReturnCode_Error, readCode); +} + +/* + * read from a closed socket. + * This should generate a Notify message that the connection is closed + */ +LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_Closed) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + // create our connection. This will become part of the RTA framework, so will be + // cleaned up in the teardown + int api_fd; + int client_fd; + RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + + rtaFramework_NonThreadedStepCount(data->framework, 5); + + close(client_fd); + + _readFromMetis(fwd_state, conn); + + // now crank the handle to pop those messages up the stack + rtaFramework_NonThreadedStepCount(data->framework, 5); + + // now read the message out of the test component + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN); + + // throw away the first control message + _throwAwayControlMessage(out); + + TransportMessage *test_tm = rtaComponent_GetMessage(out); + assertNotNull(test_tm, "Did not receive a transport message out of the top of the connector"); + + assertTrue(transportMessage_IsControl(test_tm), + "second transport message is not a control") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0); + } + + // Make sure the transport message has the right properties + CCNxTlvDictionary *testDictionary = transportMessage_GetDictionary(test_tm); + assertTrue(ccnxControlFacade_IsNotification(testDictionary), "Control message is not Notification") + { + ccnxTlvDictionary_Display(testDictionary, 3); + } + + PARCJSON *json = ccnxControlFacade_GetJson(testDictionary); + NotifyStatus *notify = notifyStatus_ParseJSON(json); + assertTrue(notifyStatus_GetStatusCode(notify) == notifyStatusCode_CONNECTION_CLOSED, + "Wrong code, expected %d got %d", + notifyStatusCode_CONNECTION_CLOSED, + notifyStatus_GetStatusCode(notify)); + notifyStatus_Release(¬ify); + + // verify other properties + assertFalse(fwd_state->isConnected, "Forwarder state should show connection closed"); + + // cleanup + transportMessage_Destroy(&test_tm); + + // no extra cleanup, done in teardown +} + +LONGBOW_TEST_FIXTURE(DownDirectionV1) +{ + LONGBOW_RUN_TEST_CASE(DownDirectionV1, _queueMessageToMetis); + LONGBOW_RUN_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis); + LONGBOW_RUN_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_TwoWrites); + LONGBOW_RUN_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_Closed); + + LONGBOW_RUN_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_Interst); + LONGBOW_RUN_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_CPIRequest); +} + +LONGBOW_TEST_FIXTURE_SETUP(DownDirectionV1) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(DownDirectionV1) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) { + printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding()); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +/* + * _queueMessageToMetis postconditions: + * - increases the reference count to the wireFormat + * - adds the reference to fwd_output buffer + * - increments the debugging counter fwd_metis_references_queued + */ +LONGBOW_TEST_CASE(DownDirectionV1, _queueMessageToMetis) +{ + PARCEventScheduler *scheduler = parcEventScheduler_Create(); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA)); + size_t expectedRefCount = parcObject_GetReferenceCount(wireFormat); + + _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue); + + assertTrue(parcObject_GetReferenceCount(wireFormat) == expectedRefCount, + "Did not get right ref count for wire format, expected %zu got %" PRIu64, expectedRefCount, parcObject_GetReferenceCount(wireFormat)); + assertTrue(parcEventBuffer_GetLength(fwd_state->metisOutputQueue) == parcBuffer_Remaining(wireFormat), + "Wrong output buffer length, expected %zu got %zu", parcBuffer_Remaining(wireFormat), parcEventBuffer_GetLength(fwd_state->metisOutputQueue)); + + parcBuffer_Release(&wireFormat); + parcEventBuffer_Destroy(&fwd_state->metisOutputQueue); + _fwdMetisState_Release(&fwd_state); + parcEventScheduler_Destroy(&scheduler); +} + +/* + * Dequeue a small message to metis, should all be written out. + */ +LONGBOW_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int api_fd; + int client_fd; + RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); + + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + + // Put data in the output queue + PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA)); + _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue); + + // write it out + _dequeueMessagesToMetis(fwd_state); + rtaFramework_NonThreadedStepCount(data->framework, 5); + + // we should now be able to read it + bool readReady = _waitForSelect(client_fd); + assertTrue(readReady, "client socket %d not ready for read", client_fd); + + uint8_t testArray[sizeof(v1_interest_nameA) + 1]; + ssize_t nrecv = recv(client_fd, testArray, sizeof(testArray), 0); + + assertTrue(nrecv == sizeof(v1_interest_nameA), "Wrong read length, expected %zu got %zd", sizeof(v1_interest_nameA), nrecv); + assertTrue(memcmp(testArray, v1_interest_nameA, sizeof(v1_interest_nameA)) == 0, "Read memory does not compare"); + assertTrue(parcEventBuffer_GetLength(fwd_state->metisOutputQueue) == 0, "Metis output buffer not zero length, got %zu", parcEventBuffer_GetLength(fwd_state->metisOutputQueue)); + parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue)); + parcBuffer_Release(&wireFormat); +} + +/* + * Set the forwarder's send buffer small so it will take two writes to send the packet. + * This will test that when _dequeueMessagesToMetis cannot write the whole thing it will enable the + * write event and that metis will then trigger a second write when there's buffer space. + */ +LONGBOW_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_TwoWrites) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int api_fd; + int client_fd; + RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); + + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + + // set the send buffer + { + // make it slightly bigger than 1/2 + const int sendBufferSize = sizeof(v1_interest_nameA) / 2 + 1; + int res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_SNDBUF, &sendBufferSize, sizeof(int)); + if (res < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to set SO_SNDBUF to %d: (%d) %s\n", + ' ', __func__, sendBufferSize, errno, strerror(errno)); + } + // This is a non-fatal error + } + } + + // Put data in the output queue + PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA)); + _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue); + + // write it out + _dequeueMessagesToMetis(fwd_state); + rtaFramework_NonThreadedStepCount(data->framework, 5); + + // we should now be able to read it + bool readReady = _waitForSelect(client_fd); + assertTrue(readReady, "client socket %d not ready for read", client_fd); + + uint8_t testArray[sizeof(v1_interest_nameA) + 1]; + ssize_t nrecv = recv(client_fd, testArray, sizeof(testArray), 0); + + assertTrue(nrecv == sizeof(v1_interest_nameA), "Wrong read length, expected %zu got %zd", sizeof(v1_interest_nameA), nrecv); + assertTrue(memcmp(testArray, v1_interest_nameA, sizeof(v1_interest_nameA)) == 0, "Read memory does not compare"); + assertTrue(parcEventBuffer_GetLength(fwd_state->metisOutputQueue) == 0, "Metis output buffer not zero length, got %zu", parcEventBuffer_GetLength(fwd_state->metisOutputQueue)); + parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue)); + parcBuffer_Release(&wireFormat); +} + +/* + * Dequeue a message to a closed socket + */ +LONGBOW_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_Closed) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + int api_fd; + int client_fd; + RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); + + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA)); + _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue); + + // close remote side then try to write to it + close(client_fd); + + _dequeueMessagesToMetis(fwd_state); + rtaFramework_NonThreadedStepCount(data->framework, 5); + + parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue)); + parcBuffer_Release(&wireFormat); +} + +/** + * Sends an Interest down the stack. We need to create an Interest and encode its TLV wire format, + * then send it down the stack and make sure we receive it on a client socket. We don't actually + * run Metis in this test. + */ +LONGBOW_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_Interst) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + // create our connection. This will become part of the RTA framework, so will be + // cleaned up in the teardown + int api_fd; + int client_fd; + RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS); + + // Create the interest with wire format and send it down the stack + TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryInterest(conn, CCNxTlvDictionary_SchemaVersion_V1); + CCNxCodecNetworkBufferIoVec *vec = ccnxCodecSchemaV1PacketEncoder_DictionaryEncode(transportMessage_GetDictionary(tm), NULL); + + ccnxWireFormatMessage_PutIoVec(transportMessage_GetDictionary(tm), vec); + ccnxCodecNetworkBufferIoVec_Release(&vec); + + // send it down the stack + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN); + rtaComponent_PutMessage(in, tm); + rtaFramework_NonThreadedStepCount(data->framework, 5); + + bool readReady = _waitForSelect(client_fd); + assertTrue(readReady, "select did not indicate read ready"); + + // now read it from out listener. It has a read timeout so if we dont get it in a reasonable amount + // of time, read will return an error about the timeout + + const size_t maxPacketLength = 1024; + uint8_t packet[maxPacketLength]; + + ssize_t readBytes = read(client_fd, packet, maxPacketLength); + assertFalse(readBytes < 0, "Got error on read: (%d) %s", errno, strerror(errno)); + + parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue)); + close(client_fd); +} + +/** + * Send an AddRoute command down the stack. It does not need a wire format in the transport message, its + * the job of the forwarder to create the Metis-specific message. + */ +LONGBOW_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_CPIRequest) +{ + testUnimplemented("No way to create a v1 CPI message yet"); + +// TestData *data = longBowTestCase_GetClipBoardData(testCase); +// +// // create our connection. This will become part of the RTA framework, so will be +// // cleaned up in the teardown +// int api_fd; +// int client_fd; +// RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd); +// FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS); +// +// // now make the control message +// TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryControl(conn, CCNxTlvDictionary_SchemaVersion_V1); +// CCNxCodecNetworkBufferIoVec *vec = ccnxCodecSchemaV1PacketEncoder_DictionaryEncode(transportMessage_GetDictionary(tm), NULL); +// ccnxWireFormatFacade_PutIoVec(transportMessage_GetDictionary(tm), vec); +// ccnxCodecNetworkBufferIoVec_Release(&vec); +// +// // send it down the stack +// PARCEventQueue *in = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN); +// rtaComponent_PutMessage(in, tm); +// rtaFramework_NonThreadedStepCount(data->framework, 5); +// +// // now read it from out listener. It has a read timeout so if we dont get it in a reasonable amount +// // of time, read will return an error about the timeout +// +// const size_t maxPacketLength = 1024; +// uint8_t packet[maxPacketLength]; +// +// ssize_t readBytes = read(client_fd, packet, maxPacketLength); +// assertFalse(readBytes < 0, "Got error on read: (%d) %s", errno, strerror(errno)); +// parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue)); +} + +// ==================================================================== + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(connector_Forwarder_Metis); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_rta_ApiConnection.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_rta_ApiConnection.c new file mode 100644 index 00000000..b0e937cd --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_rta_ApiConnection.c @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Create a non-threaded framework to test internal RTA functions + * + */ +#include "../rta_ApiConnection.c" + +#include <poll.h> + +#include <parc/algol/parc_SafeMemory.h> +#include <LongBow/unit-test.h> +#include <ccnx/transport/transport_rta/config/config_All.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.c> + +#include <ccnx/transport/test_tools/traffic_tools.h> + +typedef struct test_data { + PARCRingBuffer1x1 *commandRingBuffer; + PARCNotifier *commandNotifier; + RtaFramework *framework; + + int api_fds[2]; + int stackId; + + RtaProtocolStack *stack; + RtaConnection *connection; +} TestData; + +static TestData * +_commonSetup(void) +{ + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + + data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL); + data->commandNotifier = parcNotifier_Create(); + data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier); + assertNotNull(data->framework, "rtaFramework_Create returned null"); + + CCNxStackConfig *stackConfig = ccnxStackConfig_Create(); + + apiConnector_ProtocolStackConfig(stackConfig); + testingLower_ProtocolStackConfig(stackConfig); + protocolStack_ComponentsConfigArgs(stackConfig, apiConnector_GetName(), testingLower_GetName(), NULL); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + + // Create the protocol stack + + data->stackId = 1; + RtaCommandCreateProtocolStack *createStack = rtaCommandCreateProtocolStack_Create(data->stackId, stackConfig); + _rtaFramework_ExecuteCreateStack(data->framework, createStack); + rtaCommandCreateProtocolStack_Release(&createStack); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + data->stack = (rtaFramework_GetProtocolStackByStackId(data->framework, data->stackId))->stack; + + // Create a connection in the stack + + int error = socketpair(AF_UNIX, SOCK_STREAM, 0, data->api_fds); + assertFalse(error, "Error creating socket pair: (%d) %s", errno, strerror(errno)); + + CCNxConnectionConfig *connConfig = ccnxConnectionConfig_Create(); + apiConnector_ConnectionConfig(connConfig); + + tlvCodec_ConnectionConfig(connConfig); + + testingLower_ConnectionConfig(connConfig); + + RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(data->stackId, data->api_fds[PAIR_OTHER], data->api_fds[PAIR_TRANSPORT], ccnxConnectionConfig_GetJson(connConfig)); + _rtaFramework_ExecuteOpenConnection(data->framework, openConnection); + rtaCommandOpenConnection_Release(&openConnection); + + rtaFramework_NonThreadedStepCount(data->framework, 10); + + data->connection = rtaConnectionTable_GetByApiFd(data->framework->connectionTable, data->api_fds[PAIR_OTHER]); + + // cleanup + + ccnxConnectionConfig_Destroy(&connConfig); + ccnxStackConfig_Release(&stackConfig); + + return data; +} + +static void +_commonTeardown(TestData *data) +{ + rtaFramework_Teardown(data->framework); + + parcRingBuffer1x1_Release(&data->commandRingBuffer); + parcNotifier_Release(&data->commandNotifier); + rtaFramework_Destroy(&data->framework); + + close(data->api_fds[0]); + close(data->api_fds[1]); + parcMemory_Deallocate((void **) &data); +} + +LONGBOW_TEST_RUNNER(rta_ApiConnection) +{ + LONGBOW_RUN_TEST_FIXTURE(Global); +} + +// The Test Runner calls this function once before any Test Fixtures are run. +LONGBOW_TEST_RUNNER_SETUP(rta_ApiConnection) +{ + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + return LONGBOW_STATUS_SUCCEEDED; +} + +// The Test Runner calls this function once after all the Test Fixtures are run. +LONGBOW_TEST_RUNNER_TEARDOWN(rta_ApiConnection) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE(Global) +{ + LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_BlockDown); + LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_Create_Destroy); + LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_Create_Checks); + LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_Create_Check_ApiSocket); + + LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_UnblockDown); +} + +LONGBOW_TEST_FIXTURE_SETUP(Global) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup()); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Global) +{ + printf("Finishing testcase %s\n", longBowTestCase_GetName(testCase)); + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_CASE(Global, rtaApiConnection_SendToApi) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(data->connection, API_CONNECTOR); + + TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryInterest(data->connection, CCNxTlvDictionary_SchemaVersion_V1); + + RtaComponentStats *stats = rtaConnection_GetStats(data->connection, API_CONNECTOR); + rtaApiConnection_SendToApi(apiConnection, tm, stats); + rtaFramework_NonThreadedStepCount(data->framework, 10); + + // Let the dispatcher run + struct pollfd pfd = { .fd = data->api_fds[PAIR_OTHER], .events = POLLIN, .revents = 0 }; + int millisecondTimeout = 1000; + + int pollvalue = poll(&pfd, 1, millisecondTimeout); + assertTrue(pollvalue == 1, "Did not get an event from the API's side of the socket"); + + CCNxMetaMessage *testMessage; + ssize_t bytesRead = read(data->api_fds[PAIR_OTHER], &testMessage, sizeof(testMessage)); + assertTrue(bytesRead == sizeof(testMessage), "Wrong read size, got %zd expected %zd: (%d) %s", + bytesRead, sizeof(testMessage), + errno, strerror(errno)); + assertNotNull(testMessage, "Message read is NULL"); + + + assertTrue(testMessage == transportMessage_GetDictionary(tm), + "Got wrong raw message, got %p expected %p", + (void *) testMessage, (void *) transportMessage_GetDictionary(tm)); + + ccnxMetaMessage_Release(&testMessage); + transportMessage_Destroy(&tm); +} + +LONGBOW_TEST_CASE(Global, rtaApiConnection_BlockDown) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection); + + // make sure we're startined unblocked + short enabled = parcEventQueue_GetEnabled(apiConnection->bev_api); + assertTrue(enabled & PARCEventType_Read, "PARCEventType_Read is not enabled on a new Api Connector: enabled = %04X", enabled); + + rtaApiConnection_BlockDown(apiConnection); + enabled = parcEventQueue_GetEnabled(apiConnection->bev_api); + assertFalse(enabled & PARCEventType_Read, "PARCEventType_Read is still enabled after caling BlockDown: enabled = %04X", enabled); + + rtaApiConnection_Destroy(&apiConnection); +} + +LONGBOW_TEST_CASE(Global, rtaApiConnection_Create_Destroy) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + uint32_t beforeBalance = parcMemory_Outstanding(); + RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection); + assertNotNull(apiConnection, "Got null API connection"); + + rtaApiConnection_Destroy(&apiConnection); + assertNull(apiConnection, "Destroy did not null apiConnection"); + uint32_t afterBalance = parcMemory_Outstanding(); + assertTrue(beforeBalance == afterBalance, "Memory imbalance: %d", (int) (afterBalance - beforeBalance)); +} + +LONGBOW_TEST_CASE(Global, rtaApiConnection_Create_Checks) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection); + assertTrue(apiConnection->api_fd == rtaConnection_GetApiFd(data->connection), + "Wrong api fd, got %d expected %d", + apiConnection->api_fd, rtaConnection_GetApiFd(data->connection)); + + assertTrue(apiConnection->transport_fd == rtaConnection_GetTransportFd(data->connection), + "Wrong api fd, got %d expected %d", + apiConnection->transport_fd, rtaConnection_GetTransportFd(data->connection)); + + assertTrue(apiConnection->connection == data->connection, + "Wrong connection, got %p expected %p", + (void *) apiConnection->connection, + (void *) data->connection); + + rtaApiConnection_Destroy(&apiConnection); +} + +LONGBOW_TEST_CASE(Global, rtaApiConnection_Create_Check_ApiSocket) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection); + + assertNotNull(apiConnection->bev_api, "API socket event null"); + + rtaApiConnection_Destroy(&apiConnection); +} + +LONGBOW_TEST_CASE(Global, rtaApiConnection_UnblockDown) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection); + + rtaApiConnection_BlockDown(apiConnection); + // we know from previous test that this puts the apiConnector in blocked state + + rtaApiConnection_UnblockDown(apiConnection); + short enabled = parcEventQueue_GetEnabled(apiConnection->bev_api); + assertTrue(enabled & PARCEventType_Read, "PARCEventType_Read is not enabled after caling UnlockDown: enabled = %04X", enabled); + + rtaApiConnection_Destroy(&apiConnection); +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_ApiConnection); + int exitStatus = longBowMain(argc, argv, testRunner, NULL); + longBowTestRunner_Destroy(&testRunner); + exit(exitStatus); +} |