aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c552
1 files changed, 552 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c
new file mode 100644
index 00000000..a434600a
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Local.c
@@ -0,0 +1,552 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PF_LOCAL forwarder glue, mostly for testing. This uses a
+ * STREAM socket with a user specified coding. Each message
+ * on the stream is of this format:
+ *
+ * uint32_t process pid
+ * uint32_t user_socket_fd
+ * uint32_t message bytes that follow
+ * uint8_t[] message encoded with user specified codec
+ *
+ * The user_socket_fd will be the same number that the API was assigned
+ * in transportRta_Socket->api_socket_pair[PAIR_OTHER].
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <parc/algol/parc_EventBuffer.h>
+
+#include <LongBow/runtime.h>
+#include <LongBow/debugging.h>
+
+#include <parc/algol/parc_Memory.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/transport/transport_rta/connectors/connector_Forwarder.h>
+
+#include <ccnx/transport/transport_rta/config/config_Forwarder_Local.h>
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+#include <ccnx/common/ccnx_WireFormatMessage.h>
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+static int connector_Fwd_Local_Init(RtaProtocolStack *stack);
+static int connector_Fwd_Local_Opener(RtaConnection *conn);
+static void connector_Fwd_Local_Upcall_Read(PARCEventQueue *, PARCEventType, void *conn);
+static void connector_Fwd_Local_Upcall_Event(PARCEventQueue *, PARCEventQueueEventType, void *stack);
+static void connector_Fwd_Local_Downcall_Read(PARCEventQueue *, PARCEventType, void *conn);
+static int connector_Fwd_Local_Closer(RtaConnection *conn);
+static int connector_Fwd_Local_Release(RtaProtocolStack *stack);
+static void connector_Fwd_Local_StateChange(RtaConnection *conn);
+
+RtaComponentOperations fwd_local_ops = {
+ .init = connector_Fwd_Local_Init,
+ .open = connector_Fwd_Local_Opener,
+ .upcallRead = connector_Fwd_Local_Upcall_Read,
+ .upcallEvent = connector_Fwd_Local_Upcall_Event,
+ .downcallRead = connector_Fwd_Local_Downcall_Read,
+ .downcallEvent = NULL,
+ .close = connector_Fwd_Local_Closer,
+ .release = connector_Fwd_Local_Release,
+ .stateChange = connector_Fwd_Local_StateChange
+};
+
+struct fwd_local_state {
+ int fd;
+ PARCEventQueue *bev_local;
+ int connected;
+};
+
+typedef struct {
+ uint32_t pid;
+ uint32_t fd;
+ uint32_t length;
+ uint32_t pad; // make it 16 bytes
+} __attribute__ ((packed)) localhdr;
+
+// ================================
+// NULL
+
+static int
+connector_Fwd_Local_Init(RtaProtocolStack *stack)
+{
+ // no stack-wide initialization
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s init stack %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(stack)),
+ __func__,
+ (void *) stack);
+ }
+ return 0;
+}
+
+/*
+ * Create a PF_LOCAL socket
+ * Set it non-blocking
+ * Wrap it in a buffer event
+ * Set Read and Event callbacks
+ * connect to LOCAL_NAME
+ *
+ * Return 0 success, -1 failure
+ */
+static int
+connector_Fwd_Local_Opener(RtaConnection *conn)
+{
+ PARCEventScheduler *base;
+ RtaProtocolStack *stack;
+ const char *sock_name;
+
+ stack = rtaConnection_GetStack(conn);
+ base = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack));
+
+ sock_name = localForwarder_GetPath(rtaConnection_GetParameters(conn));
+ assertNotNull(sock_name, "connector_Fwd_Local_Opener called without setting LOCAL_NAME");
+
+ if (sock_name == NULL) {
+ return -1;
+ }
+
+ struct fwd_local_state *fwd_state = parcMemory_Allocate(sizeof(struct fwd_local_state));
+ assertNotNull(fwd_state, "parcMemory_Allocate(%zu) returned NULL", sizeof(struct fwd_local_state));
+
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, fwd_state);
+
+ fwd_state->fd = socket(PF_LOCAL, SOCK_STREAM, 0);
+ if (fwd_state->fd < 0) {
+ perror("socket PF_LOCAL");
+ }
+ assertFalse(fwd_state->fd < 0, "socket PF_LOCAL error");
+
+ struct sockaddr_un addr_unix;
+ memset(&addr_unix, 0, sizeof(struct sockaddr_un));
+ addr_unix.sun_family = AF_UNIX;
+
+ trapIllegalValueIf(sizeof(addr_unix.sun_path) <= strlen(sock_name), "sock_name too long, maximum length %zu", sizeof(addr_unix.sun_path) - 1);
+ strcpy(addr_unix.sun_path, sock_name);
+
+ // Setup the socket as non-blocking then wrap in a parcEventQueue.
+
+ int flags = fcntl(fwd_state->fd, F_GETFL, NULL);
+ assertFalse(flags < 0, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+
+ int failure = fcntl(fwd_state->fd, F_SETFL, flags | O_NONBLOCK);
+ assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno);
+
+ assertTrue(failure == 0, "could not make socket non-blocking");
+ if (failure < 0) {
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL);
+ close(fwd_state->fd);
+ parcMemory_Deallocate((void **) &fwd_state);
+ return -1;
+ }
+
+ fwd_state->bev_local = parcEventQueue_Create(base, fwd_state->fd, PARCEventQueueOption_CloseOnFree);
+
+ assertNotNull(fwd_state->bev_local, "Null buffer event for local socket.");
+
+ parcEventQueue_SetCallbacks(fwd_state->bev_local,
+ connector_Fwd_Local_Upcall_Read,
+ NULL,
+ connector_Fwd_Local_Upcall_Event,
+ conn);
+
+ parcEventQueue_Enable(fwd_state->bev_local, PARCEventType_Read);
+
+ memset(&addr_unix, 0, sizeof(addr_unix));
+ addr_unix.sun_family = AF_UNIX;
+
+ trapIllegalValueIf(sizeof(addr_unix.sun_path) <= strlen(sock_name), "sock_name too long, maximum length %zu", sizeof(addr_unix.sun_path) - 1);
+ strcpy(addr_unix.sun_path, sock_name);
+
+ // This will deliver a PARCEventQueue_Connected on connect success
+ if (parcEventQueue_ConnectSocket(fwd_state->bev_local,
+ (struct sockaddr*) &addr_unix,
+ (socklen_t) sizeof(addr_unix)) < 0) {
+ perror("connect PF_LOCAL");
+ assertTrue(0, "connect PF_LOCAL");
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL);
+ close(fwd_state->fd);
+ parcMemory_Deallocate((void **) &fwd_state);
+ return -1;
+ }
+
+ // Socket will be ready for use once we get PARCEventQueueEventType_Connected
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s open conn %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+
+ return 0;
+}
+
+/*
+ * Read from bev_local. We are passed the connection on the ptr.
+ */
+static void
+connector_Fwd_Local_Upcall_Read(PARCEventQueue *bev, PARCEventType type, void *ptr)
+{
+ RtaConnection *conn = (RtaConnection *) ptr;
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(bev);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_LOCAL, RTA_UP);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+ TransportMessage *tm;
+
+ unsigned char *mem;
+ int res;
+
+ // only move forward if enough bytes available
+
+ while (parcEventBuffer_GetLength(in) >= sizeof(localhdr)) {
+ size_t msg_length;
+
+ mem = parcEventBuffer_Pullup(in, sizeof(localhdr));
+ if (mem == NULL) {
+ // not enough bytes
+ parcEventBuffer_Destroy(&in);
+ return;
+ }
+
+ msg_length = ((localhdr *) mem)->length;
+ if (parcEventBuffer_GetLength(in) < msg_length + sizeof(localhdr)) {
+ // not enough bytes
+ parcEventBuffer_Destroy(&in);
+ return;
+ }
+
+ PARCBuffer *wireFormat = parcBuffer_Allocate(msg_length);
+ assertNotNull(wireFormat, "parcBuffer_Allocate(%zu) returned NULL", msg_length);
+
+ rtaComponentStats_Increment(stats, STATS_UPCALL_IN);
+
+ // we can read a whole message. Read it directly in to a buffer
+ // Skip the FWD_LOCAL header
+ res = parcEventBuffer_Read(in, NULL, sizeof(localhdr));
+ assertTrue(res == 0, "Got error draining header from buffer");
+
+ uint8_t *overlay = parcBuffer_Overlay(wireFormat, msg_length);
+ res = parcEventBuffer_Read(in, overlay, msg_length);
+ overlay = NULL;
+
+ assertTrue(res == msg_length,
+ "parcEventBuffer_Read returned wrong size, expected %zu got %d",
+ msg_length, res);
+
+ parcBuffer_Flip(wireFormat);
+
+ if (rtaConnection_GetState(conn) == CONN_OPEN) {
+ CCNxWireFormatMessage *wireFormatMessage = ccnxWireFormatMessage_Create(wireFormat);
+ CCNxTlvDictionary *dictionary = ccnxWireFormatMessage_GetDictionary(wireFormatMessage);
+ if (dictionary != NULL) {
+ // wrap it for transport module
+ tm = transportMessage_CreateFromDictionary(dictionary);
+
+ // add the connection info to the transport message before sending up stack
+ transportMessage_SetInfo(tm, rtaConnection_Copy(conn), rtaConnection_FreeFunc);
+
+ // send it up the stack
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+
+ // Now release our hold on the wireFormatMessage (aka dictionary)
+ ccnxWireFormatMessage_Release(&wireFormatMessage);
+ } else {
+ printf("Failed to create CCNxTlvDictionary from wireformat\n");
+ parcBuffer_Display(wireFormat, 3);
+ }
+ } else {
+ //drop packets
+ }
+
+ parcBuffer_Release(&wireFormat);
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total upcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_UPCALL_IN),
+ rtaComponentStats_Get(stats, STATS_UPCALL_OUT));
+ }
+ parcEventBuffer_Destroy(&in);
+}
+
+/*
+ * Event on connection to forwarder.
+ * Passed the RtaConnection in the pointer
+ */
+static void
+connector_Fwd_Local_Upcall_Event(PARCEventQueue *queue, PARCEventQueueEventType events, void *ptr)
+{
+ RtaConnection *conn = (RtaConnection *) ptr;
+
+ struct fwd_local_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_LOCAL);
+
+ if (events & PARCEventQueueEventType_Connected) {
+ if (DEBUG_OUTPUT) {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ printf("%6lu.%06ld %s (pid %d) connected socket %d\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn));
+ }
+
+ fwd_state->connected = 1;
+ rtaConnection_SendStatus(conn, FWD_LOCAL, RTA_UP, notifyStatusCode_CONNECTION_OPEN, NULL, NULL);
+ } else if (events & PARCEventQueueEventType_Error) {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ longBowRuntime_StackTrace(1);
+
+ if (events & PARCEventQueueEventType_Reading) {
+ printf("%6lu.%06ld %s (pid %d) Got read error on PF_LOCAL, transport socket %d: (%d) %s\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn),
+ errno,
+ strerror(errno));
+ } else if (events & PARCEventQueueEventType_Writing) {
+ printf("%6lu.%06ld %s (pid %d) Got write error on PF_LOCAL, transport socket %d: (%d) %s\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn),
+ errno,
+ strerror(errno));
+ } else {
+ printf("%6lu.%06ld %s (pid %d) Got error on PF_LOCAL, transport socket %d: (%d) %s\n",
+ tv.tv_sec, (long) tv.tv_usec,
+ __func__,
+ getpid(),
+ rtaConnection_GetTransportFd(conn),
+ errno,
+ strerror(errno));
+ }
+
+ /* An error occured while connecting. */
+ rtaConnection_SendStatus(conn, FWD_LOCAL, RTA_UP, notifyStatusCode_FORWARDER_NOT_AVAILABLE, NULL, NULL);
+ }
+}
+
+static void
+_ackRequest(RtaConnection *conn, PARCJSON *request)
+{
+ PARCJSON *response = cpiAcks_CreateAck(request);
+ CCNxTlvDictionary *ackDict = ccnxControlFacade_CreateCPI(response);
+
+ TransportMessage *tm_ack = transportMessage_CreateFromDictionary(ackDict);
+ ccnxTlvDictionary_Release(&ackDict);
+ parcJSON_Release(&response);
+
+ transportMessage_SetInfo(tm_ack, rtaConnection_Copy(conn), rtaConnection_FreeFunc);
+
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_LOCAL, RTA_UP);
+ if (rtaComponent_PutMessage(out, tm_ack)) {
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+}
+
+static void
+connector_Fwd_Local_ProcessControl(RtaConnection *conn, TransportMessage *tm)
+{
+ CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(tm);
+
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary);
+ if (controlPlaneInterface_GetCPIMessageType(json) == CPI_REQUEST) {
+ if (cpi_getCPIOperation2(json) == CPI_PAUSE) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p recieved PAUSE\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+ _ackRequest(conn, json);
+ } else if (cpi_getCPIOperation2(json) == CPI_FLUSH) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p recieved FLUSH\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn);
+ }
+ _ackRequest(conn, json);
+ } else {
+ // some other message. We just ACK everything in the local connector.
+ _ackRequest(conn, json);
+ }
+ }
+ }
+}
+
+static void
+connector_Fwd_Local_WriteIovec(struct fwd_local_state *fwdConnState, RtaConnection *conn, CCNxCodecNetworkBufferIoVec *vec, RtaComponentStats *stats)
+{
+ localhdr lh;
+
+ memset(&lh, 0, sizeof(localhdr));
+ lh.pid = getpid();
+ lh.fd = rtaConnection_GetTransportFd(conn);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total downcall reads %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN));
+ }
+
+ int iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(vec);
+ const struct iovec *array = ccnxCodecNetworkBufferIoVec_GetArray(vec);
+
+ lh.length = 0;
+ for (int i = 0; i < iovcnt; i++) {
+ lh.length += array[i].iov_len;
+ }
+
+ if (parcEventQueue_Write(fwdConnState->bev_local, &lh, sizeof(lh)) < 0) {
+ trapUnrecoverableState("%s error writing to bev_local", __func__);
+ }
+
+ for (int i = 0; i < iovcnt; i++) {
+ if (parcEventQueue_Write(fwdConnState->bev_local, array[i].iov_base, array[i].iov_len) < 0) {
+ trapUnrecoverableState("%s error writing iovec to bev_local", __func__);
+ }
+ }
+}
+
+/* send raw packet from codec to forwarder */
+static void
+connector_Fwd_Local_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *ptr)
+{
+ TransportMessage *tm;
+
+ while ((tm = rtaComponent_GetMessage(in)) != NULL) {
+ RtaConnection *conn;
+ struct fwd_local_state *fwdConnState;
+ RtaComponentStats *stats;
+
+ CCNxTlvDictionary *messageDictionary = transportMessage_GetDictionary(tm);
+
+ conn = rtaConnection_GetFromTransport(tm);
+ fwdConnState = rtaConnection_GetPrivateData(conn, FWD_LOCAL);
+ stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN);
+
+ // ignore configuration messages for the send
+ if (ccnxTlvDictionary_IsControl(messageDictionary)) {
+ connector_Fwd_Local_ProcessControl(conn, tm);
+ } else {
+ CCNxCodecNetworkBufferIoVec *vec = ccnxWireFormatMessage_GetIoVec(messageDictionary);
+ assertNotNull(vec, "%s got null wire format\n", __func__);
+
+ connector_Fwd_Local_WriteIovec(fwdConnState, conn, vec, stats);
+
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+
+ // we can release everything here. connector_Fwd_Local_WriteIovec made its own references
+ // to the wire format if it needed them.
+ transportMessage_Destroy(&tm);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s total downcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT));
+ }
+ }
+}
+
+static int
+connector_Fwd_Local_Closer(RtaConnection *conn)
+{
+ struct fwd_local_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_LOCAL);
+ RtaComponentStats *stats;
+
+ assertNotNull(fwd_state, "invalid state");
+ assertNotNull(fwd_state->bev_local, "invalid PARCEventQueue pointer");
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s called on fwd_state %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state);
+ }
+
+ stats = rtaConnection_GetStats(conn, FWD_LOCAL);
+
+ // this will close too
+ parcEventQueue_Destroy(&(fwd_state->bev_local));
+ memset(fwd_state, 0, sizeof(struct fwd_local_state));
+ parcMemory_Deallocate((void **) &fwd_state);
+
+ rtaConnection_SetPrivateData(conn, FWD_LOCAL, NULL);
+ rtaComponentStats_Increment(stats, STATS_CLOSES);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s closed fwd_state %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state);
+ }
+
+ return 0;
+}
+
+static int
+connector_Fwd_Local_Release(RtaProtocolStack *stack)
+{
+ // no stack-wide initialization
+ if (DEBUG_OUTPUT) {
+ printf("%s release stack %p\n",
+ __func__,
+ (void *) stack);
+ }
+
+ return 0;
+}
+
+static void
+connector_Fwd_Local_StateChange(RtaConnection *conn)
+{
+ //not implemented
+}