aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/connectors
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/connectors')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.c264
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.h22
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder.h30
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c552
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c1712
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c634
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.h83
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/CMakeLists.txt16
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Api.c61
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Local.c249
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Metis.c1350
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_rta_ApiConnection.c278
12 files changed, 5251 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.c
new file mode 100644
index 00000000..7b810adc
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.c
@@ -0,0 +1,264 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Interface between the event dispatcher and component callbacks to
+ * the RtaApiConnection. The API connector, per se, is implemented in rta_ApiConnection. This
+ * module is the scaffolding to work within the RTA component framework.
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/socket.h>
+#include <errno.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+
+#include <ccnx/transport/transport_rta/connectors/rta_ApiConnection.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/transport/transport_rta/connectors/connector_Api.h>
+
+#include <ccnx/api/control/controlPlaneInterface.h>
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+static int connector_Api_Init(RtaProtocolStack *stack);
+static int connector_Api_Opener(RtaConnection *conn);
+static void connector_Api_Upcall_Read(PARCEventQueue *, PARCEventType, void *conn);
+static int connector_Api_Closer(RtaConnection *conn);
+static int connector_Api_Release(RtaProtocolStack *stack);
+static void connector_Api_StateChange(RtaConnection *conn);
+
+RtaComponentOperations api_ops =
+{
+ .init = connector_Api_Init,
+ .open = connector_Api_Opener,
+ .upcallRead = connector_Api_Upcall_Read,
+ .upcallEvent = NULL,
+ .downcallRead = NULL,
+ .downcallEvent = NULL,
+ .close = connector_Api_Closer,
+ .release = connector_Api_Release,
+ .stateChange = connector_Api_StateChange
+};
+
+// ========================
+
+static int
+connector_Api_Init(RtaProtocolStack *stack)
+{
+ // nothing to do here
+ if (DEBUG_OUTPUT) {
+ printf("%s init stack %p\n",
+ __func__,
+ (void *) stack);
+ }
+ return 0;
+}
+
+/*
+ * Api_Open will put the RtaConnection as the callback parameter in the UpcallRead,
+ * because its a per-connection descriptor.
+ *
+ * Returns 0 on success, -1 on error
+ */
+static int
+connector_Api_Opener(RtaConnection *connection)
+{
+ RtaComponentStats *stats;
+ RtaApiConnection *apiConnection = rtaApiConnection_Create(connection);
+
+ rtaConnection_SetPrivateData(connection, API_CONNECTOR, apiConnection);
+
+ stats = rtaConnection_GetStats(connection, API_CONNECTOR);
+ assertNotNull(stats, "%s returned null stats\n", __func__);
+ rtaComponentStats_Increment(stats, STATS_OPENS);
+
+ rtaConnection_SetState(connection, CONN_OPEN);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s opened transport_fd %d\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(connection))),
+ __func__,
+ rtaConnection_GetTransportFd(connection));
+
+ printf("%9" PRIu64 " %s open conn %p state %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(connection))),
+ __func__,
+ (void *) connection,
+ (void *) apiConnection);
+ }
+
+ return 0;
+}
+
+/*
+ * Read a message from below in stack
+ * Write a message up to the API
+ */
+static void
+connector_Api_Upcall_Read(PARCEventQueue *eventBuffer, PARCEventType type, void *protocolStackVoid)
+{
+ TransportMessage *tm;
+
+ assertNotNull(protocolStackVoid, "%s called with null ProtocolStack\n", __func__);
+
+ while ((tm = rtaComponent_GetMessage(eventBuffer)) != NULL) {
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ assertNotNull(conn, "got null connection from transport message\n");
+
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, API_CONNECTOR);
+ assertNotNull(stats, "returned null stats\n");
+
+ rtaComponentStats_Increment(stats, STATS_UPCALL_IN);
+
+ RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR);
+ assertNotNull(apiConnection, "got null apiConnection\n");
+
+ // If we are blocked, only pass control messages
+ if (!rtaConnection_BlockedUp(conn) || transportMessage_IsControl(tm)) {
+ if (!rtaApiConnection_SendToApi(apiConnection, tm, stats)) {
+ // memory is freed at bottom of function
+ }
+ } else {
+ // closed connection, just destroy the message
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p destroying transport message %p due to closed connection\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn,
+ (void *) tm);
+ }
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p total upcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn,
+ rtaComponentStats_Get(stats, STATS_UPCALL_IN),
+ rtaComponentStats_Get(stats, STATS_UPCALL_OUT));
+ }
+
+ // This is the end of life for the transport message. If the inner TlvDictionary
+ // was put in a CCNxMessage and sent up the stack, then we made another reference to it
+ // so this destroy will not destroy that part.
+ transportMessage_Destroy(&tm);
+ }
+}
+
+/*
+ * The higher layer should no longer be writing to this
+ * socketpair, so we can drain it then close it.
+ */
+static int
+connector_Api_Closer(RtaConnection *conn)
+{
+ RtaComponentStats *stats;
+ RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s starting close conn %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+
+ stats = rtaConnection_GetStats(conn, API_CONNECTOR);
+ assertNotNull(stats, "%s returned null stats\n", __func__);
+ rtaComponentStats_Increment(stats, STATS_CLOSES);
+
+ // This will prevent any new data going in to queues for the connection
+ // Existing messages will be destroyed
+ rtaConnection_SetState(conn, CONN_CLOSED);
+
+ rtaApiConnection_Destroy(&apiConnection);
+ rtaConnection_SetPrivateData(conn, API_CONNECTOR, NULL);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s close conn %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+
+ return 0;
+}
+
+static int
+connector_Api_Release(RtaProtocolStack *stack)
+{
+ // nothing to do here, there's no ProtocolStack state
+ if (DEBUG_OUTPUT) {
+ printf("%s release stack %p\n",
+ __func__,
+ (void *) stack);
+ }
+
+ return 0;
+}
+
+/**
+ * Respond to events for the connection
+ *
+ * Typcially, the forwarder connector will block and unblock the DOWN direction. We need
+ * to stop putting new data in the down directon if its blocked.
+ *
+ * The API connector (us) is generally the thing blocking the UP direction, so we don't need
+ * to respond to those (our own) events.
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * {
+ * ComponentOperations api_ops = {
+ * // [other settings]
+ * .stateChange = connector_Api_StateChange
+ * };
+ * }
+ * @endcode
+ */
+static void
+connector_Api_StateChange(RtaConnection *conn)
+{
+ RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR);
+
+ // we do not test the rtaConnection_BlockedUp() because we are the one setting those
+
+ // If we are blocked in the DOWN direction, disable events on the read queue
+ if (rtaConnection_BlockedDown(conn)) {
+ rtaApiConnection_BlockDown(apiConnection);
+ } else {
+ rtaApiConnection_UnblockDown(apiConnection);
+ }
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.h b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.h
new file mode 100644
index 00000000..6299116b
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Api.h
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef Libccnx_connector_api_h
+#define Libccnx_connector_api_h
+
+// Function structs for component variations
+extern RtaComponentOperations api_ops;
+#endif
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder.h b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder.h
new file mode 100644
index 00000000..64a3c6e9
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder.h
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// connector_Forwarder.h
+// Libccnx
+//
+//
+
+#ifndef Libccnx_connector_fwd_h
+#define Libccnx_connector_fwd_h
+
+// Function structs for component variations
+extern RtaComponentOperations fwd_flan_ops;
+extern RtaComponentOperations fwd_local_ops;
+extern RtaComponentOperations fwd_tlvrtr_ops;
+extern RtaComponentOperations fwd_metis_ops;
+#endif
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c
new file mode 100644
index 00000000..a434600a
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c
@@ -0,0 +1,552 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PF_LOCAL forwarder glue, mostly for testing. This uses a
+ * STREAM socket with a user specified coding. Each message
+ * on the stream is of this format:
+ *
+ * uint32_t process pid
+ * uint32_t user_socket_fd
+ * uint32_t message bytes that follow
+ * uint8_t[] message encoded with user specified codec
+ *
+ * The user_socket_fd will be the same number that the API was assigned
+ * in transportRta_Socket->api_socket_pair[PAIR_OTHER].
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <parc/algol/parc_EventBuffer.h>
+
+#include <LongBow/runtime.h>
+#include <LongBow/debugging.h>
+
+#include <parc/algol/parc_Memory.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/transport/transport_rta/connectors/connector_Forwarder.h>
+
+#include <ccnx/transport/transport_rta/config/config_Forwarder_Local.h>
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+#include <ccnx/common/ccnx_WireFormatMessage.h>
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+static int connector_Fwd_Local_Init(RtaProtocolStack *stack);
+static int connector_Fwd_Local_Opener(RtaConnection *conn);
+static void connector_Fwd_Local_Upcall_Read(PARCEventQueue *, PARCEventType, void *conn);
+static void connector_Fwd_Local_Upcall_Event(PARCEventQueue *, PARCEventQueueEventType, void *stack);
+static void connector_Fwd_Local_Downcall_Read(PARCEventQueue *, PARCEventType, void *conn);
+static int connector_Fwd_Local_Closer(RtaConnection *conn);
+static int connector_Fwd_Local_Release(RtaProtocolStack *stack);
+static void connector_Fwd_Local_StateChange(RtaConnection *conn);
+
+RtaComponentOperations fwd_local_ops = {
+ .init = connector_Fwd_Local_Init,
+ .open = connector_Fwd_Local_Opener,
+ .upcallRead = connector_Fwd_Local_Upcall_Read,
+ .upcallEvent = connector_Fwd_Local_Upcall_Event,
+ .downcallRead = connector_Fwd_Local_Downcall_Read,
+ .downcallEvent = NULL,
+ .close = connector_Fwd_Local_Closer,
+ .release = connector_Fwd_Local_Release,
+ .stateChange = connector_Fwd_Local_StateChange
+};
+
+struct fwd_local_state {
+ int fd;
+ PARCEventQueue *bev_local;
+ int connected;
+};
+
+typedef struct {
+ uint32_t pid;
+ uint32_t fd;
+ uint32_t length;
+ uint32_t pad; // make it 16 bytes
+} __attribute__ ((packed)) localhdr;
+
+// ================================
+// NULL
+
+static int
+connector_Fwd_Local_Init(RtaProtocolStack *stack)
+{
+ // no stack-wide initialization
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s init stack %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)),
+ __func__,
+ (void *) stack);
+ }
+ return 0;
+}
+
+/*
+ * Create a PF_LOCAL socket
+ * Set it non-blocking
+ * Wrap it in a buffer event
+ * Set Read and Event callbacks
+ * connect to LOCAL_NAME
+ *
+ * Return 0 success, -1 failure
+ */
+static int
+connector_Fwd_Local_Opener(RtaConnection *conn)
+{
+ PARCEventScheduler *base;
+ RtaProtocolStack *stack;
+ const char *sock_name;
+
+ stack = rtaConnection_GetStack(conn);
+ base = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack));
+
+ sock_name = localForwarder_GetPath(rtaConnection_GetParameters(conn));
+ assertNotNull(sock_name, "connector_Fwd_Local_Opener called without setting LOCAL_NAME");
+
+ if (sock_name == NULL) {
+ return -1;
+ }
+
+ struct fwd_local_state *fwd_state = parcMemory_Allocate(sizeof(struct fwd_local_state));
+ assertNotNull(fwd_state, "parcMemory_Allocate(%zu) returned NULL", sizeof(struct fwd_local_state));
+
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, fwd_state);
+
+ fwd_state->fd = socket(PF_LOCAL, SOCK_STREAM, 0);
+ if (fwd_state->fd < 0) {
+ perror("socket PF_LOCAL");
+ }
+ assertFalse(fwd_state->fd < 0, "socket PF_LOCAL error");
+
+ struct sockaddr_un addr_unix;
+ memset(&addr_unix, 0, sizeof(struct sockaddr_un));
+ addr_unix.sun_family = AF_UNIX;
+
+ trapIllegalValueIf(sizeof(addr_unix.sun_path) <= strlen(sock_name), "sock_name too long, maximum length %zu", sizeof(addr_unix.sun_path) - 1);
+ strcpy(addr_unix.sun_path, sock_name);
+
+ // Setup the socket as non-blocking then wrap in a parcEventQueue.
+
+ int flags = fcntl(fwd_state->fd, F_GETFL, NULL);
+ assertFalse(flags < 0, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+
+ int failure = fcntl(fwd_state->fd, F_SETFL, flags | O_NONBLOCK);
+ assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno);
+
+ assertTrue(failure == 0, "could not make socket non-blocking");
+ if (failure < 0) {
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL);
+ close(fwd_state->fd);
+ parcMemory_Deallocate((void **) &fwd_state);
+ return -1;
+ }
+
+ fwd_state->bev_local = parcEventQueue_Create(base, fwd_state->fd, PARCEventQueueOption_CloseOnFree);
+
+ assertNotNull(fwd_state->bev_local, "Null buffer event for local socket.");
+
+ parcEventQueue_SetCallbacks(fwd_state->bev_local,
+ connector_Fwd_Local_Upcall_Read,
+ NULL,
+ connector_Fwd_Local_Upcall_Event,
+ conn);
+
+ parcEventQueue_Enable(fwd_state->bev_local, PARCEventType_Read);
+
+ memset(&addr_unix, 0, sizeof(addr_unix));
+ addr_unix.sun_family = AF_UNIX;
+
+ trapIllegalValueIf(sizeof(addr_unix.sun_path) <= strlen(sock_name), "sock_name too long, maximum length %zu", sizeof(addr_unix.sun_path) - 1);
+ strcpy(addr_unix.sun_path, sock_name);
+
+ // This will deliver a PARCEventQueue_Connected on connect success
+ if (parcEventQueue_ConnectSocket(fwd_state->bev_local,
+ (struct sockaddr*) &addr_unix,
+ (socklen_t) sizeof(addr_unix)) < 0) {
+ perror("connect PF_LOCAL");
+ assertTrue(0, "connect PF_LOCAL");
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL);
+ close(fwd_state->fd);
+ parcMemory_Deallocate((void **) &fwd_state);
+ return -1;
+ }
+
+ // Socket will be ready for use once we get PARCEventQueueEventType_Connected
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s open conn %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+
+ return 0;
+}
+
+/*
+ * Read from bev_local. We are passed the connection on the ptr.
+ */
+static void
+connector_Fwd_Local_Upcall_Read(PARCEventQueue *bev, PARCEventType type, void *ptr)
+{
+ RtaConnection *conn = (RtaConnection *) ptr;
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(bev);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_LOCAL, RTA_UP);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+ TransportMessage *tm;
+
+ unsigned char *mem;
+ int res;
+
+ // only move forward if enough bytes available
+
+ while (parcEventBuffer_GetLength(in) >= sizeof(localhdr)) {
+ size_t msg_length;
+
+ mem = parcEventBuffer_Pullup(in, sizeof(localhdr));
+ if (mem == NULL) {
+ // not enough bytes
+ parcEventBuffer_Destroy(&in);
+ return;
+ }
+
+ msg_length = ((localhdr *) mem)->length;
+ if (parcEventBuffer_GetLength(in) < msg_length + sizeof(localhdr)) {
+ // not enough bytes
+ parcEventBuffer_Destroy(&in);
+ return;
+ }
+
+ PARCBuffer *wireFormat = parcBuffer_Allocate(msg_length);
+ assertNotNull(wireFormat, "parcBuffer_Allocate(%zu) returned NULL", msg_length);
+
+ rtaComponentStats_Increment(stats, STATS_UPCALL_IN);
+
+ // we can read a whole message. Read it directly in to a buffer
+ // Skip the FWD_LOCAL header
+ res = parcEventBuffer_Read(in, NULL, sizeof(localhdr));
+ assertTrue(res == 0, "Got error draining header from buffer");
+
+ uint8_t *overlay = parcBuffer_Overlay(wireFormat, msg_length);
+ res = parcEventBuffer_Read(in, overlay, msg_length);
+ overlay = NULL;
+
+ assertTrue(res == msg_length,
+ "parcEventBuffer_Read returned wrong size, expected %zu got %d",
+ msg_length, res);
+
+ parcBuffer_Flip(wireFormat);
+
+ if (rtaConnection_GetState(conn) == CONN_OPEN) {
+ CCNxWireFormatMessage *wireFormatMessage = ccnxWireFormatMessage_Create(wireFormat);
+ CCNxTlvDictionary *dictionary = ccnxWireFormatMessage_GetDictionary(wireFormatMessage);
+ if (dictionary != NULL) {
+ // wrap it for transport module
+ tm = transportMessage_CreateFromDictionary(dictionary);
+
+ // add the connection info to the transport message before sending up stack
+ transportMessage_SetInfo(tm, rtaConnection_Copy(conn), rtaConnection_FreeFunc);
+
+ // send it up the stack
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+
+ // Now release our hold on the wireFormatMessage (aka dictionary)
+ ccnxWireFormatMessage_Release(&wireFormatMessage);
+ } else {
+ printf("Failed to create CCNxTlvDictionary from wireformat\n");
+ parcBuffer_Display(wireFormat, 3);
+ }
+ } else {
+ //drop packets
+ }
+
+ parcBuffer_Release(&wireFormat);
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total upcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_UPCALL_IN),
+ rtaComponentStats_Get(stats, STATS_UPCALL_OUT));
+ }
+ parcEventBuffer_Destroy(&in);
+}
+
+/*
+ * Event on connection to forwarder.
+ * Passed the RtaConnection in the pointer
+ */
+static void
+connector_Fwd_Local_Upcall_Event(PARCEventQueue *queue, PARCEventQueueEventType events, void *ptr)
+{
+ RtaConnection *conn = (RtaConnection *) ptr;
+
+ struct fwd_local_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_LOCAL);
+
+ if (events & PARCEventQueueEventType_Connected) {
+ if (DEBUG_OUTPUT) {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ printf("%6lu.%06ld %s (pid %d) connected socket %d\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn));
+ }
+
+ fwd_state->connected = 1;
+ rtaConnection_SendStatus(conn, FWD_LOCAL, RTA_UP, notifyStatusCode_CONNECTION_OPEN, NULL, NULL);
+ } else if (events & PARCEventQueueEventType_Error) {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ longBowRuntime_StackTrace(1);
+
+ if (events & PARCEventQueueEventType_Reading) {
+ printf("%6lu.%06ld %s (pid %d) Got read error on PF_LOCAL, transport socket %d: (%d) %s\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn),
+ errno,
+ strerror(errno));
+ } else if (events & PARCEventQueueEventType_Writing) {
+ printf("%6lu.%06ld %s (pid %d) Got write error on PF_LOCAL, transport socket %d: (%d) %s\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn),
+ errno,
+ strerror(errno));
+ } else {
+ printf("%6lu.%06ld %s (pid %d) Got error on PF_LOCAL, transport socket %d: (%d) %s\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn),
+ errno,
+ strerror(errno));
+ }
+
+ /* An error occured while connecting. */
+ rtaConnection_SendStatus(conn, FWD_LOCAL, RTA_UP, notifyStatusCode_FORWARDER_NOT_AVAILABLE, NULL, NULL);
+ }
+}
+
+static void
+_ackRequest(RtaConnection *conn, PARCJSON *request)
+{
+ PARCJSON *response = cpiAcks_CreateAck(request);
+ CCNxTlvDictionary *ackDict = ccnxControlFacade_CreateCPI(response);
+
+ TransportMessage *tm_ack = transportMessage_CreateFromDictionary(ackDict);
+ ccnxTlvDictionary_Release(&ackDict);
+ parcJSON_Release(&response);
+
+ transportMessage_SetInfo(tm_ack, rtaConnection_Copy(conn), rtaConnection_FreeFunc);
+
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_LOCAL, RTA_UP);
+ if (rtaComponent_PutMessage(out, tm_ack)) {
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+}
+
+static void
+connector_Fwd_Local_ProcessControl(RtaConnection *conn, TransportMessage *tm)
+{
+ CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(tm);
+
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary);
+ if (controlPlaneInterface_GetCPIMessageType(json) == CPI_REQUEST) {
+ if (cpi_getCPIOperation2(json) == CPI_PAUSE) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p recieved PAUSE\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+ _ackRequest(conn, json);
+ } else if (cpi_getCPIOperation2(json) == CPI_FLUSH) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p recieved FLUSH\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+ _ackRequest(conn, json);
+ } else {
+ // some other message. We just ACK everything in the local connector.
+ _ackRequest(conn, json);
+ }
+ }
+ }
+}
+
+static void
+connector_Fwd_Local_WriteIovec(struct fwd_local_state *fwdConnState, RtaConnection *conn, CCNxCodecNetworkBufferIoVec *vec, RtaComponentStats *stats)
+{
+ localhdr lh;
+
+ memset(&lh, 0, sizeof(localhdr));
+ lh.pid = getpid();
+ lh.fd = rtaConnection_GetTransportFd(conn);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total downcall reads %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN));
+ }
+
+ int iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(vec);
+ const struct iovec *array = ccnxCodecNetworkBufferIoVec_GetArray(vec);
+
+ lh.length = 0;
+ for (int i = 0; i < iovcnt; i++) {
+ lh.length += array[i].iov_len;
+ }
+
+ if (parcEventQueue_Write(fwdConnState->bev_local, &lh, sizeof(lh)) < 0) {
+ trapUnrecoverableState("%s error writing to bev_local", __func__);
+ }
+
+ for (int i = 0; i < iovcnt; i++) {
+ if (parcEventQueue_Write(fwdConnState->bev_local, array[i].iov_base, array[i].iov_len) < 0) {
+ trapUnrecoverableState("%s error writing iovec to bev_local", __func__);
+ }
+ }
+}
+
+/* send raw packet from codec to forwarder */
+static void
+connector_Fwd_Local_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *ptr)
+{
+ TransportMessage *tm;
+
+ while ((tm = rtaComponent_GetMessage(in)) != NULL) {
+ RtaConnection *conn;
+ struct fwd_local_state *fwdConnState;
+ RtaComponentStats *stats;
+
+ CCNxTlvDictionary *messageDictionary = transportMessage_GetDictionary(tm);
+
+ conn = rtaConnection_GetFromTransport(tm);
+ fwdConnState = rtaConnection_GetPrivateData(conn, FWD_LOCAL);
+ stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN);
+
+ // ignore configuration messages for the send
+ if (ccnxTlvDictionary_IsControl(messageDictionary)) {
+ connector_Fwd_Local_ProcessControl(conn, tm);
+ } else {
+ CCNxCodecNetworkBufferIoVec *vec = ccnxWireFormatMessage_GetIoVec(messageDictionary);
+ assertNotNull(vec, "%s got null wire format\n", __func__);
+
+ connector_Fwd_Local_WriteIovec(fwdConnState, conn, vec, stats);
+
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+
+ // we can release everything here. connector_Fwd_Local_WriteIovec made its own references
+ // to the wire format if it needed them.
+ transportMessage_Destroy(&tm);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total downcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT));
+ }
+ }
+}
+
+static int
+connector_Fwd_Local_Closer(RtaConnection *conn)
+{
+ struct fwd_local_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_LOCAL);
+ RtaComponentStats *stats;
+
+ assertNotNull(fwd_state, "invalid state");
+ assertNotNull(fwd_state->bev_local, "invalid PARCEventQueue pointer");
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s called on fwd_state %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state);
+ }
+
+ stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+
+ // this will close too
+ parcEventQueue_Destroy(&(fwd_state->bev_local));
+ memset(fwd_state, 0, sizeof(struct fwd_local_state));
+ parcMemory_Deallocate((void **) &fwd_state);
+
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL);
+ rtaComponentStats_Increment(stats, STATS_CLOSES);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s closed fwd_state %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state);
+ }
+
+ return 0;
+}
+
+static int
+connector_Fwd_Local_Release(RtaProtocolStack *stack)
+{
+ // no stack-wide initialization
+ if (DEBUG_OUTPUT) {
+ printf("%s release stack %p\n",
+ __func__,
+ (void *) stack);
+ }
+
+ return 0;
+}
+
+static void
+connector_Fwd_Local_StateChange(RtaConnection *conn)
+{
+ //not implemented
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c
new file mode 100644
index 00000000..59ad1dcb
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c
@@ -0,0 +1,1712 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The metis connector does the following per connection:
+ * - Opens a TCP socket to Metis
+ * - Creates an "event" for the socket, does not use the buffer to avoid doing extra copy.
+ * - On read events, uses direct socket operations to read in data
+ *
+ * - DOES NOT HANDLE FRAMING ERRORS. If somehow metis and the connector get
+ * out of whack (technical term), there is no recovery.
+ *
+ * - The connection to metis is started in the Opener, but may not complete by the time
+ * the user sends data down in the Downcall_Read. We should not process the Downcall_Read
+ * until we get the Upcall_Event of connected. When we finally get the connected event,
+ * we should make the Downcall_Read pending again (or just call it) to flush the pending
+ * user data out to metis.
+ *
+ * - Because of how we get scheduled, there might be a large batch of messages waiting at the
+ * forwarder. We don't want to put a giant blob up the stack. So, we keep a deque of TransportMessage
+ * and only feed a few at a time up.
+ *
+ * - Accepts both a PARCBuffer or a CCNxCodecNetworkBufferIoVec as the wire format in the DOWN direction.
+ * - The UP direction is always a PARCBuffer right now
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <arpa/inet.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <netdb.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <parc/algol/parc_Deque.h>
+#include <parc/algol/parc_EventBuffer.h>
+#include <parc/algol/parc_EventTimer.h>
+#include <parc/algol/parc_Network.h>
+
+#include <ccnx/transport/common/transport_Message.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+
+#include "connector_Forwarder.h"
+
+#include <ccnx/transport/transport_rta/config/config_Forwarder_Metis.h>
+
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+#include <ccnx/common/codec/ccnxCodec_TlvEncoder.h>
+#include <ccnx/common/codec/ccnxCodec_TlvDecoder.h>
+#include <ccnx/common/internal/ccnx_TlvDictionary.h>
+
+#include <ccnx/common/codec/ccnxCodec_TlvPacket.h>
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_FixedHeader.h>
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_Types.h>
+
+#include <ccnx/common/ccnx_WireFormatMessage.h>
+
+#define MINIMUM_READ_LENGTH 8
+
+// The message type for a Metis control packet
+#define METIS_CONTROL_TYPE 0xA4
+
+// at most 10MB, this is used as the output buffer down to metis
+#define METIS_OUTPUT_QUEUE_BYTES (10 * 1024 * 1024)
+
+// How big should we try to make the output socket size?
+#define METIS_SEND_SOCKET_BUFFER 65536
+
+// Maximum input backlog in messages, not bytes
+#define METIS_INPUT_QUEUE_MESSAGES 100
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+static int connector_Fwd_Metis_Init(RtaProtocolStack *stack);
+static int connector_Fwd_Metis_Opener(RtaConnection *conn);
+
+static void _eventCallback(int fd, PARCEventType what, void *connectionVoid);
+static void connector_Fwd_Metis_Dequeue(int fd, PARCEventType which_event, void *metisStateVoid);
+
+static void connector_Fwd_Metis_Downcall_Read(PARCEventQueue *, PARCEventType, void *conn);
+static int connector_Fwd_Metis_Closer(RtaConnection *conn);
+static int connector_Fwd_Metis_Release(RtaProtocolStack *stack);
+static void connector_Fwd_Metis_StateChange(RtaConnection *conn);
+
+RtaComponentOperations fwd_metis_ops = {
+ .init = connector_Fwd_Metis_Init,
+ .open = connector_Fwd_Metis_Opener,
+ .upcallRead = NULL,
+ .upcallEvent = NULL,
+ .downcallRead = connector_Fwd_Metis_Downcall_Read,
+ .downcallEvent = NULL,
+ .close = connector_Fwd_Metis_Closer,
+ .release = connector_Fwd_Metis_Release,
+ .stateChange = connector_Fwd_Metis_StateChange
+};
+
+typedef enum {
+ PacketType_Interest,
+ PacketType_ContentObject,
+ PacketType_Control,
+ PacketType_InterestReturn,
+ PacketType_Unknown
+} _PacketType;
+
+typedef struct metis_connector_stats {
+ unsigned countUpcallReads;
+ unsigned countUpcallWriteDataOk;
+ unsigned countUpcallWriteDataError;
+ unsigned countUpcallWriteDataBlocked;
+ unsigned countUpcallWriteDataQueueFull;
+
+ unsigned countUpcallWriteControlOk;
+ unsigned countUpcallWriteControlError;
+
+ unsigned countDowncallReads;
+ unsigned countDowncallWrites;
+ unsigned countDowncallControl;
+} _MetisConnectorStats;
+
+/**
+ * This structure holds the read-ahead data for the next message being read based
+ * on its fixed header
+ */
+typedef struct next_message_header {
+ // this is how we frame received messages on a stream connection. We
+ // wait until we read a complete fixed header, then we can set the length
+ // of that message and keep waiting until we receive at least that many bytes.
+ size_t length;
+
+ // at the time when we parse out the message length from the fixed header,
+ // we also parse out the TLV message type from the fixed header
+ _PacketType packetType;
+ uint8_t version;
+
+ // we will read bytes into this structure
+ union _hdr {
+ CCNxCodecSchemaV1FixedHeader v1;
+ uint8_t buffer[MINIMUM_READ_LENGTH];
+ } fixedHeader;
+
+ uint8_t *readLocation;
+ size_t remainingReadLength;
+
+ // The whole message
+ PARCBuffer *packet;
+} NextMessage;
+
+typedef struct fwd_metis_state {
+ uint16_t port;
+ int fd;
+
+ // separate events for read and write on fd so we can individually enable them
+ PARCEvent *readEvent;
+ PARCEvent *writeEvent;
+
+ bool isConnected;
+
+ // This is our read-ahead of the next message fixed header
+ NextMessage nextMessage;
+
+ // the transportMessageQueueEvent is used to dequeue from the queue.
+ // we make sure its scheduled so long as there's messages in the queue, even if there's
+ // nothing else being read
+ PARCDeque *transportMessageQueue;
+ PARCEventTimer *transportMessageQueueEvent;
+
+ // This buffer is the queue of stuff we need to send to the network
+ PARCEventBuffer *metisOutputQueue;
+
+ _MetisConnectorStats stats;
+} FwdMetisState;
+
+/**
+ * @typedef PacketData
+ * @brief Used to pass a record between reading a packet and sending it up the stack
+ * @discussion Used internally to pass data between functions
+ */
+typedef struct packet_data {
+ FwdMetisState *fwd_state;
+ RtaConnection *conn;
+ PARCEventQueue *out;
+ RtaComponentStats *stats;
+} PacketData;
+
+
+// for debugging
+static unsigned fwd_metis_references_queued = 0;
+static unsigned fwd_metis_references_dequeued = 0;
+static unsigned fwd_metis_references_notqueued = 0;
+
+
+typedef enum {
+ ReadReturnCode_Finished, // read all needed bytes
+ ReadReturnCode_PartialRead, // still need some bytes
+ ReadReturnCode_Closed, // the socket is closed
+ ReadReturnCode_Error, // An error on the socket
+} ReadReturnCode;
+
+// ================================
+
+static void
+_nextMessage_Display(const NextMessage *next, unsigned indent)
+{
+ printf("NextMessage %p length %zu type %d version %u readLocation %p remaining %zu\n",
+ (void *) next, next->length, next->packetType, next->version, (void *) next->readLocation, next->remainingReadLength);
+
+ printf("fixedHeader\n");
+ longBowDebug_MemoryDump((const char *) next->fixedHeader.buffer, MINIMUM_READ_LENGTH);
+
+ if (next->packet) {
+ parcBuffer_Display(next->packet, 3);
+ }
+}
+
+static int
+connector_Fwd_Metis_Init(RtaProtocolStack *stack)
+{
+ struct sigaction ignore_action;
+ ignore_action.sa_handler = SIG_IGN;
+ sigemptyset(&ignore_action.sa_mask);
+ ignore_action.sa_flags = 0;
+ sigaction(SIGPIPE, &ignore_action, NULL);
+
+ return 0;
+}
+
+
+/**
+ * Setup the NextMessage structure to begin reading a fixed header
+ *
+ * All fields are zeroed and the readLocation is set to the first byte of the fixedHeader.
+ * The remainingReadLength is set to the size of the fixedHeader.
+ *
+ * @param [in] next An allocated NextMessage to initialize
+ *
+ * Example:
+ * @code
+ * {
+ * NextMessage nextMessage;
+ * _initializeNextMessage(&nextMessage);
+ * }
+ * @endcode
+ */
+static void
+_initializeNextMessage(NextMessage *next)
+{
+ memset(next, 0, sizeof(NextMessage));
+ next->version = 0xFF;
+ next->packetType = PacketType_Unknown;
+ next->readLocation = next->fixedHeader.buffer;
+ next->remainingReadLength = MINIMUM_READ_LENGTH;
+}
+
+static FwdMetisState *
+connector_Fwd_Metis_CreateConnectionState(PARCEventScheduler *scheduler)
+{
+ FwdMetisState *fwd_state = parcMemory_Allocate(sizeof(FwdMetisState));
+ assertNotNull(fwd_state, "parcMemory_Allocate(%zu) returned NULL", sizeof(FwdMetisState));
+
+ memset(fwd_state, 0, sizeof(FwdMetisState));
+ _initializeNextMessage(&fwd_state->nextMessage);
+
+ fwd_state->fd = 0;
+ fwd_state->readEvent = NULL;
+ fwd_state->writeEvent = NULL;
+ fwd_state->transportMessageQueue = parcDeque_Create();
+ fwd_state->transportMessageQueueEvent = parcEventTimer_Create(scheduler, 0, connector_Fwd_Metis_Dequeue, fwd_state);
+ fwd_state->isConnected = false;
+ fwd_state->metisOutputQueue = parcEventBuffer_Create();
+
+ return fwd_state;
+}
+
+static bool
+_openSocket(FwdMetisState *fwd_state, uint16_t port)
+{
+ fwd_state->port = port;
+ fwd_state->fd = socket(PF_INET, SOCK_STREAM, 0);
+
+ if (fwd_state->fd < 0) {
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s failed to open PF_INET SOCK_STREAM socket: (%d) %s\n",
+ ' ', __func__, errno, strerror(errno));
+ }
+ return false;
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s create socket %d port %u\n",
+ ' ', __func__, fwd_state->fd, fwd_state->port);
+ }
+
+ return true;
+}
+
+/**
+ * @function connector_Fwd_Metis_SetupSocket
+ * @abstract Creates the socket and sets the port, but does not call connect
+ * @discussion
+ * Creates and sets up the socket descriptor. makes it non-blocking.
+ * Sets the port in FwdMetisState.
+ *
+ * This is a full PF_INET socket, not forced to PF_LOCAL.
+ *
+ * The sendbuffer size is set to METIS_OUTPUT_QUEUE_BYTES
+ *
+ * precondition: called _openSocket
+ *
+ * @param <#param1#>
+ * @return <#return#>
+ */
+static bool
+_setupSocket(FwdMetisState *fwd_state)
+{
+ trapUnexpectedStateIf(fwd_state->fd < 1, "Invalid socket %d", fwd_state->fd);
+
+ // Set non-blocking flag
+ int flags = fcntl(fwd_state->fd, F_GETFL, NULL);
+ assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+ int res = fcntl(fwd_state->fd, F_SETFL, flags | O_NONBLOCK);
+
+ if (res < 0) {
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s failed to make socket non-blocking: (%d) %s\n",
+ ' ', __func__, errno, strerror(errno));
+ }
+
+ close(fwd_state->fd);
+ return false;
+ }
+
+ const int sendBufferSize = METIS_SEND_SOCKET_BUFFER;
+ res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_SNDBUF, &sendBufferSize, sizeof(int));
+ if (res < 0) {
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s failed to set SO_SNDBUF to %d: (%d) %s\n",
+ ' ', __func__, sendBufferSize, errno, strerror(errno));
+ }
+ // This is a non-fatal error
+ }
+
+#if defined(SO_NOSIGPIPE)
+ // turn off SIGPIPE, return EPIPE
+ const int on = 1;
+ res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
+ if (res < 0) {
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s failed to set SO_NOSIGPIPE to %d: (%d) %s\n",
+ ' ', __func__, sendBufferSize, errno, strerror(errno));
+ }
+ // this is not a fatal error, so keep going
+ }
+#endif
+
+ return true;
+}
+
+/**
+ * @function connector_Fwd_Metis_SetupConnectionBuffer
+ * @abstract Creates the connection buffer and adds it to libevent
+ * @discussion
+ * <#Discussion#>
+ *
+ * @param <#param1#>
+ * @return <#return#>
+ */
+static bool
+_setupSocketEvents(FwdMetisState *fwd_state, RtaConnection *conn)
+{
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ PARCEventScheduler *scheduler = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack));
+
+ // the connect() call will be asynchrnous because the socket is non-blocking, so we
+ // need ET_WRITE to trigger a callback when the socket becomes writable (i.e. connected).
+ // If there's an error on connect it will be an ET_READ | ET_WRITE event with an error on the socket.
+ fwd_state->readEvent = parcEvent_Create(scheduler, fwd_state->fd, PARCEventType_Read | PARCEventType_Persist | PARCEventType_EdgeTriggered, _eventCallback, conn);
+ assertNotNull(fwd_state->readEvent, "Got a null readEvent for socket %d", fwd_state->fd);
+
+ fwd_state->writeEvent = parcEvent_Create(scheduler, fwd_state->fd, PARCEventType_Write | PARCEventType_Persist | PARCEventType_EdgeTriggered, _eventCallback, conn);
+ assertNotNull(fwd_state->writeEvent, "Got a null readEvent for socket %d", fwd_state->fd);
+
+ // Start the write event. It will be signaled on a connect error or when we are connected.
+ // The read event is not enabled until after connect.
+
+ int failure = parcEvent_Start(fwd_state->writeEvent);
+ assertFalse(failure < 0, "Error starting writeEvent event %p: (%d) %s", (void *) fwd_state->writeEvent, errno, strerror(errno));
+
+ return true;
+}
+
+/**
+ * The connection to the forwarder succeeded, step the state machine
+ *
+ * Change the state of the connection to connected and notify the user that it's ready.
+ *
+ * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_connectionSucceeded(FwdMetisState *fwd_state, RtaConnection *conn)
+{
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s Connection %p connected fd %d\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn, fwd_state->fd);
+ }
+
+ fwd_state->isConnected = true;
+
+ // enable read events
+ parcEvent_Start(fwd_state->readEvent);
+
+ rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_OPEN, NULL, NULL);
+}
+
+static void
+_readInEnvironmentConnectionSpecification(struct sockaddr_in *addr_in)
+{
+ char *forwarderIpEnv = getenv(FORWARDER_CONNECTION_ENV);
+ if (forwarderIpEnv == NULL) {
+ return;
+ }
+
+ char forwarderIpAddress[NI_MAXHOST] = { 0 };
+ in_port_t forwarderIpPort = 0;
+
+ // Currently, we only support tcp control connections to the forwarder
+ sscanf(forwarderIpEnv, "tcp://%[^:]:%hu", forwarderIpAddress, &forwarderIpPort);
+
+ // If provided, use the specified address in a canonical form
+ if (forwarderIpAddress[0] != '\0') {
+ // Normalize the provided hostname
+ struct sockaddr_in *addr = (struct sockaddr_in *) parcNetwork_SockAddress(forwarderIpAddress, forwarderIpPort);
+ char *ipAddress = inet_ntoa(addr->sin_addr);
+ parcMemory_Deallocate(&addr);
+ if (ipAddress) {
+ addr_in->sin_addr.s_addr = inet_addr(ipAddress);
+ } else {
+ addr_in->sin_addr.s_addr = inet_addr(forwarderIpAddress);
+ }
+ }
+
+ // If provided, use the specified port
+ if (forwarderIpPort != 0) {
+ addr_in->sin_port = htons(forwarderIpPort);
+ }
+}
+
+/**
+ * @function connector_Fwd_Metis_BeginConnect
+ * @abstract Begins the non-blocking connect() call to 127.0.0.1 on the port in FwdMetisState
+ * @discussion
+ * <#Discussion#>
+ *
+ * @param <#param1#>
+ * @return <#return#>
+ */
+static bool
+connector_Fwd_Metis_BeginConnect(FwdMetisState *fwd_state, RtaConnection *conn)
+{
+ bool success = false;
+
+ struct sockaddr_in addr_in;
+ memset(&addr_in, 0, sizeof(addr_in));
+ addr_in.sin_port = htons(fwd_state->port);
+ addr_in.sin_family = AF_INET;
+ addr_in.sin_addr.s_addr = inet_addr("127.0.0.1");
+
+ // Override defaults if specified
+ _readInEnvironmentConnectionSpecification(&addr_in);
+
+ if (DEBUG_OUTPUT) {
+ char inetAddress[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &(addr_in.sin_addr), inetAddress, INET_ADDRSTRLEN);
+ printf("%9" PRIu64 " %s beginning connect socket %d to port %d on %s\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ fwd_state->fd,
+ fwd_state->port,
+ inetAddress);
+ }
+
+ // This will deliver a PARCEventType_Write event on connect success
+ int res = connect(fwd_state->fd, (struct sockaddr*) &addr_in, (socklen_t) sizeof(addr_in));
+
+ if (res == 0) {
+ // connect succeded immediately
+ _connectionSucceeded(fwd_state, conn);
+ success = true;
+ } else if (errno == EINPROGRESS) {
+ // connection is deferred
+ success = true;
+ } else {
+ // a hard error
+ printf("Error connecting: (%d) %s\n", errno, strerror(errno));
+ }
+
+ return success;
+}
+
+/**
+ * We maintain an input queue going up the stack and only dequeue a small number of packets
+ * with each call from the dispatch loop. THis is to avoid bursting a bunch of packets up the stack.
+ */
+static void
+connector_Fwd_Metis_Dequeue(int fd, PARCEventType which_event, void *metisStateVoid)
+{
+ FwdMetisState *fwd_state = (FwdMetisState *) metisStateVoid;
+
+ // random small number. What is right value for this?
+ unsigned max_loops = 6;
+
+ if (DEBUG_OUTPUT) {
+ printf("%9d %s deque size %zu\n",
+ 0,
+ __func__,
+ parcDeque_Size(fwd_state->transportMessageQueue));
+ }
+
+ while (max_loops > 0 && !parcDeque_IsEmpty(fwd_state->transportMessageQueue)) {
+ max_loops--;
+ TransportMessage *tm = parcDeque_RemoveFirst(fwd_state->transportMessageQueue);
+
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS);
+
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+ }
+
+ // If there are still messages in there, re-schedule
+ if (!parcDeque_IsEmpty(fwd_state->transportMessageQueue)) {
+ if (DEBUG_OUTPUT) {
+ printf("%9d %s rescheduling output queue timer %p\n",
+ 0,
+ __func__,
+ (void *) fwd_state->transportMessageQueueEvent);
+ }
+
+ struct timeval immediateTimeout = { 0, 0 };
+ parcEventTimer_Start(fwd_state->transportMessageQueueEvent, &immediateTimeout);
+ }
+}
+
+/**
+ * Create a TCP socket
+ * Set it non-blocking
+ * Wrap it in a buffer event
+ * Set Read and Event callbacks
+ *
+ * Return 0 success, -1 failure
+ */
+static int
+connector_Fwd_Metis_Opener(RtaConnection *conn)
+{
+ bool success = false;
+
+ uint16_t port = metisForwarder_GetPortFromConfig(rtaConnection_GetParameters(conn));
+
+ PARCEventScheduler *scheduler = rtaFramework_GetEventScheduler(rtaConnection_GetFramework(conn));
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+
+ if (_openSocket(fwd_state, port)) {
+ if (_setupSocket(fwd_state)) {
+ if (_setupSocketEvents(fwd_state, conn)) {
+ if (connector_Fwd_Metis_BeginConnect(fwd_state, conn)) {
+ // stash it away in the per-connection cubby hole
+ rtaConnection_SetPrivateData(conn, FWD_METIS, fwd_state);
+ success = true;
+ }
+ }
+ }
+ }
+
+ if (!success) {
+ if (fwd_state->fd) {
+ close(fwd_state->fd);
+ }
+ if (fwd_state->readEvent) {
+ parcEvent_Destroy(&(fwd_state->readEvent));
+ }
+ if (fwd_state->writeEvent) {
+ parcEvent_Destroy(&(fwd_state->writeEvent));
+ }
+ parcMemory_Deallocate((void **) &fwd_state);
+ return -1;
+ }
+
+ // Socket will be ready for use once we get PARCEventQueue_Connected
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s open conn %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+
+ return 0;
+}
+
+/**
+ * We received a Metis control packet. Translate it to a control packet and send it up the stack.
+ */
+static void
+receiveControlMessage(PacketData *data)
+{
+ CCNxTlvDictionary *packetDictionary =
+ ccnxWireFormatMessage_FromControlPacketType(data->fwd_state->nextMessage.version, data->fwd_state->nextMessage.packet);
+
+ bool success = ccnxCodecTlvPacket_BufferDecode(data->fwd_state->nextMessage.packet, packetDictionary);
+
+ if (success) {
+ TransportMessage *tm = transportMessage_CreateFromDictionary(packetDictionary);
+ transportMessage_SetInfo(tm, rtaConnection_Copy(data->conn), rtaConnection_FreeFunc);
+
+ // send it up the stack
+ if (rtaComponent_PutMessage(data->out, tm)) {
+ rtaComponentStats_Increment(data->stats, STATS_UPCALL_OUT);
+ data->fwd_state->stats.countUpcallWriteControlOk++;
+ } else {
+ data->fwd_state->stats.countUpcallWriteControlError++;
+ }
+ } else {
+ assertTrue(success, "Error decoding a Metis control packet\n")
+ {
+ parcBuffer_Display(data->fwd_state->nextMessage.packet, 3);
+ }
+ }
+
+ // we are now done with our references
+ ccnxTlvDictionary_Release(&packetDictionary);
+}
+
+
+static void
+_queueNonControl(PacketData *data)
+{
+ CCNxTlvDictionary *packetDictionary = ccnxWireFormatMessage_Create(data->fwd_state->nextMessage.packet);
+
+ assertNotNull(packetDictionary, "Got a null packet decode")
+ {
+ parcBuffer_Display(data->fwd_state->nextMessage.packet, 3);
+ }
+
+ TransportMessage *tm = transportMessage_CreateFromDictionary(packetDictionary);
+
+ // add the connection info to the transport message before sending up stack
+ transportMessage_SetInfo(tm, rtaConnection_Copy(data->conn), rtaConnection_FreeFunc);
+
+ parcDeque_Append(data->fwd_state->transportMessageQueue, tm);
+
+ // start if went from emtpy to 1
+ if (parcDeque_Size(data->fwd_state->transportMessageQueue) == 1) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u schedule dequeue event %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))),
+ __func__,
+ rtaConnection_GetConnectionId(data->conn),
+ (void *) data->fwd_state->transportMessageQueueEvent);
+ }
+
+ struct timeval immediateTimeout = { 0, 0 };
+ parcEventTimer_Start(data->fwd_state->transportMessageQueueEvent, &immediateTimeout);
+ }
+
+ // we are now done with our references
+ ccnxTlvDictionary_Release(&packetDictionary);
+}
+
+/**
+ * Receive a non-control packet
+ *
+ * Non-control messages may be dropped due to lack of input buffer space.
+ * If the connection has state Block Up or the up queue's length is
+ * too many messages deep, the non-control message will be dropped.
+ *
+ * precondition: the caller knows the message is not a control message
+ *
+ * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_receiveNonControl(PacketData *data)
+{
+ if (rtaConnection_BlockedUp(data->conn)) {
+ data->fwd_state->stats.countUpcallWriteDataBlocked++;
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u blocked up, drop wireFormat %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))),
+ __func__,
+ rtaConnection_GetConnectionId(data->conn),
+ (void *) data->fwd_state->nextMessage.packet);
+ }
+ } else {
+ if (parcDeque_Size(data->fwd_state->transportMessageQueue) < METIS_INPUT_QUEUE_MESSAGES) {
+ _queueNonControl(data);
+ data->fwd_state->stats.countUpcallWriteDataOk++;
+ } else {
+ data->fwd_state->stats.countUpcallWriteDataQueueFull++;
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u input buffer full, drop wireFormat %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))),
+ __func__,
+ rtaConnection_GetConnectionId(data->conn),
+ (void *) data->fwd_state->nextMessage.packet);
+ }
+ }
+ }
+}
+
+/**
+ * We received an entire packet, send it up the stack in a Transport message.
+ *
+ * If its a control message, we make it a CCNxControlMessage here for symmetry with us
+ * encoding the control messages at this level
+ */
+static void
+connector_Fwd_Metis_SendUpStack(PacketData *data)
+{
+ // Always send control messages up the stack
+ if (data->fwd_state->nextMessage.packetType == PacketType_Control) {
+ receiveControlMessage(data);
+ } else {
+ _receiveNonControl(data);
+ }
+}
+
+/**
+ * Return the SO_ERROR value for the given socket
+ *
+ * If getsockopt returns an error, the return code could be the error from getsockopt.
+ *
+ * Typically you will get ECONNREFUSED when you cannot connect and one of the many getsockopt
+ * errors if there's a problem with the actual socket.
+ *
+ * @param [in] fd The socket
+ *
+ * @return errno An errno value
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static int
+_getSocketError(int fd)
+{
+ int value;
+ socklen_t valueLength = sizeof(value);
+ int res = getsockopt(fd, SOL_SOCKET, SO_ERROR, &value, &valueLength);
+ if (res < 0) {
+ value = res;
+ }
+ return value;
+}
+
+/**
+ * Received an event on a socket we have marked as not yet connected
+ *
+ * Ether it's ready to go or there's an error. We will receive a PARCEventType_Read and the socket
+ * will have an SO_ERROR of 0 if it's now connected. If the SO_ERROR is non-zero, there
+ * was an error on connect.
+ *
+ * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_disconnectedEventHandler(FwdMetisState *fwd_state, RtaConnection *conn, PARCEventType what)
+{
+ if (what & PARCEventType_Read) {
+ int socketError = _getSocketError(fwd_state->fd);
+ if (socketError == 0) {
+ // I don't think these happen, they will be write events
+ _connectionSucceeded(fwd_state, conn);
+ } else {
+ // error on connect
+ printf("%9" PRIu64 " %s Connection %p got error on SOCK_STREAM, fd %d: %s\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn,
+ fwd_state->fd,
+ strerror(errno));
+
+ // make the event non-pending
+ parcEvent_Stop(fwd_state->readEvent);
+ parcEvent_Stop(fwd_state->writeEvent);
+
+ rtaConnection_SetBlockedDown(conn);
+
+ // at least tell the API whats going on
+ rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_FORWARDER_NOT_AVAILABLE, NULL, NULL);
+ }
+ }
+
+ if (what & PARCEventType_Write) {
+ int socketError = _getSocketError(fwd_state->fd);
+ if (socketError == 0) {
+ _connectionSucceeded(fwd_state, conn);
+ }
+ }
+}
+
+static void
+_setupNextPacketV1(FwdMetisState *fwd_state)
+{
+ switch (fwd_state->nextMessage.fixedHeader.v1.packetType) {
+ case CCNxCodecSchemaV1Types_PacketType_Interest:
+ fwd_state->nextMessage.packetType = PacketType_Interest;
+ break;
+ case CCNxCodecSchemaV1Types_PacketType_ContentObject:
+ fwd_state->nextMessage.packetType = PacketType_ContentObject;
+ break;
+ case CCNxCodecSchemaV1Types_PacketType_Control:
+ fwd_state->nextMessage.packetType = PacketType_Control;
+ break;
+ case CCNxCodecSchemaV1Types_PacketType_InterestReturn:
+ fwd_state->nextMessage.packetType = PacketType_InterestReturn;
+ break;
+ default:
+ fwd_state->nextMessage.packetType = PacketType_Unknown;
+ break;
+ }
+
+ size_t fixedHeaderLength = sizeof(CCNxCodecSchemaV1FixedHeader);
+ fwd_state->nextMessage.length = htons(fwd_state->nextMessage.fixedHeader.v1.packetLength);
+
+ fwd_state->nextMessage.packet = parcBuffer_Allocate(fwd_state->nextMessage.length);
+ assertNotNull(fwd_state->nextMessage.packet, "Could not allocate packet of size %zu", fwd_state->nextMessage.length);
+
+ // finally copy in the fixed header as we have already read that in
+ parcBuffer_PutArray(fwd_state->nextMessage.packet, fixedHeaderLength, fwd_state->nextMessage.fixedHeader.buffer);
+}
+
+/**
+ * Called after reading whole FixedHeader, will setup the packet buffer
+ *
+ * After reading the fixed header, we need to allocate a PARCBuffer for the packet. Setup that
+ * buffer and copy the FixedHeader in to it. Remaining reads will go in to this buffer.
+ *
+ * After this function completes, the parsed version, packetType, and length of the nextMessage will
+ * be filled in, the packet buffer allocated and the fixedHeader copied to that packet buffer.
+ *
+ * precondition: forwarder->nextMessage.remainingReadLength == 0 && fwd_state->nextMessage.packet == NULL
+ *
+ * @param [in] fwd_state An allocated forwarder connection state that has read in the fixed header
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_setupNextPacket(FwdMetisState *fwd_state)
+{
+ trapUnexpectedStateIf(fwd_state->nextMessage.packet != NULL, "Calling _setupNextPacket but the packet field is not NULL");
+
+ fwd_state->nextMessage.version = fwd_state->nextMessage.fixedHeader.buffer[0];
+
+ switch (fwd_state->nextMessage.version) {
+ case 1:
+ _setupNextPacketV1(fwd_state);
+ break;
+
+ default:
+ trapUnexpectedState("Illegal packet version %d", fwd_state->nextMessage.version)
+ {
+ _nextMessage_Display(&fwd_state->nextMessage, 0);
+ }
+ break;
+ }
+}
+
+/**
+ * Reads the FixedHeader. If full read will setup the next packet buffer.
+ *
+ * Reads up to FixedHeader length bytes. If read whole header will allocate the next packet
+ * buffer to right size and copy the Fixed Header in to the buffer.
+ *
+ * preconditions:
+ * - fwd_state->nextMessage.packet should be NULL
+ * - fwd_state->nextMessage.remainingReadLength should be the remaining bytes to read of the Fixed Header
+ * - fwd_state->nextMessage.readLocation should point to the location in the FixedHeader to start reading
+ *
+ * postconditions:
+ * - fwd_state->nextMessage.remainingReadLength will be decremented by the amount read
+ * - If remainingReadLength is decremented to 0, will allocate fwd_state->nextMessage.packet and copy in the FixedHeader
+ * - The fields in fwd_state->nextMessage (length, packetType, version) will be set based on the fixed header
+ *
+ * @param [in] fwd_state An allocated forwarder connection state
+ *
+ * @retval ReadReturnCode_Finished one entire packet is ready in the buffer
+ * @retval ReadReturnCode_PartialRead need more bytes
+ * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error)
+ * @retval ReadReturnCode_Error An error occured on the socket to metis
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static ReadReturnCode
+_readPacketHeader(FwdMetisState *fwd_state)
+{
+ ReadReturnCode returnCode = ReadReturnCode_Error;
+
+ // This could be switched to MSG_PEEK instead of copying later, but I don't think it makes any significant change.
+ ssize_t nread = recv(fwd_state->fd, fwd_state->nextMessage.readLocation, fwd_state->nextMessage.remainingReadLength, 0);
+ if (nread > 0) {
+ // recv will always runturn at most fwd_state->nextMessage.remainingReadLength, so this won't wrap around to negative.
+ fwd_state->nextMessage.remainingReadLength -= nread;
+
+ if (fwd_state->nextMessage.remainingReadLength == 0) {
+ returnCode = ReadReturnCode_Finished;
+ _setupNextPacket(fwd_state);
+ } else {
+ fwd_state->nextMessage.readLocation += nread;
+ returnCode = ReadReturnCode_PartialRead;
+ }
+ } else if (nread == 0) {
+ // the connection is closed
+ returnCode = ReadReturnCode_Closed;
+ } else {
+ switch (errno) {
+ case EAGAIN:
+ // call would block. These can happen becasue _readMessage is in a while loop and we detect
+ // the end of the loop because we cannot read another fixed header.
+ returnCode = ReadReturnCode_PartialRead;
+ break;
+
+ default:
+ // an error. I think all errors will be hard errors and we close the connection
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s socket %d recv error: (%d) %s\n",
+ ' ', __func__, fwd_state->fd, errno, strerror(errno));
+ }
+ returnCode = ReadReturnCode_Error;
+ break;
+ }
+ }
+
+ return returnCode;
+}
+
+
+/**
+ * We have finished reading the fixed header, reading the message body
+ *
+ * Will modify the nextMessage.packet buffer. When the buffer has 0 remaining, the whole packet has been read
+ *
+ * precondition: _readHeaderFromMetis read the header and allocated the packet buffer
+ *
+ * @param [in] fwd_state An allocated forwarder connection state
+ *
+ * @retval ReadReturnCode_Finished one entire packet is ready in the buffer
+ * @retval ReadReturnCode_PartialRead need more bytes
+ * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error)
+ * @retval ReadReturnCode_Error An error occured on the socket to metis
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static ReadReturnCode
+_readPacketBody(FwdMetisState *fwd_state)
+{
+ ReadReturnCode returnCode = ReadReturnCode_Error;
+
+ trapUnexpectedStateIf(fwd_state->nextMessage.packet == NULL, "Trying to read a message with a null packet buffer");
+
+ size_t remaining = parcBuffer_Remaining(fwd_state->nextMessage.packet);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s socket %d read up to %zu bytes\n",
+ ' ', __func__, fwd_state->fd, remaining);
+ }
+
+ void *overlay = parcBuffer_Overlay(fwd_state->nextMessage.packet, 0);
+ ssize_t nread = recv(fwd_state->fd, overlay, remaining, 0);
+
+ if (nread > 0) {
+ // good read
+ parcBuffer_SetPosition(fwd_state->nextMessage.packet, parcBuffer_Position(fwd_state->nextMessage.packet) + nread);
+
+ if (nread == remaining) {
+ returnCode = ReadReturnCode_Finished;
+ } else {
+ returnCode = ReadReturnCode_PartialRead;
+ }
+ } else if (nread == 0) {
+ // connection closed
+ returnCode = ReadReturnCode_Closed;
+ } else {
+ switch (errno) {
+ case EAGAIN:
+ // call would block. These can happen becasue _readMessage is in a while loop and we detect
+ // the end of the loop because we cannot read the entire message body.
+ returnCode = ReadReturnCode_PartialRead;
+ break;
+
+ default:
+ // an error. I think all errors will be hard errors and we close the connection
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s socket %d recv error: (%d) %s\n",
+ ' ', __func__, fwd_state->fd, errno, strerror(errno));
+ }
+ returnCode = ReadReturnCode_Error;
+ }
+ }
+
+
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s socket %u msg_length %zu read_length %zd remaining %zu\n",
+ ' ',
+ __func__,
+ fwd_state->fd,
+ fwd_state->nextMessage.length,
+ nread,
+ parcBuffer_Remaining(fwd_state->nextMessage.packet));
+ }
+
+ return returnCode;
+}
+
+/**
+ * Read packet from metis
+ *
+ * Reads the fixed heder. Once fixed header is done, begins reading the packet body. Keeps
+ * all the incremental state to do partial reads.
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @retval ReadReturnCode_Finished one entire packet is ready in the buffer
+ * @retval ReadReturnCode_PartialRead need more bytes
+ * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error)
+ * @retval ReadReturnCode_Error An error occured on the socket to metis
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static ReadReturnCode
+_readPacket(FwdMetisState *fwd_state)
+{
+ ReadReturnCode returnCode = ReadReturnCode_PartialRead;
+
+ // are we still reading the header?
+ if (fwd_state->nextMessage.remainingReadLength > 0) {
+ returnCode = _readPacketHeader(fwd_state);
+ } else {
+ returnCode = ReadReturnCode_Finished;
+ }
+
+ // After reading the header, it may be possible to read the body too
+ if (returnCode == ReadReturnCode_Finished && fwd_state->nextMessage.remainingReadLength == 0) {
+ returnCode = _readPacketBody(fwd_state);
+ }
+
+ return returnCode;
+}
+
+/**
+ * Read as many packets as we can from Metis
+ *
+ * Will read the stream socket from metis until we get a PartialRead return code from
+ * either the attempt to read the header or the body.
+ *
+ * On read error, will send a notification message the connection is closed up to
+ * the API and will disable read and write events.
+ *
+ * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_readFromMetis(FwdMetisState *fwd_state, RtaConnection *conn)
+{
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS);
+
+ ReadReturnCode readCode;
+ while ((readCode = _readPacket(fwd_state)) == ReadReturnCode_Finished) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_IN);
+ fwd_state->stats.countUpcallReads++;
+
+ // setup the buffer for reading
+ parcBuffer_Flip(fwd_state->nextMessage.packet);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s sending packet buffer %p up stack length %zu\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) fwd_state->nextMessage.packet,
+ parcBuffer_Remaining(fwd_state->nextMessage.packet));
+ }
+
+ // this is just to make the signature of connector_Fwd_Metis_SendUpStack tractable, PacketData
+ // is not exposed outside this scope.
+
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP);
+ PacketData data = {
+ .fwd_state = fwd_state,
+ .conn = conn,
+ .out = out,
+ .stats = stats,
+ };
+
+ connector_Fwd_Metis_SendUpStack(&data);
+
+ // done with the packet buffer. Release our hold on it. If it was sent up the stack
+ // another reference count was made.
+ parcBuffer_Release(&fwd_state->nextMessage.packet);
+
+ // now setup for next packet
+ _initializeNextMessage(&fwd_state->nextMessage);
+ }
+
+ if (readCode == ReadReturnCode_Closed) {
+ fwd_state->isConnected = false;
+ parcEvent_Stop(fwd_state->readEvent);
+ parcEvent_Stop(fwd_state->writeEvent);
+ rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_CLOSED, NULL, "Socket operation returned closed by remote");
+ } else if (readCode == ReadReturnCode_Error) {
+ fwd_state->isConnected = false;
+ parcEvent_Stop(fwd_state->readEvent);
+ parcEvent_Stop(fwd_state->writeEvent);
+ rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_CLOSED, NULL, "Socket operation returned error");
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total upcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_UPCALL_IN),
+ rtaComponentStats_Get(stats, STATS_UPCALL_OUT));
+ }
+}
+
+/**
+ * Append a vector to the buffer
+ *
+ * @param [in] wireFormat The wire format packet, assumes current position is start of packet
+ * @param [in] fwd_output The libevent buffer to add the memory reference to
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_queueIoVecMessageToMetis(CCNxCodecNetworkBufferIoVec *vec, PARCEventBuffer *fwd_output)
+{
+ fwd_metis_references_queued++;
+
+ int iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(vec);
+ const struct iovec *array = ccnxCodecNetworkBufferIoVec_GetArray(vec);
+
+ for (int i = 0; i < iovcnt; i++) {
+ if (parcEventBuffer_Append(fwd_output, array[i].iov_base, array[i].iov_len) < 0) {
+ trapUnrecoverableState("%s error writing to bev_local", __func__);
+ }
+ }
+}
+
+/**
+ * Append to the buffer
+ *
+ * @param [in] wireFormat The wire format packet, assumes current position is start of packet
+ * @param [in] fwd_output The libevent buffer to add the memory reference to
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_queueBufferMessageToMetis(PARCBuffer *wireFormat, PARCEventBuffer *fwd_output)
+{
+ fwd_metis_references_queued++;
+
+ void *overlay = parcBuffer_Overlay(wireFormat, 0);
+ size_t length = parcBuffer_Remaining(wireFormat);
+
+ if (parcEventBuffer_Append(fwd_output, overlay, length) < 0) {
+ trapUnrecoverableState("%s error writing to bev_local", __func__);
+ }
+}
+
+/**
+ * Write as much as possible from the output buffer to metis
+ *
+ * Write as much as we can to metis. If there is nothing left, deactivate the write event.
+ * If there is still bytes left in the output buffer, activate the write event.
+ *
+ * postconditions:
+ * - Write as many bytes as possible from the output buffer to metis
+ * - If there are still bytes remaining, enable the write event
+ * - If there are no bytes remaining, disable the write event.
+ *
+ * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_dequeueMessagesToMetis(FwdMetisState *fwdConnState)
+{
+ // if we try to write a 0 length buffer, write will return -1 like an error
+ if (parcEventBuffer_GetLength(fwdConnState->metisOutputQueue) > 0) {
+ fwdConnState->stats.countDowncallWrites++;
+ int nwritten = parcEventBuffer_WriteToFileDescriptor(fwdConnState->metisOutputQueue, fwdConnState->fd, -1);
+ if (nwritten < 0) {
+ // an error
+ trapNotImplemented("Bugzid: 2194");
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s wrote %d bytes to socket %d, %zu bytes remaining\n",
+ ' ',
+ __func__,
+ nwritten,
+ fwdConnState->fd,
+ parcEventBuffer_GetLength(fwdConnState->metisOutputQueue));
+ }
+
+ // if we could not write the whole buffer, make sure we have a write event pending
+ if (parcEventBuffer_GetLength(fwdConnState->metisOutputQueue) > 0) {
+ parcEvent_Start(fwdConnState->writeEvent);
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s enabled write event\n", ' ', __func__);
+ }
+ } else {
+ parcEvent_Stop(fwdConnState->writeEvent);
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s disabled write event\n", ' ', __func__);
+ }
+ }
+ }
+}
+
+
+/**
+ * Called when we get an event on a socket we believe is connected
+ *
+ * libevent will call this with an PARCEventType_Read on connection close too (the read length will be 0).
+ *
+ * @param [in] fwd_state An allocated forwarder connection state
+ * @param [in] conn The corresponding RTA connection
+ * @param [in] what The Libevent set of events
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_connectedEventHandler(FwdMetisState *fwd_state, RtaConnection *conn, short what)
+{
+ if (what & PARCEventType_Read) {
+ _readFromMetis(fwd_state, conn);
+ }
+
+ if (what & PARCEventType_Write) {
+ _dequeueMessagesToMetis(fwd_state);
+ }
+}
+
+/**
+ * Called for any activity on the socket. Maybe in either connected or disconnected state.
+ */
+static void
+_eventCallback(int fd, PARCEventType what, void *connectionVoid)
+{
+ RtaConnection *conn = (RtaConnection *) connectionVoid;
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);;
+
+ if (!fwd_state->isConnected) {
+ _disconnectedEventHandler(fwd_state, conn, what);
+
+ // once we connect, we should try a read immediately too
+ }
+
+ if (fwd_state->isConnected) {
+ _connectedEventHandler(fwd_state, conn, what);
+ }
+}
+
+/**
+ * Updates the connections's Blocked Down state
+ *
+ * If the bytes in our output buffer are greater than METIS_OUTPUT_QUEUE_BYTES, then
+ * we will set the Blocked Down condition on the connection. This will prevent the
+ * API connector from accepting more messages.
+ *
+ * Messages already in the connection queue will still be processed.
+ *
+ * @param [in] fwd_output The libevent buffer to check the backlog
+ * @param [in] conn The RtaConnection the set or clear the blocked down condition
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_updateBlockedDownState(PARCEventBuffer *fwd_output, RtaConnection *conn)
+{
+ size_t queue_bytes = parcEventBuffer_GetLength(fwd_output);
+ if (queue_bytes > METIS_OUTPUT_QUEUE_BYTES) {
+ // block down
+
+ if (!rtaConnection_BlockedDown(conn)) {
+ rtaConnection_SetBlockedDown(conn);
+ }
+
+ // note that we continue execution and put the packet we have in hand on the queue
+ // setting the blocked down state only affects the API connector. Packets already in the system
+ // will keep flowing down to us
+ } else {
+ // if it is blocked, unblock it
+ if (rtaConnection_BlockedDown(conn)) {
+ rtaConnection_ClearBlockedDown(conn);
+ }
+ }
+}
+
+static void
+connector_Fwd_Metis_Downcall_HandleConnected(FwdMetisState *fwdConnState, TransportMessage *tm, RtaConnection *conn, RtaComponentStats *stats)
+{
+ _updateBlockedDownState(fwdConnState->metisOutputQueue, conn);
+
+ CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm);
+
+ bool queued = false;
+
+ CCNxCodecNetworkBufferIoVec *vec = ccnxWireFormatMessage_GetIoVec(dictionary);
+ if (vec != NULL) {
+ _queueIoVecMessageToMetis(vec, fwdConnState->metisOutputQueue);
+ queued = true;
+ } else {
+ PARCBuffer *wireFormat = ccnxWireFormatMessage_GetWireFormatBuffer(dictionary);
+ if (wireFormat != NULL) {
+ _queueBufferMessageToMetis(wireFormat, fwdConnState->metisOutputQueue);
+ queued = true;
+ }
+ }
+
+ if (queued) {
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+
+ if (DEBUG_OUTPUT) {
+ struct timeval delay = transportMessage_GetDelay(tm);
+ printf("%9" PRIu64 " %s total downcall reads %" PRIu64 " references queued %u dequeued %u not queued %u last delay %.6f\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ fwd_metis_references_queued,
+ fwd_metis_references_dequeued,
+ fwd_metis_references_notqueued,
+ delay.tv_sec + delay.tv_usec * 1E-6);
+ }
+ } else {
+ fwd_metis_references_notqueued++;
+ }
+
+ // The transport message is destroyed in connector_Fwd_Metis_Downcall_Read()
+}
+
+static void
+_ackRequest(RtaConnection *conn, PARCJSON *request)
+{
+ PARCJSON *response = cpiAcks_CreateAck(request);
+ CCNxTlvDictionary *ackDict = ccnxControlFacade_CreateCPI(response);
+
+ TransportMessage *tm_ack = transportMessage_CreateFromDictionary(ackDict);
+ ccnxTlvDictionary_Release(&ackDict);
+ parcJSON_Release(&response);
+
+ transportMessage_SetInfo(tm_ack, rtaConnection_Copy(conn), rtaConnection_FreeFunc);
+
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP);
+ if (rtaComponent_PutMessage(out, tm_ack)) {
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS);
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+}
+
+static bool
+_handleDownControl(FwdMetisState *fwdConnState, RtaConnection *conn, TransportMessage *tm)
+{
+ bool consumedMessage = false;
+
+ CCNxTlvDictionary *dict = transportMessage_GetDictionary(tm);
+ if (ccnxTlvDictionary_IsControl(dict)) {
+ if (ccnxControlFacade_IsCPI(dict)) {
+ PARCJSON *json = ccnxControlFacade_GetJson(dict);
+ if (controlPlaneInterface_GetCPIMessageType(json) == CPI_REQUEST) {
+ if (cpi_getCPIOperation2(json) == CPI_PAUSE) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p recieved PAUSE\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+ _ackRequest(conn, json);
+ consumedMessage = true;
+ }
+
+ if (cpi_getCPIOperation2(json) == CPI_FLUSH) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p recieved FLUSH\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+ _ackRequest(conn, json);
+ consumedMessage = true;
+ }
+ }
+ }
+ }
+
+ if (consumedMessage) {
+ fwdConnState->stats.countDowncallControl++;
+ }
+
+ return consumedMessage;
+}
+
+/**
+ * send raw packet from codec to forwarder. We are passed the ProtocolStack on the ptr.
+ */
+static void
+connector_Fwd_Metis_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *ptr)
+{
+ TransportMessage *tm;
+
+ while ((tm = rtaComponent_GetMessage(in)) != NULL) {
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ FwdMetisState *fwdConnState = rtaConnection_GetPrivateData(conn, FWD_METIS);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS);
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN);
+ fwdConnState->stats.countDowncallReads++;
+
+ bool consumedControl = _handleDownControl(fwdConnState, conn, tm);
+ if (!consumedControl) {
+ // we did not consume the message as a control packet for the metis connector
+
+ if (fwdConnState->isConnected) {
+ // If the socket is connected, this will "do the right thing" and consume the transport message.
+ connector_Fwd_Metis_Downcall_HandleConnected(fwdConnState, tm, conn, stats);
+ } else {
+ // Oops, got a packet before we're connected.
+ printf("\nConnection %p transport message %p on fd %d that's not open\n", (void *) conn, (void *) tm, fwdConnState->fd);
+ }
+
+ // now attempt to write to the network
+ _dequeueMessagesToMetis(fwdConnState);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total downcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT));
+ }
+ }
+
+ transportMessage_Destroy(&tm);
+ }
+}
+
+/**
+ * Destroy the FwdMetisState object.
+ *
+ * Destroys any packets waiting in queue, frees the libevent structures used by the connection to Metis.
+ * Frees the FwdMetisState object and will NULL *fwdStatePtr.
+ *
+ * @param [in,out] fwdStatePtr Double pointer to the allocated state. Will be NULL'd on output.
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+_fwdMetisState_Release(FwdMetisState **fwdStatePtr)
+{
+ FwdMetisState *fwd_state = *fwdStatePtr;
+
+ while (!parcDeque_IsEmpty(fwd_state->transportMessageQueue)) {
+ TransportMessage *tm = parcDeque_RemoveFirst(fwd_state->transportMessageQueue);
+ transportMessage_Destroy(&tm);
+ }
+
+ parcDeque_Release(&fwd_state->transportMessageQueue);
+
+ if (fwd_state->readEvent) {
+ parcEvent_Destroy(&(fwd_state->readEvent));
+ }
+
+ if (fwd_state->writeEvent) {
+ parcEvent_Destroy(&(fwd_state->writeEvent));
+ }
+
+ parcEventTimer_Destroy(&(fwd_state->transportMessageQueueEvent));
+
+ if (fwd_state->metisOutputQueue) {
+ parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue));
+ }
+
+ if (fwd_state->nextMessage.packet) {
+ parcBuffer_Release(&fwd_state->nextMessage.packet);
+ }
+
+ close(fwd_state->fd);
+
+ parcMemory_Deallocate((void **) &fwd_state);
+ *fwdStatePtr = NULL;
+}
+
+static int
+connector_Fwd_Metis_Closer(RtaConnection *conn)
+{
+ FwdMetisState *fwd_state = rtaConnection_GetPrivateData(conn, FWD_METIS);
+ rtaConnection_SetPrivateData(conn, FWD_METIS, NULL);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s called on fwd_state %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state);
+ }
+
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS);
+ rtaComponentStats_Increment(stats, STATS_CLOSES);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s closed fwd_state %p deque length %zu\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) fwd_state,
+ parcDeque_Size(fwd_state->transportMessageQueue));
+
+ printf("%9" PRIu64 " %s closed fwd_state %p stats: up { reads %u wok %u werr %u wblk %u wfull %u wctrlok %u wctrlerr %u }\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) fwd_state,
+ fwd_state->stats.countUpcallReads, fwd_state->stats.countUpcallWriteDataOk, fwd_state->stats.countUpcallWriteDataError,
+ fwd_state->stats.countUpcallWriteDataBlocked, fwd_state->stats.countUpcallWriteDataQueueFull,
+ fwd_state->stats.countUpcallWriteControlOk, fwd_state->stats.countUpcallWriteControlError);
+
+ printf("%9" PRIu64 " %s closed fwd_state %p stats: dn { reads %u wok %u wctrlok %u }\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) fwd_state,
+ fwd_state->stats.countDowncallReads, fwd_state->stats.countDowncallWrites, fwd_state->stats.countDowncallControl);
+ }
+
+ _fwdMetisState_Release(&fwd_state);
+
+ return 0;
+}
+
+static int
+connector_Fwd_Metis_Release(RtaProtocolStack *stack)
+{
+ return 0;
+}
+
+/**
+ * Enable to disable the read event based on the Blocked Up state
+ *
+ * If we receive a Blocked Up state change and the read event is pending, make it
+ * not pending. If we receive a not blocked up state change and the read event is not
+ * pending, make it pending.
+ *
+ * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ */
+static void
+connector_Fwd_Metis_StateChange(RtaConnection *conn)
+{
+ struct fwd_metis_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_METIS);
+
+ int isReadPending = parcEvent_Poll(fwd_state->readEvent, PARCEventType_Read);
+
+
+ // If we are blocked in the UP direction, disable events on the read queue
+ if (rtaConnection_BlockedUp(conn)) {
+ // we only disable it and log it if it was active
+ if (isReadPending) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u blocked up, disable PARCEventType_Read\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaConnection_GetConnectionId(conn));
+ }
+
+ parcEvent_Stop(fwd_state->readEvent);
+ }
+ } else {
+ if ((!isReadPending) && fwd_state->isConnected) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u unblocked up, enable PARCEventType_Read\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaConnection_GetConnectionId(conn));
+ }
+ parcEvent_Start(fwd_state->readEvent);
+ }
+ }
+
+ // We do not need to do anything with DOWN direction, becasue we're the component sending
+ // those block down messages.
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c
new file mode 100644
index 00000000..4e5ea48f
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c
@@ -0,0 +1,634 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implements the API connector. The API connector is a event based component to manage the socket
+ * to the API.
+ *
+ * The API Connector's job is to manage the socket to the API between the RTA Framework and the
+ * API. It does this by using an event directly to manage that socket. It uses the same
+ * event scheduler base as the RTA framework, so its all part of the same event dispatcher.
+ *
+ * The RTA Transport now only speaks CCNxTlvDictionary messages. If we receive old timey Interest,
+ * ContentObject, etc., we translate them to the Dictionary format. The TransportMessage and CCNxMessage
+ * will both go away.
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <fcntl.h>
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <errno.h>
+
+#include <parc/algol/parc_EventBuffer.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <LongBow/runtime.h>
+
+#include <ccnx/transport/transport_rta/connectors/rta_ApiConnection.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+#include <ccnx/transport/transport_rta/config/config_Codec_Tlv.h>
+
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_TlvDictionary.h>
+
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+#define PAIR_TRANSPORT 0
+#define PAIR_OTHER 1
+
+// we are only putting an 8-byte pointer on the queue, so
+// this should be 50 messages
+#define MAX_API_QUEUE_BYTES 400
+
+
+unsigned api_upcall_writes = 0;
+unsigned api_downcall_reads = 0;
+extern unsigned rta_transport_reads;
+
+// per connection state
+struct rta_api_connection {
+ // A reference to our connection
+ RtaConnection *connection;
+
+ // event queue for socketpair to API
+ PARCEventQueue *bev_api;
+
+ // these are assingned to us by the Transport
+ int api_fd;
+ int transport_fd;
+};
+
+// ==========================================================================================
+// STATIC PROTOTYPES and their headerdoc
+
+/**
+ * PARCEvent calls this when the API queue falls below the watermark
+ *
+ * We watermark the write queue at MAX_API_QUEUE_BYTES bytes. When a write takes
+ * the queue backlog below that amount, PARCEvent calls this.
+ *
+ * @param [in] connVoid Void pointer to the RtaConnection
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_WriteCallback(PARCEventQueue *queue, PARCEventType type, void *conn);
+
+/**
+ * PARCEvent calls this when there's a message from the API
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_Downcall_Read(PARCEventQueue *bev, PARCEventType type, void *conn);
+
+/**
+ * PARCEvent calls this when there's a non-read/write event on the API's socket
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_Downcall_Event(PARCEventQueue *, PARCEventQueueEventType events, void *conn);
+
+
+/**
+ * Drains the input queue and output queue of a connection to the API
+ *
+ * The input queue and output queue contain pointers to CCNxMessages. On close,
+ * we need to drain these queues and release all the messages.
+ *
+ * The API Connector is responsible for only draining its input queue. The output
+ * queue up to the API is drained by the RTA Framework.
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_DrainApiConnection(RtaApiConnection *apiConnection);
+
+/**
+ * Writes a message to the API
+ *
+ * Takes ownership of the message which is passed up to the API
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_WriteMessageToApi(RtaApiConnection *apiConnection, CCNxMetaMessage *msg);
+
+// ==========================================================================================
+// Public API
+
+static void
+rtaApiConnection_SetupSocket(RtaApiConnection *apiConnection, RtaConnection *connection)
+{
+ RtaProtocolStack *stack = rtaConnection_GetStack(connection);
+ PARCEventScheduler *base = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack));
+ int error;
+
+ // Set non-blocking flag
+ int flags = fcntl(apiConnection->transport_fd, F_GETFL, NULL);
+ assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+ int failure = fcntl(apiConnection->transport_fd, F_SETFL, flags | O_NONBLOCK);
+ assertFalse(failure, "fcntl failed to set socket non-blocking(%d) %s\n", errno, strerror(errno));
+
+ apiConnection->bev_api = parcEventQueue_Create(base, apiConnection->transport_fd, 0);
+ assertNotNull(apiConnection->bev_api, "Got null result from parcEventQueue_Create");
+
+ // Set buffer size
+ int sendbuff = 1000 * 8;
+
+ error = setsockopt(rtaConnection_GetTransportFd(connection), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
+ assertTrue(error == 0, "Got error setting SO_SNDBUF: %s", strerror(errno));
+
+ parcEventQueue_SetWatermark(apiConnection->bev_api, PARCEventType_Write, MAX_API_QUEUE_BYTES, 0);
+ parcEventQueue_SetCallbacks(apiConnection->bev_api,
+ rtaApiConnection_Downcall_Read,
+ rtaApiConnection_WriteCallback,
+ rtaApiConnection_Downcall_Event,
+ (void *) connection);
+
+ parcEventQueue_Enable(apiConnection->bev_api, PARCEventType_Read | PARCEventType_Write);
+}
+
+RtaApiConnection *
+rtaApiConnection_Create(RtaConnection *connection)
+{
+ RtaApiConnection *apiConnection = parcMemory_AllocateAndClear(sizeof(RtaApiConnection));
+ assertNotNull(apiConnection, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaApiConnection));
+
+ apiConnection->connection = rtaConnection_Copy(connection);
+ apiConnection->api_fd = rtaConnection_GetApiFd(connection);
+ apiConnection->transport_fd = rtaConnection_GetTransportFd(connection);
+ rtaApiConnection_SetupSocket(apiConnection, connection);
+
+ return apiConnection;
+}
+
+void
+rtaApiConnection_Destroy(RtaApiConnection **apiConnectionPtr)
+{
+ assertNotNull(apiConnectionPtr, "Parameter apiConnecitonPtr must be non-null");
+ assertNotNull(*apiConnectionPtr, "Parameter apiConnecitonPtr must dereference to non-null");
+ RtaApiConnection *apiConnection = *apiConnectionPtr;
+
+
+ // Send all the outbound messages up to the API. This at least gets them out
+ // of our output queue on to the API's socket.
+ parcEventQueue_Finished(apiConnection->bev_api, PARCEventType_Write);
+ rtaApiConnection_DrainApiConnection(apiConnection);
+
+ parcEventQueue_Destroy(&(apiConnection->bev_api));
+
+ rtaConnection_Destroy(&apiConnection->connection);
+
+ parcMemory_Deallocate((void **) &apiConnection);
+
+ *apiConnectionPtr = NULL;
+}
+
+static void
+rtaApiConnection_SendToApiAsDictionary(RtaApiConnection *apiConnection, TransportMessage *tm)
+{
+ CCNxMetaMessage *msg = ccnxMetaMessage_Acquire(transportMessage_GetDictionary(tm));
+ rtaApiConnection_WriteMessageToApi(apiConnection, msg);
+}
+
+static CCNxName *
+rtaApiConnection_GetNameFromTransportMessage(TransportMessage *tm)
+{
+ CCNxName *name = NULL;
+ CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm);
+ switch (ccnxTlvDictionary_GetSchemaVersion(dictionary)) {
+ case CCNxTlvDictionary_SchemaVersion_V1:
+ name = ccnxTlvDictionary_GetName(dictionary, CCNxCodecSchemaV1TlvDictionary_MessageFastArray_NAME);
+ break;
+
+ default:
+ break;
+ }
+ return name;
+}
+
+/**
+ * Writes the CCNxMessage inside the transport message up to the API.
+ * Its possible that if there's no space in the socket the write will block
+ * and return an error.
+ *
+ * @return true if written to API, false if not (most likely would block)
+ */
+bool
+rtaApiConnection_SendToApi(RtaApiConnection *apiConnection, TransportMessage *tm, RtaComponentStats *stats)
+{
+ assertNotNull(apiConnection, "Parameter apiConnection must be non-null");
+
+ if (DEBUG_OUTPUT) {
+ CCNxName *name = rtaApiConnection_GetNameFromTransportMessage(tm);
+ char *nameString = NULL;
+ if (name) {
+ nameString = ccnxName_ToString(name);
+ }
+
+ struct timeval delay = transportMessage_GetDelay(tm);
+ printf("%9" PRIu64 " %s putting transport msg %p to user fd %d delay %.6f name %s\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) tm,
+ apiConnection->api_fd,
+ delay.tv_sec + delay.tv_usec * 1E-6,
+ nameString);
+
+ if (nameString) {
+ parcMemory_Deallocate((void **) &nameString);
+ }
+ }
+
+ rtaApiConnection_SendToApiAsDictionary(apiConnection, tm);
+
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p fd_out %d state %p upcalls %u reads %u\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) apiConnection->connection,
+ apiConnection->transport_fd,
+ (void *) apiConnection,
+ api_upcall_writes,
+ rta_transport_reads);
+ }
+
+ return true;
+}
+
+void
+rtaApiConnection_BlockDown(RtaApiConnection *apiConnection)
+{
+ assertNotNull(apiConnection, "Parameter apiConnection must be non-null");
+ PARCEventType enabled_events = parcEventQueue_GetEnabled(apiConnection->bev_api);
+
+ // we only disable it and log it if it was active
+ if (enabled_events & PARCEventType_Read) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u blocked down, disable PARCEventType_Read\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ rtaConnection_GetConnectionId(apiConnection->connection));
+ }
+
+ parcEventQueue_Disable(apiConnection->bev_api, PARCEventType_Read);
+ }
+}
+
+void
+rtaApiConnection_UnblockDown(RtaApiConnection *apiConnection)
+{
+ assertNotNull(apiConnection, "Parameter apiConnection must be non-null");
+ PARCEventType enabled_events = parcEventQueue_GetEnabled(apiConnection->bev_api);
+
+ if (!(enabled_events & PARCEventType_Read)) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u unblocked down, enable PARCEventType_Read\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ rtaConnection_GetConnectionId(apiConnection->connection));
+ }
+ parcEventQueue_Enable(apiConnection->bev_api, PARCEventType_Read);
+ }
+}
+
+// ==========================================================================================
+// Internal implementation
+
+static void
+rtaApiConnection_WriteMessageToApi(RtaApiConnection *apiConnection, CCNxMetaMessage *msg)
+{
+ assertNotNull(msg, "Parameter msg must be non-null");
+
+ int error = parcEventQueue_Write(apiConnection->bev_api, &msg, sizeof(&msg));
+ assertTrue(error == 0,
+ "write to transport_fd %d write error: (%d) %s",
+ apiConnection->transport_fd, errno, strerror(errno));
+
+ // debugging tracking
+ api_upcall_writes++;
+}
+
+static void
+_rtaAPIConnection_ProcessCPIRequest(RtaConnection *conn, PARCJSON *json)
+{
+ // Is it a request type we know about?
+
+ switch (cpi_getCPIOperation2(json)) {
+ case CPI_PAUSE: {
+ RtaConnectionStateType oldstate = rtaConnection_GetState(conn);
+ if (oldstate == CONN_OPEN) {
+ rtaConnection_SetState(conn, CONN_PAUSED);
+ }
+ break;
+ }
+
+ default:
+ // do nothing, don't know about this message type
+ break;
+ }
+}
+
+static void
+connector_Api_ProcessCpiMessage(RtaConnection *conn, CCNxTlvDictionary *controlDictionary)
+{
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary);
+ switch (controlPlaneInterface_GetCPIMessageType(json)) {
+ case CPI_REQUEST: {
+ _rtaAPIConnection_ProcessCPIRequest(conn, json);
+ break;
+ }
+
+ case CPI_RESPONSE:
+ break;
+
+ case CPI_ACK:
+ break;
+
+ default:
+ assertTrue(0, "Got unknown CPI message type: %d", controlPlaneInterface_GetCPIMessageType(json));
+ }
+ }
+}
+
+static void
+rtaApiConnection_ProcessControlFromApi(RtaApiConnection *apiConnection, RtaProtocolStack *stack, CCNxTlvDictionary *controlDictionary)
+{
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ connector_Api_ProcessCpiMessage(apiConnection->connection, controlDictionary);
+ }
+}
+
+static void
+rtaApiConnection_Downcall_ProcessDictionary(RtaApiConnection *apiConnection, RtaProtocolStack *stack,
+ PARCEventQueue *queue_out, RtaComponentStats *stats, CCNxTlvDictionary *messageDictionary)
+{
+ // Look at the control message before checking for the connection closed
+ if (ccnxTlvDictionary_IsControl(messageDictionary)) {
+ rtaApiConnection_ProcessControlFromApi(apiConnection, stack, messageDictionary);
+ }
+
+ // In paused or closed state, we only pass control messages
+ if ((rtaConnection_GetState(apiConnection->connection) == CONN_OPEN) || (ccnxTlvDictionary_IsControl(messageDictionary))) {
+ TransportMessage *tm = transportMessage_CreateFromDictionary(messageDictionary);
+
+ // Set the auxiliary information to the message's connection
+ transportMessage_SetInfo(tm, rtaConnection_Copy(apiConnection->connection), rtaConnection_FreeFunc);
+
+ if (DEBUG_OUTPUT) {
+ CCNxName *name = NULL;
+ if (ccnxTlvDictionary_IsInterest(messageDictionary)) {
+ name = ccnxInterest_GetName(messageDictionary);
+ } else if (ccnxTlvDictionary_IsContentObject(messageDictionary)) {
+ name = ccnxContentObject_GetName(messageDictionary);
+ }
+
+ char *noname = "NONAME";
+ char *nameString = noname;
+ if (name) {
+ nameString = ccnxName_ToString(name);
+ }
+
+ printf("%9" PRIu64 " %s putting transport msg %p from user fd %d: %s\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) tm, apiConnection->api_fd,
+ nameString);
+
+ if (nameString != noname) {
+ parcMemory_Deallocate((void **) &nameString);
+ }
+
+ //ccnxTlvDictionary_Display(0, messageDictionary);
+ }
+
+ // send down the stack. If it fails, it destroys the message.
+ if (rtaComponent_PutMessage(queue_out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+ }
+}
+
+static void
+rtaApiConnection_Downcall_ProcessMessage(RtaApiConnection *apiConnection, RtaProtocolStack *stack, PARCEventBuffer *eb_in,
+ PARCEventQueue *queue_out, RtaComponentStats *stats)
+{
+ api_downcall_reads++;
+ CCNxMetaMessage *msg;
+
+ int bytesRemoved = parcEventBuffer_Read(eb_in, &msg, sizeof(CCNxMetaMessage *));
+ assertTrue(bytesRemoved == sizeof(CCNxMetaMessage *),
+ "Error, did not remove an entire pointer, expected %zu got %d",
+ sizeof(CCNxMetaMessage *),
+ bytesRemoved);
+
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN);
+
+ // This will save its own reference to the messageDictionary
+ rtaApiConnection_Downcall_ProcessDictionary(apiConnection, stack, queue_out, stats, msg);
+
+ // At this point, the CCNxMetaMessage passed in by the application thread has been
+ // acquired rtaApiConnection_Downcall_ProcessDictionary(), so we can Release the reference we
+ // acquired in rtaTransport_Send().
+ ccnxMetaMessage_Release(&msg);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p total downcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) apiConnection->connection,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT));
+ }
+}
+
+
+/*
+ * Called by PARCEvent when there's a message to read from the API
+ * Read a message from the API.
+ * rtaConnectionVoid is the RtaConnection associated with the api descriptor
+ */
+static void
+rtaApiConnection_Downcall_Read(PARCEventQueue *bev, PARCEventType type, void *rtaConnectionVoid)
+{
+ RtaConnection *conn = (RtaConnection *) rtaConnectionVoid;
+
+ assertNotNull(rtaConnectionVoid, "Parameter must be a non-null void *");
+
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ assertNotNull(stack, "rtaConnection_GetStack returned null");
+
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, API_CONNECTOR);
+ assertNotNull(stats, "rtaConnection_GetStats returned null");
+
+ RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR);
+ assertNotNull(apiConnection, "rtaConnection_GetPrivateData got null");
+
+ PARCEventBuffer *eb_in = parcEventBuffer_GetQueueBufferInput(bev);
+
+ PARCEventQueue *queue_out = rtaComponent_GetOutputQueue(conn, API_CONNECTOR, RTA_DOWN);
+ assertNotNull(queue_out, "component_GetOutputQueue returned null");
+
+ while (parcEventBuffer_GetLength(eb_in) >= sizeof(TransportMessage *)) {
+ rtaApiConnection_Downcall_ProcessMessage(apiConnection, stack, eb_in, queue_out, stats);
+ }
+ parcEventBuffer_Destroy(&eb_in);
+}
+
+/*
+ * This is used on the connection to the API out of the transport box
+ */
+static void
+rtaApiConnection_Downcall_Event(PARCEventQueue *bev, PARCEventQueueEventType events, void *ptr)
+{
+}
+
+/**
+ * Drains all the CCNxMessages off an event buffer and destroys them
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void
+drainBuffer(PARCEventBuffer *buffer, RtaConnection *conn)
+{
+ size_t length;
+
+ while ((length = parcEventBuffer_GetLength(buffer)) > 0) {
+ CCNxMetaMessage *msg;
+ ssize_t len;
+
+ len = parcEventBuffer_Read(buffer, &msg, sizeof(CCNxMetaMessage *));
+ assertTrue(len == sizeof(CCNxMetaMessage *),
+ "Removed incorrect length, expected %zu got %zd: (%d) %s",
+ sizeof(CCNxMetaMessage *),
+ len,
+ errno,
+ strerror(errno));
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p drained message %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn,
+ (void *) msg);
+ }
+ ccnxMetaMessage_Release(&msg);
+ }
+}
+
+/**
+ * Called on Destroy to clear our input buffer. This does not
+ * drain the output (to API) buffer, that is done by the RTA Framework
+ */
+static void
+rtaApiConnection_DrainApiConnection(RtaApiConnection *apiConnection)
+{
+ // drain and free the transport_fd
+ parcEventQueue_Disable(apiConnection->bev_api, PARCEventType_Read);
+
+ PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(apiConnection->bev_api);
+ drainBuffer(in, apiConnection->connection);
+ parcEventBuffer_Destroy(&in);
+
+ // There may be some messages in the output buffer that
+ // have not actually been written to the kernel socket.
+ // Drain those too, as the API will never see them
+
+ if (DEBUG_OUTPUT) {
+ PARCEventBuffer *out = parcEventBuffer_GetQueueBufferOutput(apiConnection->bev_api);
+ printf("%9" PRIu64 " %s conn %p output buffer has %zu bytes\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) apiConnection->connection,
+ parcEventBuffer_GetLength(out));
+ parcEventBuffer_Destroy(&out);
+ }
+}
+
+/**
+ * Called by PARCEvent when we cross below the write watermark
+ */
+static void
+rtaApiConnection_WriteCallback(PARCEventQueue *queue, PARCEventType type, void *connVoid)
+{
+ // we dropped below the write watermark, unblock the connection in the UP direction
+ RtaConnection *conn = (RtaConnection *) connVoid;
+ if (rtaConnection_BlockedUp(conn)) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u output fell below watermark, unblocking UP\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaConnection_GetConnectionId(conn));
+ }
+
+ rtaConnection_ClearBlockedUp(conn);
+ }
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.h b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.h
new file mode 100644
index 00000000..799c45e6
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file rta_ApiConnection.h
+ * @brief Implementation of the API connection
+ *
+ * <#Detailed Description#>
+ *
+ */
+
+#ifndef TransportRTA_rta_ApiConnection_h
+#define TransportRTA_rta_ApiConnection_h
+
+struct rta_api_connection;
+typedef struct rta_api_connection RtaApiConnection;
+
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+
+RtaApiConnection *rtaApiConnection_Create(RtaConnection *connection);
+void rtaApiConnection_Destroy(RtaApiConnection **rtaApiConnectionPtr);
+
+/**
+ * Sends a TransportMessage up to the API
+ *
+ * Decapsulates the ccnx message and sends it up to the API. It will destroy the TransportMessage wrapper.
+ *
+ * @param [in] apiConnection The API connection to write to
+ * @param [in] tm The transport message to send
+ * @param [in] stats The statistics counter to increment on success
+ *
+ * @return true Transport message written
+ * @return false Transport message not written (but still destroyed)
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+bool rtaApiConnection_SendToApi(RtaApiConnection *apiConnection, TransportMessage *tm, RtaComponentStats *stats);
+
+/**
+ * Block data flow in the DOWN direction
+ *
+ * To block in the DOWN direction, we disable READ events on the API's buffer
+ *
+ * @param [in] apiConnection The API Connector's connection state
+ * @param [in] conn The RTA Connection
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+void rtaApiConnection_BlockDown(RtaApiConnection *apiConnection);
+
+/**
+ * Unblock data flow in the DOWN direction
+ *
+ * To unblock in the DOWN direction, we enable READ events on the API's buffer
+ *
+ * @param [in] apiConnection The API Connector's connection state
+ * @param [in] conn The RTA Connection
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+void rtaApiConnection_UnblockDown(RtaApiConnection *apiConnection);
+#endif
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/CMakeLists.txt b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/CMakeLists.txt
new file mode 100644
index 00000000..85e4812f
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/CMakeLists.txt
@@ -0,0 +1,16 @@
+# Enable gcov output for the tests
+add_definitions(--coverage)
+set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage")
+
+set(TestsExpectedToPass
+ test_connector_Api
+ test_rta_ApiConnection
+ test_connector_Forwarder_Local
+ test_connector_Forwarder_Metis
+)
+
+
+foreach(test ${TestsExpectedToPass})
+ AddTest(${test})
+endforeach()
+
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Api.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Api.c
new file mode 100644
index 00000000..409e075d
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Api.c
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Include the file(s) containing the functions to be tested.
+// This permits internal static functions to be visible to this Test Framework.
+#include "../connector_Api.c"
+
+#include <LongBow/unit-test.h>
+
+LONGBOW_TEST_RUNNER(connector_Api)
+{
+ LONGBOW_RUN_TEST_FIXTURE(Global);
+}
+
+// The Test Runner calls this function once before any Test Fixtures are run.
+LONGBOW_TEST_RUNNER_SETUP(connector_Api)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// The Test Runner calls this function once after all the Test Fixtures are run.
+LONGBOW_TEST_RUNNER_TEARDOWN(connector_Api)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE(Global)
+{
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(Global)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(Global)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+int
+main(int argc, char *argv[])
+{
+ LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(connector_Api);
+ int exitStatus = longBowMain(argc, argv, testRunner, NULL);
+ longBowTestRunner_Destroy(&testRunner);
+ exit(exitStatus);
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Local.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Local.c
new file mode 100644
index 00000000..014f4bbe
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Local.c
@@ -0,0 +1,249 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#define DEBUG_OUTPUT 1
+#include "../connector_Forwarder_Local.c"
+#include <LongBow/unit-test.h>
+
+#include <parc/algol/parc_SafeMemory.h>
+#include <parc/security/parc_Security.h>
+#include <parc/security/parc_Pkcs12KeyStore.h>
+
+#include <ccnx/api/control/cpi_ControlMessage.h>
+#include <ccnx/api/control/controlPlaneInterface.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.c>
+#include <ccnx/transport/transport_rta/core/rta_Framework_private.h>
+#include <ccnx/transport/transport_rta/config/config_All.h>
+#include <ccnx/transport/test_tools/bent_pipe.h>
+
+typedef struct test_data {
+ PARCRingBuffer1x1 *commandRingBuffer;
+ PARCNotifier *commandNotifier;
+ RtaFramework *framework;
+
+ int api_fds[2];
+ int listen_fd;
+ int rnd_fd;
+
+ int stackId;
+ RtaConnection *connectionUnderTest;
+
+ char bentpipe_LocalName[1024];
+ BentPipeState *bentpipe;
+ char keystoreName[1024];
+ char keystorePassword[1024];
+} TestData;
+
+static CCNxTransportConfig *
+_createParams(const char *local_name, const char *keystore_name, const char *keystore_passwd)
+{
+ assertNotNull(local_name, "Got null keystore name\n");
+ assertNotNull(keystore_name, "Got null keystore name\n");
+ assertNotNull(keystore_passwd, "Got null keystore passwd\n");
+
+ CCNxStackConfig *stackConfig = apiConnector_ProtocolStackConfig(
+ testingUpper_ProtocolStackConfig(
+ localForwarder_ProtocolStackConfig(
+ protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(),
+ apiConnector_GetName(),
+ testingUpper_GetName(),
+ localForwarder_GetName(), NULL))));
+
+ CCNxConnectionConfig *connConfig = apiConnector_ConnectionConfig(
+ testingUpper_ConnectionConfig(
+ tlvCodec_ConnectionConfig(
+ localForwarder_ConnectionConfig(
+ ccnxConnectionConfig_Create(), local_name))));
+
+ publicKeySigner_ConnectionConfig(connConfig, keystore_name, keystore_passwd);
+
+ CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig);
+ ccnxStackConfig_Release(&stackConfig);
+ return result;
+}
+
+static TestData *
+_commonSetup(void)
+{
+ parcSecurity_Init();
+ TestData *data = parcMemory_AllocateAndClear(sizeof(TestData));
+ assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData));
+
+ sprintf(data->bentpipe_LocalName, "/tmp/bentpipe_%d.sock", getpid());
+ data->bentpipe = bentpipe_Create(data->bentpipe_LocalName);
+ bentpipe_Start(data->bentpipe);
+
+ sprintf(data->keystoreName, "/tmp/keystore_%d.p12", getpid());
+ sprintf(data->keystorePassword, "23439429");
+
+ unlink(data->keystoreName);
+
+ bool success = parcPkcs12KeyStore_CreateFile(data->keystoreName, data->keystorePassword, "user", 1024, 30);
+ assertTrue(success, "parcPkcs12KeyStore_CreateFile() failed.");
+
+ data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL);
+ data->commandNotifier = parcNotifier_Create();
+ data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier);
+
+ // Create a protocol stack and a connection to use
+ CCNxTransportConfig *params = _createParams(data->bentpipe_LocalName, data->keystoreName, data->keystorePassword);
+
+ data->stackId = 1;
+
+ RtaCommandCreateProtocolStack *createStack =
+ rtaCommandCreateProtocolStack_Create(data->stackId, ccnxTransportConfig_GetStackConfig(params));
+
+ _rtaFramework_ExecuteCreateStack(data->framework, createStack);
+ rtaCommandCreateProtocolStack_Release(&createStack);
+
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, data->api_fds);
+ RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(data->stackId, data->api_fds[0], data->api_fds[1],
+ ccnxConnectionConfig_GetJson(ccnxTransportConfig_GetConnectionConfig(params)));
+ _rtaFramework_ExecuteOpenConnection(data->framework, openConnection);
+ rtaCommandOpenConnection_Release(&openConnection);
+
+ ccnxTransportConfig_Destroy(&params);
+
+ // now poke in to the connection table to get the connection to test
+ data->connectionUnderTest = rtaConnectionTable_GetByApiFd(data->framework->connectionTable, data->api_fds[0]);
+
+ return data;
+}
+
+static void
+_commonTeardown(TestData *data)
+{
+ rtaFramework_Teardown(data->framework);
+
+ parcRingBuffer1x1_Release(&data->commandRingBuffer);
+ parcNotifier_Release(&data->commandNotifier);
+ rtaFramework_Destroy(&data->framework);
+
+ bentpipe_Stop(data->bentpipe);
+ bentpipe_Destroy(&data->bentpipe);
+ unlink(data->keystoreName);
+ parcMemory_Deallocate((void **) &data);
+ parcSecurity_Fini();
+}
+
+// =============================================================
+
+LONGBOW_TEST_RUNNER(connector_Forwarder_Local)
+{
+ LONGBOW_RUN_TEST_FIXTURE(Local);
+}
+
+// The Test Runner calls this function once before any Test Fixtures are run.
+LONGBOW_TEST_RUNNER_SETUP(connector_Forwarder_Local)
+{
+ parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory);
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// The Test Runner calls this function once after all the Test Fixtures are run.
+LONGBOW_TEST_RUNNER_TEARDOWN(connector_Forwarder_Local)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+
+// ======================================================
+LONGBOW_TEST_FIXTURE(Local)
+{
+ LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Local_Init_Release);
+ LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Local_Cpi_Pause);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(Local)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup());
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(Local)
+{
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+
+ if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) {
+ printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding());
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// ====================================================================
+
+LONGBOW_TEST_CASE(Local, connector_Fwd_Local_Init_Release)
+{
+ // nothing to do, just checking that memory is in balance in teardown
+}
+
+/**
+ * Send a PAUSE CPI message to the forwarder. It should reflect
+ * back a CPI ACK
+ */
+LONGBOW_TEST_CASE(Local, connector_Fwd_Local_Cpi_Pause)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ PARCJSON *controlPause = cpi_CreatePauseInputRequest();
+
+ CCNxTlvDictionary *controlDictionary = ccnxControlFacade_CreateCPI(controlPause);
+ TransportMessage *tm_in = transportMessage_CreateFromDictionary(controlDictionary);
+
+ uint64_t pause_seqnum = controlPlaneInterface_GetSequenceNumber(controlPause);
+ parcJSON_Release(&controlPause);
+ ccnxTlvDictionary_Release(&controlDictionary);
+
+ transportMessage_SetInfo(tm_in, rtaConnection_Copy(data->connectionUnderTest), (void (*)(void **))rtaConnection_Destroy);
+
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(data->connectionUnderTest), TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(data->connectionUnderTest), TESTING_UPPER, RTA_DOWN);
+
+ rtaComponent_PutMessage(in, tm_in);
+
+ // this will crank it though the forwarder and reflect back up to us
+ rtaFramework_NonThreadedStepCount(data->framework, 4);
+
+ // The first message out should be a CONNECTION_OPEN
+ TransportMessage *tm_out = rtaComponent_GetMessage(out);
+ transportMessage_Destroy(&tm_out);
+
+ tm_out = rtaComponent_GetMessage(out);
+
+ assertTrue(transportMessage_IsControl(tm_out), "got wrong type, not a control message");
+
+ CCNxControl *control = ccnxMetaMessage_GetControl(transportMessage_GetDictionary(tm_out));
+
+ assertTrue(ccnxControl_IsACK(control), "Expected ccnxControl_IsACK to be true.");
+
+ uint64_t _ack_original_seqnum = ccnxControl_GetAckOriginalSequenceNumber(control);
+
+ assertTrue(_ack_original_seqnum == pause_seqnum,
+ "Got wrong original message seqnum, expected %" PRIu64 " got %" PRIu64, pause_seqnum, _ack_original_seqnum);
+
+ transportMessage_Destroy(&tm_out);
+}
+
+int
+main(int argc, char *argv[])
+{
+ LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(connector_Forwarder_Local);
+ int exitStatus = longBowMain(argc, argv, testRunner, NULL);
+ longBowTestRunner_Destroy(&testRunner);
+ exit(exitStatus);
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Metis.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Metis.c
new file mode 100644
index 00000000..6fe9e3d2
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_connector_Forwarder_Metis.c
@@ -0,0 +1,1350 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ * This test will setup a server socket so the Metis connector can connect to it. We can
+ * then see the packets the connector thinks it is sending to Metis.
+ */
+
+#define DEBUG 1
+#include "../connector_Forwarder_Metis.c"
+
+#include <parc/algol/parc_SafeMemory.h>
+#include <parc/security/parc_Pkcs12KeyStore.h>
+#include <parc/security/parc_Security.h>
+
+#include <ccnx/api/control/cpi_ControlFacade.h>
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_Forwarding.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.c>
+#include <ccnx/transport/transport_rta/core/rta_Framework_private.h>
+#include <ccnx/transport/transport_rta/config/config_All.h>
+
+#include <ccnx/common/codec/ccnxCodec_TlvPacket.h>
+#include <ccnx/transport/test_tools/traffic_tools.h>
+
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_FixedHeader.h>
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_PacketEncoder.h>
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_Types.h>
+#include <ccnx/common/codec/schema_v1/testdata/v1_interest_nameA.h>
+#include <ccnx/common/codec/schema_v1/testdata/v1_content_nameA_crc32c.h>
+#include <ccnx/common/codec/schema_v1/testdata/v1_cpi_add_route_crc32c.h>
+
+#include <ccnx/common/ccnx_WireFormatMessage.h>
+
+// inet_pton
+#include <arpa/inet.h>
+
+#include <LongBow/unit-test.h>
+
+static char keystorename[1024];
+static const char keystorepass[] = "2398472983479234";
+
+#ifndef INPORT_ANY
+#define INPORT_ANY 0
+#endif
+
+typedef struct test_data {
+ PARCRingBuffer1x1 *commandRingBuffer;
+ PARCNotifier *commandNotifier;
+
+ // we will bind to a random port, this is what we end up binding to
+ // Its in host byte order
+ uint16_t metis_port;
+
+ // server_socket is a socket we listen to like the Metis forwarder, so
+ // we can see all the traffic that comes out the bottom of the connector.
+ int server_socket;
+
+ // when we accept a client on the server socket, this is his socket
+ int client_socket;
+
+ RtaFramework *framework;
+ CCNxTransportConfig *params;
+
+ char keystoreName[1024];
+ char keystorePassword[1024];
+} TestData;
+
+/*
+ * @function setup_server
+ * @abstract Bind to 127.0.0.1 on a random port, returns the socket and port
+ * @discussion
+ * <#Discussion#>
+ *
+ * @param portOutput is the port bound to in host byte order
+ * @return <#return#>
+ */
+static int
+_setup_server(uint16_t *portOutput)
+{
+ struct sockaddr_in address;
+
+ /* listen on 127.0.0.1 random port */
+ address.sin_family = PF_INET;
+ address.sin_port = INPORT_ANY;
+ inet_pton(AF_INET, "127.0.0.1", &(address.sin_addr));
+
+ int fd = socket(PF_INET, SOCK_STREAM, 0);
+ assertFalse(fd < 0, "error on bind: (%d) %s", errno, strerror(errno));
+
+ // Set non-blocking flag
+ int flags = fcntl(fd, F_GETFL, NULL);
+ assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+ int failure = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno);
+
+ failure = bind(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_in));
+ assertFalse(failure, "error on bind: (%d) %s", errno, strerror(errno));
+
+ failure = listen(fd, 16);
+ assertFalse(failure, "error on listen: (%d) %s", errno, strerror(errno));
+
+ socklen_t x = sizeof(address);
+ failure = getsockname(fd, (struct sockaddr *) &address, &x);
+ assertFalse(failure, "error on getsockname: (%d) %s", errno, strerror(errno));
+
+ *portOutput = htons(address.sin_port);
+
+ printf("test server setup on port %d\n", *portOutput);
+ return fd;
+}
+
+static int
+_accept_client(int server_socket)
+{
+ socklen_t addrlen;
+ struct sockaddr_in address;
+ int client_socket;
+
+ addrlen = sizeof(struct sockaddr_in);
+ client_socket = accept(server_socket, (struct sockaddr *) &address, &addrlen);
+ assertFalse(client_socket < 0, "accept error: %s", strerror(errno));
+
+ printf("%s accepted client on socket %d\n", __func__, client_socket);
+ return client_socket;
+}
+
+static RtaConnection *
+_openConnection(TestData *data, int stack_id, int fds[2])
+{
+ RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(stack_id, fds[0], fds[1],
+ ccnxConnectionConfig_GetJson(ccnxTransportConfig_GetConnectionConfig(data->params)));
+ _rtaFramework_ExecuteOpenConnection(data->framework, openConnection);
+ rtaCommandOpenConnection_Release(&openConnection);
+
+ return rtaConnectionTable_GetByApiFd(data->framework->connectionTable, fds[0]);
+}
+
+static void
+_createStack(TestData *data, int stack_id)
+{
+ RtaCommandCreateProtocolStack *createStack =
+ rtaCommandCreateProtocolStack_Create(stack_id, ccnxTransportConfig_GetStackConfig(data->params));
+ _rtaFramework_ExecuteCreateStack(data->framework, createStack);
+ rtaCommandCreateProtocolStack_Release(&createStack);
+}
+
+static CCNxTransportConfig *
+_createParams(int port, const char *keystore_name, const char *keystore_passwd)
+{
+ CCNxStackConfig *stackConfig;
+ CCNxConnectionConfig *connConfig;
+
+ assertNotNull(keystore_name, "Got null keystore name\n");
+ assertNotNull(keystore_passwd, "Got null keystore passwd\n");
+
+ stackConfig = apiConnector_ProtocolStackConfig(
+ testingUpper_ProtocolStackConfig(
+ metisForwarder_ProtocolStackConfig(
+ protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(),
+ apiConnector_GetName(),
+ testingUpper_GetName(),
+ metisForwarder_GetName(), NULL))));
+
+ connConfig = apiConnector_ConnectionConfig(
+ testingUpper_ConnectionConfig(
+ metisForwarder_ConnectionConfig(
+ tlvCodec_ConnectionConfig(
+ ccnxConnectionConfig_Create()), port)));
+
+ publicKeySigner_ConnectionConfig(connConfig, keystore_name, keystore_passwd);
+
+ CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig);
+ ccnxStackConfig_Release(&stackConfig);
+ return result;
+}
+
+static TestData *
+_commonSetup(void)
+{
+ parcSecurity_Init();
+
+ TestData *data = parcMemory_AllocateAndClear(sizeof(TestData));
+ assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData));
+ memset(data, 0, sizeof(TestData));
+
+ data->server_socket = _setup_server(&data->metis_port);
+
+ // printf("%s listening on port %u\n", __func__, data->metis_port);
+
+ sprintf(data->keystoreName, "%s", keystorename);
+ sprintf(data->keystorePassword, keystorepass);
+
+ data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL);
+ data->commandNotifier = parcNotifier_Create();
+ data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier);
+
+ data->params = _createParams(data->metis_port, data->keystoreName, keystorepass);
+ // we will always create stack #1 as the default stack
+ _createStack(data, 1);
+ return data;
+}
+
+static void
+_commonTeardown(TestData *data)
+{
+ if (data != NULL) {
+ if (data->server_socket > 0) {
+ close(data->server_socket);
+ }
+
+ if (data->client_socket > 0) {
+ close(data->client_socket);
+ }
+
+ ccnxTransportConfig_Destroy(&data->params);
+ rtaFramework_Teardown(data->framework);
+
+ parcRingBuffer1x1_Release(&data->commandRingBuffer);
+ parcNotifier_Release(&data->commandNotifier);
+ rtaFramework_Destroy(&data->framework);
+ parcMemory_Deallocate((void **) &data);
+ }
+ parcSecurity_Fini();
+}
+
+// ======================================================
+// helper functions
+
+/**
+ * Wait for a READ event on the specifid socket. Has a 1 second timeout.
+ *
+ * @return true if READ event
+ * @return false otherwise
+ */
+static bool
+_waitForSelect(int fd)
+{
+ fd_set readset;
+ FD_ZERO(&readset);
+ FD_SET(fd, &readset);
+ int result = select(fd + 1, &readset, NULL, NULL, &(struct timeval) { 1, 0 });
+ assertFalse(result < 0, "Error on select: (%d) %s", errno, strerror(errno));
+ assertFalse(result == 0, "Timeout waiting for connection attempt");
+ assertTrue(FD_ISSET(fd, &readset), "server_socket was not set by select");
+
+ return true;
+}
+
+
+
+static size_t
+_sendPacketToConnectorV1(int fd, size_t payloadLength)
+{
+ // Setup the header
+ uint8_t headerLength = 13;
+ uint16_t packetLength = payloadLength + headerLength;
+ uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest;
+
+ CCNxCodecSchemaV1FixedHeader hdr = { .version = 1, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength };
+
+ // put header in packet and write the packet
+ uint8_t packet[1024];
+ memcpy(packet, &hdr, sizeof(hdr));
+
+ // write out exactly the number of bytes we need
+ size_t writeSize = packetLength;
+
+ ssize_t nwritten = write(fd, packet, writeSize);
+ assertTrue(nwritten == writeSize, "Wrong write size, expected %zu got %zd", writeSize, nwritten);
+ return writeSize;
+}
+
+static RtaConnection *
+setupConnectionAndClientSocket(TestData *data, int *apiSocketOuptut, int *clientSocketOutput)
+{
+ // Open a listener and accept the forwarders connection
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+ RtaConnection *conn = _openConnection(data, 1, fds);
+ assertNotNull(conn, "Got null connection opening on stack 1");
+
+ rtaFramework_NonThreadedStepCount(data->framework, 2);
+
+ // we should now see a connection request
+ _waitForSelect(data->server_socket);
+
+ // accept the client and set a 1 second read timeout on the socket
+ int client_fd = _accept_client(data->server_socket);
+ struct timeval readTimeout = { 1, 0 };
+ setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &readTimeout, sizeof(readTimeout));
+
+ *apiSocketOuptut = fds[0];
+ *clientSocketOutput = client_fd;
+ return conn;
+}
+
+// throw away the first control message
+static void
+_throwAwayControlMessage(PARCEventQueue *out)
+{
+ TransportMessage *control_tm = rtaComponent_GetMessage(out);
+ assertNotNull(control_tm, "Did not receive a transport message out of the top of the connector");
+ assertTrue(transportMessage_IsControl(control_tm),
+ "transport message is not a control message")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(control_tm), 0);
+ }
+ transportMessage_Destroy(&control_tm);
+}
+
+static void
+_testReadPacketV1(size_t extraBytes)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+
+ // this replaces "_openSocket"
+ fwd_state->fd = fds[STACK];
+
+ _setupSocket(fwd_state);
+
+ // Setup the header
+ uint16_t packetLength = 24;
+ uint8_t headerLength = 13;
+ uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest;
+
+ CCNxCodecSchemaV1FixedHeader hdr = { .version = 1, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength };
+
+ // put header in packet and write the packet
+ uint8_t packet[1024];
+ memcpy(packet, &hdr, sizeof(hdr));
+
+ // write out exactly the number of bytes we need
+ size_t firstWrite = packetLength;
+
+ ssize_t nwritten = write(fds[REMOTE], packet, firstWrite + extraBytes);
+ assertTrue(nwritten == firstWrite + extraBytes, "Wrong write size, expected %zu got %zd",
+ firstWrite + extraBytes, nwritten);
+
+ ReadReturnCode readCode = _readPacket(fwd_state);
+
+ assertTrue(readCode == ReadReturnCode_Finished, "readCode should be %d got %d", ReadReturnCode_Finished, readCode);
+
+ // should indicate there's nothing left to read of the header
+ assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength);
+
+ // we should be at position "firstWrite" in the packet buffer
+ assertNotNull(fwd_state->nextMessage.packet, "Packet buffer is null");
+ assertTrue(parcBuffer_Position(fwd_state->nextMessage.packet) == firstWrite,
+ "Wrong position, expected %zu got %zu", firstWrite, parcBuffer_Position(fwd_state->nextMessage.packet));
+
+ // cleanup
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+ close(fds[REMOTE]);
+}
+
+
+
+static void
+_testReadFromMetisFromArray(TestData *data, size_t length, uint8_t buffer[length])
+{
+ // create our connection. This will become part of the RTA framework, so will be
+ // cleaned up in the teardown
+ int api_fd;
+ int client_fd;
+ RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+
+ ssize_t nwritten = write(client_fd, buffer, length);
+ assertTrue(nwritten == length, "Wrong write size, expected %zu got %zd", length, nwritten);
+
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);;
+
+ _readFromMetis(fwd_state, conn);
+
+ // now crank the handle to pop those messages up the stack
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN);
+ _throwAwayControlMessage(out);
+
+ // verify the wire format is what we wrote
+ TransportMessage *test_tm = rtaComponent_GetMessage(out);
+ assertNotNull(test_tm, "Did not receive a transport message out of the top of the connector");
+
+ CCNxTlvDictionary *testDictionary = transportMessage_GetDictionary(test_tm);
+ PARCBuffer *writeFormat = ccnxWireFormatMessage_GetWireFormatBuffer(testDictionary);
+ assertNotNull(writeFormat,
+ "transport message does not have a wire format");
+
+ PARCBuffer *truth = parcBuffer_Wrap(buffer, length, 0, length);
+ assertTrue(parcBuffer_Equals(truth, writeFormat), "Wire format does not match expected")
+ {
+ printf("Expected:\n");
+ parcBuffer_Display(truth, 3);
+ printf("Received:\n");
+ parcBuffer_Display(writeFormat, 3);
+ }
+
+ parcBuffer_Release(&truth);
+ transportMessage_Destroy(&test_tm);
+}
+
+
+// ======================================================
+
+LONGBOW_TEST_RUNNER(connector_Forwarder_Metis)
+{
+ LONGBOW_RUN_TEST_FIXTURE(Local);
+
+ LONGBOW_RUN_TEST_FIXTURE(UpDirectionV1);
+ LONGBOW_RUN_TEST_FIXTURE(DownDirectionV1);
+}
+
+// The Test Runner calls this function once before any Test Fixtures are run.
+LONGBOW_TEST_RUNNER_SETUP(connector_Forwarder_Metis)
+{
+ parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory);
+
+ snprintf(keystorename, 1024, "/tmp/keystore_%d.p12", getpid());
+
+ // init + fini here so there's no memory imbalance
+ parcSecurity_Init();
+ parcPkcs12KeyStore_CreateFile(keystorename, keystorepass, "ccnxuser", 1024, 365);
+ parcSecurity_Fini();
+
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// The Test Runner calls this function once after all the Test Fixtures are run.
+LONGBOW_TEST_RUNNER_TEARDOWN(connector_Forwarder_Metis)
+{
+ unlink(keystorename);
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// ======================================================
+LONGBOW_TEST_FIXTURE(Local)
+{
+ LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Metis_Init_Release);
+ LONGBOW_RUN_TEST_CASE(Local, connector_Fwd_Metis_Opener_GoodPort);
+ LONGBOW_RUN_TEST_CASE(Local, _fwdMetisState_Release);
+ LONGBOW_RUN_TEST_CASE(Local, _readInEnvironmentConnectionSpecification);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(Local)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup());
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(Local)
+{
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+
+ if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) {
+ printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding());
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// ====================================================================
+
+LONGBOW_TEST_CASE(Local, connector_Fwd_Metis_Init_Release)
+{
+ // nothing to do, just checking that memory is in balance in teardown
+}
+
+/**
+ * Call the opener with the right port. We should see a connection attempt on
+ * the server socket and be able to accept it.
+ */
+LONGBOW_TEST_CASE(Local, connector_Fwd_Metis_Opener_GoodPort)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+ RtaConnection *conn = _openConnection(data, 1, fds);
+ assertNotNull(conn, "Got null connection opening on stack 1");
+
+ rtaFramework_NonThreadedStepCount(data->framework, 2);
+
+ // we should now see a connection request
+ _waitForSelect(data->server_socket);
+
+ close(fds[1]);
+}
+
+/**
+ * Make sure everything is released and file descriptor is closed
+ */
+LONGBOW_TEST_CASE(Local, _fwdMetisState_Release)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+
+ // this replaces "_openSocket"
+ fwd_state->fd = fds[STACK];
+
+ _setupSocket(fwd_state);
+
+
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+
+ // ensure that fds[STACK] is closed by _fwdMetisState_Release
+ uint8_t buffer[16];
+ ssize_t nread = recv(fds[STACK], buffer, 16, 0);
+ assertTrue(nread == -1 && errno == EBADF,
+ "read from closed socket %d should be EBADF, got return %zd and errno (%d) %s",
+ fds[STACK], nread, errno, strerror(errno));
+
+ close(fds[REMOTE]);
+}
+
+LONGBOW_TEST_CASE(Local, _readInEnvironmentConnectionSpecification)
+{
+ char *oldEnv = getenv(FORWARDER_CONNECTION_ENV);
+ setenv(FORWARDER_CONNECTION_ENV, "tcp://127.0.0.1:9999", 1);
+ struct sockaddr_in addr_in;
+ _readInEnvironmentConnectionSpecification(&addr_in);
+ assertTrue(addr_in.sin_port == htons(9999), "Port specification incorrectly parsed");
+ assertTrue(addr_in.sin_addr.s_addr == inet_addr("127.0.0.1"), "Address specification incorrectly parsed");;
+ if (oldEnv) {
+ setenv(FORWARDER_CONNECTION_ENV, oldEnv, 1);
+ } else {
+ unsetenv(FORWARDER_CONNECTION_ENV);
+ }
+}
+
+
+
+// ====================================================================
+
+
+LONGBOW_TEST_FIXTURE(UpDirectionV1)
+{
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_ExactFit);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_TwoReads);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _setupNextPacket);
+
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacket_PartialMessage);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacket_ExactlyOneMessage);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacket_MoreThanOneMessage);
+
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_ThreeMessages);
+
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_InterestV1);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_ContentObjectV1);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_ControlV1);
+
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_Error);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketBody_Error);
+
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketHeader_Closed);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readPacketBody_Closed);
+ LONGBOW_RUN_TEST_CASE(UpDirectionV1, _readFromMetis_Closed);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(UpDirectionV1)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup());
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(UpDirectionV1)
+{
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+
+ if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) {
+ printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding());
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+/**
+ * Put in exactly 8 bytes.
+ * This should return NULL, but will set nextMessageLength to be the right thing.
+ * Does not drain the buffer
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_ExactFit)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+
+ // this replaces "_openSocket"
+ fwd_state->fd = fds[STACK];
+
+ _setupSocket(fwd_state);
+
+ // Setup the header
+ uint16_t packetLength = 24;
+ uint8_t headerLength = 13;
+ uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest;
+
+ CCNxCodecSchemaV1FixedHeader hdr = { .version = 1, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength };
+
+ // put header in packet and write the packet
+ uint8_t packet[1024];
+ size_t bufferReadLength = sizeof(hdr);
+ memcpy(packet, &hdr, bufferReadLength);
+
+ // write out exactly the number of bytes we need
+ ssize_t nwritten = write(fds[REMOTE], packet, sizeof(CCNxCodecSchemaV1FixedHeader));
+ assertTrue(nwritten == sizeof(CCNxCodecSchemaV1FixedHeader), "Wrong write size, expected %zu got %zd",
+ sizeof(CCNxCodecSchemaV1FixedHeader), nwritten);
+
+ // test the function
+ ReadReturnCode readCode = _readPacketHeader(fwd_state);
+ assertTrue(readCode == ReadReturnCode_Finished, "readCode should be %d got %d", ReadReturnCode_Finished, readCode);
+ assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength);
+
+ // other properties are tested as part of _setupNextPacket
+
+ // cleanup
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+ close(fds[REMOTE]);
+}
+
+/*
+ * Write the fixed header in two 4 byte writes
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_TwoReads)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+
+ // this replaces "_openSocket"
+ fwd_state->fd = fds[STACK];
+
+ _setupSocket(fwd_state);
+
+ // Setup the header
+ uint16_t packetLength = 24;
+ uint8_t headerLength = 13;
+ uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest;
+
+ CCNxCodecSchemaV1FixedHeader hdr = {
+ .version = 1,
+ .packetType = packetType,
+ .packetLength = htons(packetLength),
+ .headerLength = headerLength
+ };
+
+ // put header in packet and write the packet
+ uint8_t packet[1024];
+ size_t bufferReadLength = sizeof(hdr);
+ memcpy(packet, &hdr, bufferReadLength);
+
+ // write out exactly the number of bytes we need
+ size_t firstWrite = 4;
+ size_t secondWrite = sizeof(CCNxCodecSchemaV1FixedHeader) - firstWrite;
+
+ ssize_t nwritten = write(fds[REMOTE], packet, firstWrite);
+ assertTrue(nwritten == firstWrite, "Wrong write size, expected %zu got %zd", firstWrite, nwritten);
+
+ ReadReturnCode readCode = _readPacketHeader(fwd_state);
+ assertTrue(readCode == ReadReturnCode_PartialRead, "readCode should be %d got %d", ReadReturnCode_PartialRead, readCode);
+
+ nwritten = write(fds[REMOTE], packet + firstWrite, secondWrite);
+ assertTrue(nwritten == secondWrite, "Wrong write size, expected %zu got %zd", secondWrite, nwritten);
+
+ readCode = _readPacketHeader(fwd_state);
+ assertTrue(readCode == ReadReturnCode_Finished, "readCode should be %d got %d", ReadReturnCode_Finished, readCode);
+
+ assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength);
+
+ // other properties are tested as part of _setupNextPacket
+
+ // cleanup
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+ close(fds[REMOTE]);
+}
+
+LONGBOW_TEST_CASE(UpDirectionV1, _setupNextPacket)
+{
+ uint16_t packetLength = 24;
+ uint8_t headerLength = 13;
+ uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest;
+ uint8_t version = 1;
+ CCNxCodecSchemaV1FixedHeader hdr = { .version = version, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength };
+
+ // setup fwd_state->nextMessage like we just read a header
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+ fwd_state->nextMessage.remainingReadLength = 0;
+ memcpy(&fwd_state->nextMessage.fixedHeader, &hdr, sizeof(hdr));
+
+ // this is the truth we will test against
+ size_t nextMessageLength = packetLength;
+
+ _setupNextPacket(fwd_state);
+
+ size_t allocatedLength = parcBuffer_Capacity(fwd_state->nextMessage.packet);
+ size_t position = parcBuffer_Position(fwd_state->nextMessage.packet);
+ parcBuffer_Flip(fwd_state->nextMessage.packet);
+ void *buffer = parcBuffer_Overlay(fwd_state->nextMessage.packet, 0);
+
+ assertTrue(fwd_state->nextMessage.length == nextMessageLength, "Wrong packet length, expected %zu got %zu", nextMessageLength, fwd_state->nextMessage.length);
+ assertTrue(fwd_state->nextMessage.packetType == packetType, "Wrong packetType, expected %u got %u", packetType, fwd_state->nextMessage.packetType);
+ assertTrue(fwd_state->nextMessage.version == version, "Wrong version, expected %u got %u", version, fwd_state->nextMessage.version);
+ assertTrue(allocatedLength == nextMessageLength, "Wrong packet buffer length, expected %zu got %zu", nextMessageLength, allocatedLength);
+
+ // and make sure the beginning of the buffer is the fixed header
+ assertTrue(position == sizeof(hdr), "Wrong write position, expected %zu got %zu", sizeof(hdr), position);
+ assertTrue(memcmp(buffer, &hdr, sizeof(hdr)) == 0, "Beginning of buffer not the fixed header");
+
+ // TODO: Finish me
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+}
+
+/**
+ * Write the fixed header plus part of the message body.
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacket_PartialMessage)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+
+ // this replaces "_openSocket"
+ fwd_state->fd = fds[STACK];
+
+ _setupSocket(fwd_state);
+
+ // Setup the header
+ uint16_t packetLength = 160;
+ uint8_t headerLength = 13;
+ uint8_t packetType = CCNxCodecSchemaV1Types_PacketType_Interest;
+ uint8_t version = 1;
+ CCNxCodecSchemaV1FixedHeader hdr = { .version = version, .packetType = packetType, .packetLength = htons(packetLength), .headerLength = headerLength };
+
+ // put header in packet and write the packet
+ uint8_t packet[1024];
+ memcpy(packet, &hdr, sizeof(hdr));
+
+ // write out exactly the number of bytes we need
+ size_t firstWrite = 100;
+
+ ssize_t nwritten = write(fds[REMOTE], packet, firstWrite);
+ assertTrue(nwritten == firstWrite, "Wrong write size, expected %zu got %zd", firstWrite, nwritten);
+
+ ReadReturnCode readCode = _readPacket(fwd_state);
+
+ assertTrue(readCode == ReadReturnCode_PartialRead, "return value should be %d got %d", ReadReturnCode_PartialRead, readCode);
+
+ // should indicate there's nothing left to read of the header
+ assertTrue(fwd_state->nextMessage.remainingReadLength == 0, "Remaining length should be 0 got %zu", fwd_state->nextMessage.remainingReadLength);
+
+ // we should be at position "firstWrite" in the packet buffer
+ assertNotNull(fwd_state->nextMessage.packet, "Packet buffer is null");
+ assertTrue(parcBuffer_Position(fwd_state->nextMessage.packet) == firstWrite,
+ "Wrong position, expected %zu got %zu", firstWrite, parcBuffer_Position(fwd_state->nextMessage.packet));
+
+ // cleanup
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+ close(fds[REMOTE]);
+}
+
+/**
+ * Write exactly one message
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacket_ExactlyOneMessage)
+{
+ _testReadPacketV1(0);
+}
+
+/**
+ * Write more than one message.
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacket_MoreThanOneMessage)
+{
+ _testReadPacketV1(100);
+}
+
+/**
+ * Make 3 messages pending on the read socket and make sure _readFromMetis delivers all
+ * 3 up the stack. _readFromMetis requires an RtaConnection, so we need a mock framework.
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_ThreeMessages)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ // create our connection. This will become part of the RTA framework, so will be
+ // cleaned up in the teardown
+ int api_fd;
+ int client_fd;
+ RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+
+ // Write three wire format packets up the bottom of the connector
+ const int loopCount = 3;
+ size_t writeSizes[loopCount];
+
+ for (int i = 0; i < loopCount; i++) {
+ writeSizes[i] = _sendPacketToConnectorV1(client_fd, (i + 1) * 100);
+ }
+
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);;
+
+ _readFromMetis(fwd_state, conn);
+
+ // now crank the handle to pop those messages up the stack
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ // now read the message out of the test component
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN);
+
+ // throw away the first control message
+ _throwAwayControlMessage(out);
+
+ // Now read the actual messages we want to test
+ for (int i = 0; i < loopCount; i++) {
+ TransportMessage *test_tm = rtaComponent_GetMessage(out);
+ assertNotNull(test_tm, "Did not receive a transport message %d out of %d out of the top of the connector", i + 1, loopCount);
+
+ assertTrue(transportMessage_IsInterest(test_tm),
+ "second transport message is not an interest")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0);
+ }
+
+ // Make sure the transport message has the right properties
+ CCNxTlvDictionary *testDictionary = transportMessage_GetDictionary(test_tm);
+ PARCBuffer *writeFormat = ccnxWireFormatMessage_GetWireFormatBuffer(testDictionary);
+ assertNotNull(writeFormat,
+ "transport message does not have a wire format");
+
+ assertTrue(parcBuffer_Remaining(writeFormat) == writeSizes[i],
+ "Raw format message wrong length, expected %zu got %zu",
+ writeSizes[i],
+ parcBuffer_Remaining(writeFormat));
+
+ // cleanup
+ transportMessage_Destroy(&test_tm);
+ }
+
+ // no extra cleanup, done in teardown
+}
+
+LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_InterestV1)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ _testReadFromMetisFromArray(data, sizeof(v1_interest_nameA), v1_interest_nameA);
+}
+
+LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_ContentObjectV1)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ _testReadFromMetisFromArray(data, sizeof(v1_content_nameA_crc32c), v1_content_nameA_crc32c);
+}
+
+LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_ControlV1)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ _testReadFromMetisFromArray(data, sizeof(v1_cpi_add_route_crc32c), v1_cpi_add_route_crc32c);
+}
+
+/*
+ * read from a closed socket
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_Closed)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+ fwd_state->fd = fds[STACK];
+ _setupSocket(fwd_state);
+
+ // close remote side then try to write to it
+ close(fds[REMOTE]);
+
+ ReadReturnCode readCode = _readPacketHeader(fwd_state);
+
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+
+ assertTrue(readCode == ReadReturnCode_Closed, "Wrong return code, expected %d got %d", ReadReturnCode_Closed, readCode);
+}
+
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacketBody_Closed)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+ fwd_state->fd = fds[STACK];
+ _setupSocket(fwd_state);
+
+ ssize_t nwritten = write(fds[REMOTE], v1_interest_nameA, 8);
+ assertTrue(nwritten == 8, "Wrong write size, expected 8 got %zd", nwritten);
+
+ // read the header to setup the read of the body
+ ReadReturnCode readCode;
+
+ readCode = _readPacketHeader(fwd_state);
+ assertTrue(readCode == ReadReturnCode_Finished, "Did not read entire header");
+
+ // close remote side then try to write to it
+ close(fds[REMOTE]);
+
+ // now try 2nd read
+ readCode = _readPacketBody(fwd_state);
+
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+
+ assertTrue(readCode == ReadReturnCode_Closed, "Wrong return code, expected %d got %d", ReadReturnCode_Closed, readCode);
+}
+
+/*
+ * Set the socket to -1 to cause and error
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacketHeader_Error)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+ fwd_state->fd = fds[STACK];
+ _setupSocket(fwd_state);
+
+ fwd_state->fd = -1;
+
+ // close remote side then try to write to it
+
+ ReadReturnCode readCode = _readPacketHeader(fwd_state);
+
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+ close(fds[STACK]);
+ close(fds[REMOTE]);
+
+ assertTrue(readCode == ReadReturnCode_Error, "Wrong return code, expected %d got %d", ReadReturnCode_Error, readCode);
+}
+
+/*
+ * Set the socket to -1 to cause and error
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readPacketBody_Error)
+{
+ const int REMOTE = 0;
+ const int STACK = 1;
+ int fds[2];
+ socketpair(PF_LOCAL, SOCK_STREAM, 0, fds);
+
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+ fwd_state->fd = fds[STACK];
+ _setupSocket(fwd_state);
+
+ ssize_t nwritten = write(fds[REMOTE], v1_interest_nameA, 8);
+ assertTrue(nwritten == 8, "Wrong write size, expected 8 got %zd", nwritten);
+
+ // read the header to setup the read of the body
+ ReadReturnCode readCode;
+
+ readCode = _readPacketHeader(fwd_state);
+ assertTrue(readCode == ReadReturnCode_Finished, "Did not read entire header");
+
+ // invalidate to cause an error
+ fwd_state->fd = -1;
+
+ // now try 2nd read
+ readCode = _readPacketBody(fwd_state);
+
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+ close(fds[STACK]);
+ close(fds[REMOTE]);
+
+ assertTrue(readCode == ReadReturnCode_Error, "Wrong return code, expected %d got %d", ReadReturnCode_Error, readCode);
+}
+
+/*
+ * read from a closed socket.
+ * This should generate a Notify message that the connection is closed
+ */
+LONGBOW_TEST_CASE(UpDirectionV1, _readFromMetis_Closed)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ // create our connection. This will become part of the RTA framework, so will be
+ // cleaned up in the teardown
+ int api_fd;
+ int client_fd;
+ RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);;
+
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ close(client_fd);
+
+ _readFromMetis(fwd_state, conn);
+
+ // now crank the handle to pop those messages up the stack
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ // now read the message out of the test component
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN);
+
+ // throw away the first control message
+ _throwAwayControlMessage(out);
+
+ TransportMessage *test_tm = rtaComponent_GetMessage(out);
+ assertNotNull(test_tm, "Did not receive a transport message out of the top of the connector");
+
+ assertTrue(transportMessage_IsControl(test_tm),
+ "second transport message is not a control")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0);
+ }
+
+ // Make sure the transport message has the right properties
+ CCNxTlvDictionary *testDictionary = transportMessage_GetDictionary(test_tm);
+ assertTrue(ccnxControlFacade_IsNotification(testDictionary), "Control message is not Notification")
+ {
+ ccnxTlvDictionary_Display(testDictionary, 3);
+ }
+
+ PARCJSON *json = ccnxControlFacade_GetJson(testDictionary);
+ NotifyStatus *notify = notifyStatus_ParseJSON(json);
+ assertTrue(notifyStatus_GetStatusCode(notify) == notifyStatusCode_CONNECTION_CLOSED,
+ "Wrong code, expected %d got %d",
+ notifyStatusCode_CONNECTION_CLOSED,
+ notifyStatus_GetStatusCode(notify));
+ notifyStatus_Release(&notify);
+
+ // verify other properties
+ assertFalse(fwd_state->isConnected, "Forwarder state should show connection closed");
+
+ // cleanup
+ transportMessage_Destroy(&test_tm);
+
+ // no extra cleanup, done in teardown
+}
+
+LONGBOW_TEST_FIXTURE(DownDirectionV1)
+{
+ LONGBOW_RUN_TEST_CASE(DownDirectionV1, _queueMessageToMetis);
+ LONGBOW_RUN_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis);
+ LONGBOW_RUN_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_TwoWrites);
+ LONGBOW_RUN_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_Closed);
+
+ LONGBOW_RUN_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_Interst);
+ LONGBOW_RUN_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_CPIRequest);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(DownDirectionV1)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup());
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(DownDirectionV1)
+{
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+
+ if (parcSafeMemory_ReportAllocation(STDOUT_FILENO) != 0) {
+ printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding());
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+/*
+ * _queueMessageToMetis postconditions:
+ * - increases the reference count to the wireFormat
+ * - adds the reference to fwd_output buffer
+ * - increments the debugging counter fwd_metis_references_queued
+ */
+LONGBOW_TEST_CASE(DownDirectionV1, _queueMessageToMetis)
+{
+ PARCEventScheduler *scheduler = parcEventScheduler_Create();
+ FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler);
+ PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA));
+ size_t expectedRefCount = parcObject_GetReferenceCount(wireFormat);
+
+ _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue);
+
+ assertTrue(parcObject_GetReferenceCount(wireFormat) == expectedRefCount,
+ "Did not get right ref count for wire format, expected %zu got %" PRIu64, expectedRefCount, parcObject_GetReferenceCount(wireFormat));
+ assertTrue(parcEventBuffer_GetLength(fwd_state->metisOutputQueue) == parcBuffer_Remaining(wireFormat),
+ "Wrong output buffer length, expected %zu got %zu", parcBuffer_Remaining(wireFormat), parcEventBuffer_GetLength(fwd_state->metisOutputQueue));
+
+ parcBuffer_Release(&wireFormat);
+ parcEventBuffer_Destroy(&fwd_state->metisOutputQueue);
+ _fwdMetisState_Release(&fwd_state);
+ parcEventScheduler_Destroy(&scheduler);
+}
+
+/*
+ * Dequeue a small message to metis, should all be written out.
+ */
+LONGBOW_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ int api_fd;
+ int client_fd;
+ RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);;
+
+ // Put data in the output queue
+ PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA));
+ _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue);
+
+ // write it out
+ _dequeueMessagesToMetis(fwd_state);
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ // we should now be able to read it
+ bool readReady = _waitForSelect(client_fd);
+ assertTrue(readReady, "client socket %d not ready for read", client_fd);
+
+ uint8_t testArray[sizeof(v1_interest_nameA) + 1];
+ ssize_t nrecv = recv(client_fd, testArray, sizeof(testArray), 0);
+
+ assertTrue(nrecv == sizeof(v1_interest_nameA), "Wrong read length, expected %zu got %zd", sizeof(v1_interest_nameA), nrecv);
+ assertTrue(memcmp(testArray, v1_interest_nameA, sizeof(v1_interest_nameA)) == 0, "Read memory does not compare");
+ assertTrue(parcEventBuffer_GetLength(fwd_state->metisOutputQueue) == 0, "Metis output buffer not zero length, got %zu", parcEventBuffer_GetLength(fwd_state->metisOutputQueue));
+ parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue));
+ parcBuffer_Release(&wireFormat);
+}
+
+/*
+ * Set the forwarder's send buffer small so it will take two writes to send the packet.
+ * This will test that when _dequeueMessagesToMetis cannot write the whole thing it will enable the
+ * write event and that metis will then trigger a second write when there's buffer space.
+ */
+LONGBOW_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_TwoWrites)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ int api_fd;
+ int client_fd;
+ RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);;
+
+ // set the send buffer
+ {
+ // make it slightly bigger than 1/2
+ const int sendBufferSize = sizeof(v1_interest_nameA) / 2 + 1;
+ int res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_SNDBUF, &sendBufferSize, sizeof(int));
+ if (res < 0) {
+ if (DEBUG_OUTPUT) {
+ printf("%9c %s failed to set SO_SNDBUF to %d: (%d) %s\n",
+ ' ', __func__, sendBufferSize, errno, strerror(errno));
+ }
+ // This is a non-fatal error
+ }
+ }
+
+ // Put data in the output queue
+ PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA));
+ _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue);
+
+ // write it out
+ _dequeueMessagesToMetis(fwd_state);
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ // we should now be able to read it
+ bool readReady = _waitForSelect(client_fd);
+ assertTrue(readReady, "client socket %d not ready for read", client_fd);
+
+ uint8_t testArray[sizeof(v1_interest_nameA) + 1];
+ ssize_t nrecv = recv(client_fd, testArray, sizeof(testArray), 0);
+
+ assertTrue(nrecv == sizeof(v1_interest_nameA), "Wrong read length, expected %zu got %zd", sizeof(v1_interest_nameA), nrecv);
+ assertTrue(memcmp(testArray, v1_interest_nameA, sizeof(v1_interest_nameA)) == 0, "Read memory does not compare");
+ assertTrue(parcEventBuffer_GetLength(fwd_state->metisOutputQueue) == 0, "Metis output buffer not zero length, got %zu", parcEventBuffer_GetLength(fwd_state->metisOutputQueue));
+ parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue));
+ parcBuffer_Release(&wireFormat);
+}
+
+/*
+ * Dequeue a message to a closed socket
+ */
+LONGBOW_TEST_CASE(DownDirectionV1, _dequeueMessagesToMetis_Closed)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ int api_fd;
+ int client_fd;
+ RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);;
+ PARCBuffer *wireFormat = parcBuffer_Wrap(v1_interest_nameA, sizeof(v1_interest_nameA), 0, sizeof(v1_interest_nameA));
+ _queueBufferMessageToMetis(wireFormat, fwd_state->metisOutputQueue);
+
+ // close remote side then try to write to it
+ close(client_fd);
+
+ _dequeueMessagesToMetis(fwd_state);
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue));
+ parcBuffer_Release(&wireFormat);
+}
+
+/**
+ * Sends an Interest down the stack. We need to create an Interest and encode its TLV wire format,
+ * then send it down the stack and make sure we receive it on a client socket. We don't actually
+ * run Metis in this test.
+ */
+LONGBOW_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_Interst)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ // create our connection. This will become part of the RTA framework, so will be
+ // cleaned up in the teardown
+ int api_fd;
+ int client_fd;
+ RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+ FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);
+
+ // Create the interest with wire format and send it down the stack
+ TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryInterest(conn, CCNxTlvDictionary_SchemaVersion_V1);
+ CCNxCodecNetworkBufferIoVec *vec = ccnxCodecSchemaV1PacketEncoder_DictionaryEncode(transportMessage_GetDictionary(tm), NULL);
+
+ ccnxWireFormatMessage_PutIoVec(transportMessage_GetDictionary(tm), vec);
+ ccnxCodecNetworkBufferIoVec_Release(&vec);
+
+ // send it down the stack
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN);
+ rtaComponent_PutMessage(in, tm);
+ rtaFramework_NonThreadedStepCount(data->framework, 5);
+
+ bool readReady = _waitForSelect(client_fd);
+ assertTrue(readReady, "select did not indicate read ready");
+
+ // now read it from out listener. It has a read timeout so if we dont get it in a reasonable amount
+ // of time, read will return an error about the timeout
+
+ const size_t maxPacketLength = 1024;
+ uint8_t packet[maxPacketLength];
+
+ ssize_t readBytes = read(client_fd, packet, maxPacketLength);
+ assertFalse(readBytes < 0, "Got error on read: (%d) %s", errno, strerror(errno));
+
+ parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue));
+ close(client_fd);
+}
+
+/**
+ * Send an AddRoute command down the stack. It does not need a wire format in the transport message, its
+ * the job of the forwarder to create the Metis-specific message.
+ */
+LONGBOW_TEST_CASE(DownDirectionV1, connector_Fwd_Metis_Downcall_Read_CPIRequest)
+{
+ testUnimplemented("No way to create a v1 CPI message yet");
+
+// TestData *data = longBowTestCase_GetClipBoardData(testCase);
+//
+// // create our connection. This will become part of the RTA framework, so will be
+// // cleaned up in the teardown
+// int api_fd;
+// int client_fd;
+// RtaConnection *conn = setupConnectionAndClientSocket(data, &api_fd, &client_fd);
+// FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);
+//
+// // now make the control message
+// TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryControl(conn, CCNxTlvDictionary_SchemaVersion_V1);
+// CCNxCodecNetworkBufferIoVec *vec = ccnxCodecSchemaV1PacketEncoder_DictionaryEncode(transportMessage_GetDictionary(tm), NULL);
+// ccnxWireFormatFacade_PutIoVec(transportMessage_GetDictionary(tm), vec);
+// ccnxCodecNetworkBufferIoVec_Release(&vec);
+//
+// // send it down the stack
+// PARCEventQueue *in = rtaProtocolStack_GetPutQueue(rtaConnection_GetStack(conn), TESTING_UPPER, RTA_DOWN);
+// rtaComponent_PutMessage(in, tm);
+// rtaFramework_NonThreadedStepCount(data->framework, 5);
+//
+// // now read it from out listener. It has a read timeout so if we dont get it in a reasonable amount
+// // of time, read will return an error about the timeout
+//
+// const size_t maxPacketLength = 1024;
+// uint8_t packet[maxPacketLength];
+//
+// ssize_t readBytes = read(client_fd, packet, maxPacketLength);
+// assertFalse(readBytes < 0, "Got error on read: (%d) %s", errno, strerror(errno));
+// parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue));
+}
+
+// ====================================================================
+
+int
+main(int argc, char *argv[])
+{
+ LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(connector_Forwarder_Metis);
+ int exitStatus = longBowMain(argc, argv, testRunner, NULL);
+ longBowTestRunner_Destroy(&testRunner);
+ exit(exitStatus);
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_rta_ApiConnection.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_rta_ApiConnection.c
new file mode 100644
index 00000000..b0e937cd
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/test/test_rta_ApiConnection.c
@@ -0,0 +1,278 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Create a non-threaded framework to test internal RTA functions
+ *
+ */
+#include "../rta_ApiConnection.c"
+
+#include <poll.h>
+
+#include <parc/algol/parc_SafeMemory.h>
+#include <LongBow/unit-test.h>
+#include <ccnx/transport/transport_rta/config/config_All.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework_Commands.c>
+
+#include <ccnx/transport/test_tools/traffic_tools.h>
+
+typedef struct test_data {
+ PARCRingBuffer1x1 *commandRingBuffer;
+ PARCNotifier *commandNotifier;
+ RtaFramework *framework;
+
+ int api_fds[2];
+ int stackId;
+
+ RtaProtocolStack *stack;
+ RtaConnection *connection;
+} TestData;
+
+static TestData *
+_commonSetup(void)
+{
+ TestData *data = parcMemory_AllocateAndClear(sizeof(TestData));
+ assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData));
+
+ data->commandRingBuffer = parcRingBuffer1x1_Create(128, NULL);
+ data->commandNotifier = parcNotifier_Create();
+ data->framework = rtaFramework_Create(data->commandRingBuffer, data->commandNotifier);
+ assertNotNull(data->framework, "rtaFramework_Create returned null");
+
+ CCNxStackConfig *stackConfig = ccnxStackConfig_Create();
+
+ apiConnector_ProtocolStackConfig(stackConfig);
+ testingLower_ProtocolStackConfig(stackConfig);
+ protocolStack_ComponentsConfigArgs(stackConfig, apiConnector_GetName(), testingLower_GetName(), NULL);
+
+ rtaFramework_NonThreadedStepCount(data->framework, 10);
+
+ // Create the protocol stack
+
+ data->stackId = 1;
+ RtaCommandCreateProtocolStack *createStack = rtaCommandCreateProtocolStack_Create(data->stackId, stackConfig);
+ _rtaFramework_ExecuteCreateStack(data->framework, createStack);
+ rtaCommandCreateProtocolStack_Release(&createStack);
+
+ rtaFramework_NonThreadedStepCount(data->framework, 10);
+ data->stack = (rtaFramework_GetProtocolStackByStackId(data->framework, data->stackId))->stack;
+
+ // Create a connection in the stack
+
+ int error = socketpair(AF_UNIX, SOCK_STREAM, 0, data->api_fds);
+ assertFalse(error, "Error creating socket pair: (%d) %s", errno, strerror(errno));
+
+ CCNxConnectionConfig *connConfig = ccnxConnectionConfig_Create();
+ apiConnector_ConnectionConfig(connConfig);
+
+ tlvCodec_ConnectionConfig(connConfig);
+
+ testingLower_ConnectionConfig(connConfig);
+
+ RtaCommandOpenConnection *openConnection = rtaCommandOpenConnection_Create(data->stackId, data->api_fds[PAIR_OTHER], data->api_fds[PAIR_TRANSPORT], ccnxConnectionConfig_GetJson(connConfig));
+ _rtaFramework_ExecuteOpenConnection(data->framework, openConnection);
+ rtaCommandOpenConnection_Release(&openConnection);
+
+ rtaFramework_NonThreadedStepCount(data->framework, 10);
+
+ data->connection = rtaConnectionTable_GetByApiFd(data->framework->connectionTable, data->api_fds[PAIR_OTHER]);
+
+ // cleanup
+
+ ccnxConnectionConfig_Destroy(&connConfig);
+ ccnxStackConfig_Release(&stackConfig);
+
+ return data;
+}
+
+static void
+_commonTeardown(TestData *data)
+{
+ rtaFramework_Teardown(data->framework);
+
+ parcRingBuffer1x1_Release(&data->commandRingBuffer);
+ parcNotifier_Release(&data->commandNotifier);
+ rtaFramework_Destroy(&data->framework);
+
+ close(data->api_fds[0]);
+ close(data->api_fds[1]);
+ parcMemory_Deallocate((void **) &data);
+}
+
+LONGBOW_TEST_RUNNER(rta_ApiConnection)
+{
+ LONGBOW_RUN_TEST_FIXTURE(Global);
+}
+
+// The Test Runner calls this function once before any Test Fixtures are run.
+LONGBOW_TEST_RUNNER_SETUP(rta_ApiConnection)
+{
+ parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory);
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// The Test Runner calls this function once after all the Test Fixtures are run.
+LONGBOW_TEST_RUNNER_TEARDOWN(rta_ApiConnection)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE(Global)
+{
+ LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_BlockDown);
+ LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_Create_Destroy);
+ LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_Create_Checks);
+ LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_Create_Check_ApiSocket);
+
+ LONGBOW_RUN_TEST_CASE(Global, rtaApiConnection_UnblockDown);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(Global)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup());
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(Global)
+{
+ printf("Finishing testcase %s\n", longBowTestCase_GetName(testCase));
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+
+ uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO);
+ if (outstandingAllocations != 0) {
+ printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations);
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_CASE(Global, rtaApiConnection_SendToApi)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(data->connection, API_CONNECTOR);
+
+ TransportMessage *tm = trafficTools_CreateTransportMessageWithDictionaryInterest(data->connection, CCNxTlvDictionary_SchemaVersion_V1);
+
+ RtaComponentStats *stats = rtaConnection_GetStats(data->connection, API_CONNECTOR);
+ rtaApiConnection_SendToApi(apiConnection, tm, stats);
+ rtaFramework_NonThreadedStepCount(data->framework, 10);
+
+ // Let the dispatcher run
+ struct pollfd pfd = { .fd = data->api_fds[PAIR_OTHER], .events = POLLIN, .revents = 0 };
+ int millisecondTimeout = 1000;
+
+ int pollvalue = poll(&pfd, 1, millisecondTimeout);
+ assertTrue(pollvalue == 1, "Did not get an event from the API's side of the socket");
+
+ CCNxMetaMessage *testMessage;
+ ssize_t bytesRead = read(data->api_fds[PAIR_OTHER], &testMessage, sizeof(testMessage));
+ assertTrue(bytesRead == sizeof(testMessage), "Wrong read size, got %zd expected %zd: (%d) %s",
+ bytesRead, sizeof(testMessage),
+ errno, strerror(errno));
+ assertNotNull(testMessage, "Message read is NULL");
+
+
+ assertTrue(testMessage == transportMessage_GetDictionary(tm),
+ "Got wrong raw message, got %p expected %p",
+ (void *) testMessage, (void *) transportMessage_GetDictionary(tm));
+
+ ccnxMetaMessage_Release(&testMessage);
+ transportMessage_Destroy(&tm);
+}
+
+LONGBOW_TEST_CASE(Global, rtaApiConnection_BlockDown)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection);
+
+ // make sure we're startined unblocked
+ short enabled = parcEventQueue_GetEnabled(apiConnection->bev_api);
+ assertTrue(enabled & PARCEventType_Read, "PARCEventType_Read is not enabled on a new Api Connector: enabled = %04X", enabled);
+
+ rtaApiConnection_BlockDown(apiConnection);
+ enabled = parcEventQueue_GetEnabled(apiConnection->bev_api);
+ assertFalse(enabled & PARCEventType_Read, "PARCEventType_Read is still enabled after caling BlockDown: enabled = %04X", enabled);
+
+ rtaApiConnection_Destroy(&apiConnection);
+}
+
+LONGBOW_TEST_CASE(Global, rtaApiConnection_Create_Destroy)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ uint32_t beforeBalance = parcMemory_Outstanding();
+ RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection);
+ assertNotNull(apiConnection, "Got null API connection");
+
+ rtaApiConnection_Destroy(&apiConnection);
+ assertNull(apiConnection, "Destroy did not null apiConnection");
+ uint32_t afterBalance = parcMemory_Outstanding();
+ assertTrue(beforeBalance == afterBalance, "Memory imbalance: %d", (int) (afterBalance - beforeBalance));
+}
+
+LONGBOW_TEST_CASE(Global, rtaApiConnection_Create_Checks)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection);
+ assertTrue(apiConnection->api_fd == rtaConnection_GetApiFd(data->connection),
+ "Wrong api fd, got %d expected %d",
+ apiConnection->api_fd, rtaConnection_GetApiFd(data->connection));
+
+ assertTrue(apiConnection->transport_fd == rtaConnection_GetTransportFd(data->connection),
+ "Wrong api fd, got %d expected %d",
+ apiConnection->transport_fd, rtaConnection_GetTransportFd(data->connection));
+
+ assertTrue(apiConnection->connection == data->connection,
+ "Wrong connection, got %p expected %p",
+ (void *) apiConnection->connection,
+ (void *) data->connection);
+
+ rtaApiConnection_Destroy(&apiConnection);
+}
+
+LONGBOW_TEST_CASE(Global, rtaApiConnection_Create_Check_ApiSocket)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection);
+
+ assertNotNull(apiConnection->bev_api, "API socket event null");
+
+ rtaApiConnection_Destroy(&apiConnection);
+}
+
+LONGBOW_TEST_CASE(Global, rtaApiConnection_UnblockDown)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ RtaApiConnection *apiConnection = rtaApiConnection_Create(data->connection);
+
+ rtaApiConnection_BlockDown(apiConnection);
+ // we know from previous test that this puts the apiConnector in blocked state
+
+ rtaApiConnection_UnblockDown(apiConnection);
+ short enabled = parcEventQueue_GetEnabled(apiConnection->bev_api);
+ assertTrue(enabled & PARCEventType_Read, "PARCEventType_Read is not enabled after caling UnlockDown: enabled = %04X", enabled);
+
+ rtaApiConnection_Destroy(&apiConnection);
+}
+
+int
+main(int argc, char *argv[])
+{
+ LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(rta_ApiConnection);
+ int exitStatus = longBowMain(argc, argv, testRunner, NULL);
+ longBowTestRunner_Destroy(&testRunner);
+ exit(exitStatus);
+}