diff options
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/rta_Transport.c')
-rw-r--r-- | libccnx-transport-rta/ccnx/transport/transport_rta/rta_Transport.c | 543 |
1 files changed, 543 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/rta_Transport.c b/libccnx-transport-rta/ccnx/transport/transport_rta/rta_Transport.c new file mode 100644 index 00000000..9724322c --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/rta_Transport.c @@ -0,0 +1,543 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This is the API-thread's interface to the RTA framework. It is thread-safe + * and executes in the API's thread. + * + * The only data maintained here is a mapping from the SYSTEM parameters hash + * to the stack_id. + * + * Communication with the Framework is done over a socket pair. + */ +#include <config.h> + +#include <LongBow/runtime.h> + +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <unistd.h> +#include <pthread.h> +#include <errno.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <string.h> +#include <fcntl.h> +#include <sys/socket.h> + +#include <parc/algol/parc_Memory.h> +//#include <parc/logging/parc_Log.h> +//#include <parc/logging/parc_LogReporterTextStdout.h> +#include <parc/concurrent/parc_RingBuffer_1x1.h> +#include <parc/concurrent/parc_Notifier.h> +#include <parc/algol/parc_Deque.h> +#include <parc/concurrent/parc_Synchronizer.h> + +#include <ccnx/transport/transport_rta/rta_Transport.h> +#include <ccnx/transport/common/transport_private.h> +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/commands/rta_Command.h> +#include <ccnx/transport/transport_rta/core/components.h> +#include <ccnx/transport/transport_rta/core/rta_ConnectionTable.h> + +// These are some internal diagnostic counters used in the debugger +// for when things are going really bad. They are incremented on each +// call to read or write. +unsigned rta_transport_reads = 0; +unsigned rta_transport_read_spin = 0; +unsigned rta_transport_writes = 0; + +// =================================================== +// The external interface + +const struct transport_operations rta_ops = { + .Create = (void * (*)(void))rtaTransport_Create, + .Open = (int (*)(void *, CCNxTransportConfig *))rtaTransport_Open, + .Send = (int (*)(void *, int, CCNxMetaMessage *, const struct timeval *restrict timeout))rtaTransport_Send, + .Recv = (TransportIOStatus (*)(void *, int, CCNxMetaMessage **, const struct timeval *restrict timeout))rtaTransport_Recv, + .Close = (int (*)(void *, int))rtaTransport_Close, + .Destroy = (int (*)(void **))rtaTransport_Destroy, + .PassCommand = (int (*)(void *, void *))rtaTransport_PassCommand +}; + +/** + * @typedef _StackEntry + * @abstract Tracks the JSON descriptions of protocol stacks + * @constant hash The hash of the JSON description + * @constant stack_id the id of the stack associated with that hash + * @constant list The linked-list member + * @discussion <#Discussion#> + */ +typedef struct json_hash_table { + PARCHashCode hash; + int stack_id; +} _StackEntry; + +typedef struct socket_pair { + int up; + int down; +} _RTASocketPair; + +struct rta_transport { + RtaFramework *framework; /**< The RTA Framework holding the transport */ + + PARCRingBuffer1x1 *commandRingBuffer; /**< Written from Transport down to Framework */ + + PARCNotifier *commandNotifier; /**< Shared with the Framework to indicates writes to the ring buffer */ + + unsigned int nextStackId; + + PARCDeque *list; +}; + +static _StackEntry * +_rtaTransport_GetStack(const RTATransport *transport, PARCHashCode hash) +{ + _StackEntry *result = NULL; + + PARCIterator *iterator = parcDeque_Iterator(transport->list); + while (parcIterator_HasNext(iterator)) { + _StackEntry *entry = parcIterator_Next(iterator); + if (entry->hash == hash) { + result = entry; + break; + } + } + parcIterator_Release(&iterator); + + return result; +} + +static _StackEntry * +_rtaTransport_AddStack(RTATransport *transport, CCNxStackConfig *stackConfig) +{ + PARCHashCode hash = ccnxStackConfig_HashCode(stackConfig); + + _StackEntry *entry = parcMemory_AllocateAndClear(sizeof(_StackEntry)); + assertNotNull(entry, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(_StackEntry)); + entry->hash = hash; + entry->stack_id = transport->nextStackId++; + + parcDeque_Append(transport->list, entry); + + return entry; +} + +static void +_rtaTransport_CommandBufferEntryDestroyer(void **entryPtr) +{ +} + +static bool +_rtaTransport_SendCommandToFramework(RTATransport *transport, const RtaCommand *command) +{ + bool success = rtaCommand_Write(command, transport->commandRingBuffer); + if (success) { + parcNotifier_Notify(transport->commandNotifier); + return true; + } + return false; +} + +RTATransport * +rtaTransport_Create(void) +{ + RTATransport *transport = parcMemory_AllocateAndClear(sizeof(RTATransport)); + + if (transport != NULL) { + transport->nextStackId = 1; + + transport->commandRingBuffer = parcRingBuffer1x1_Create(128, _rtaTransport_CommandBufferEntryDestroyer); + transport->commandNotifier = parcNotifier_Create(); + + transport->framework = rtaFramework_Create(transport->commandRingBuffer, transport->commandNotifier); + assertNotNull(transport->framework, "rtaFramework_Create returned null"); + + rtaFramework_Start(transport->framework); + transport->list = parcDeque_Create(); + } + + return transport; +} + +int +rtaTransport_Destroy(RTATransport **ctxPtr) +{ + assertNotNull(ctxPtr, "called with null context pointer"); + RTATransport *transport = *ctxPtr; + + // %%%%% LOCK (notice this lock never gets unlocked, it just gets deleted) + parcDeque_Lock(transport->list); + + // This blocks until shutdown (state FRAMEWORK_SHUTDOWN) + rtaFramework_Shutdown(transport->framework); + + // This will close and drain all the API fds + rtaFramework_Destroy(&transport->framework); + + parcNotifier_Release(&transport->commandNotifier); + parcRingBuffer1x1_Release(&transport->commandRingBuffer); + + // Destroy the state we have stored locally to map JSON protocol stack descriptions + // to stack_id identifiers. + + for (size_t index = 0; index < parcDeque_Size(transport->list); index++) { + _StackEntry *entry = parcDeque_GetAtIndex(transport->list, index); + parcMemory_Deallocate((void **) &entry); + } + + parcDeque_Release(&transport->list); + + parcMemory_Deallocate((void **) ctxPtr); + +// printf("rta_transport writes=%9u reads=%9u spins=%9u\n", rta_transport_writes, rta_transport_reads, rta_transport_read_spin); + return 0; +} + +static _RTASocketPair +_rtaTransport_CreateSocketPair(const RTATransport *transport, int bufferSize) +{ + int fds[2]; + + bool success = (socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) == 0); + assertTrue(success, "socketpair(PF_LOCAL, SOCK_STREAM, ...) failed."); + + _RTASocketPair result = { .up = fds[0], .down = fds[1] }; + + // Set buffer size + int sendbuff = bufferSize; + + success = (setsockopt(result.up, SOL_SOCKET, SO_RCVBUF, &sendbuff, sizeof(sendbuff)) == 0); + assertTrue(success, "Expected success for setsockopt SO_RCVBUF"); + + success = (setsockopt(result.down, SOL_SOCKET, SO_RCVBUF, &sendbuff, sizeof(sendbuff)) == 0); + assertTrue(success, "Expected success for setsockopt SO_RCVBUF"); + + return result; +} + +/** + * Returns the protocol stack entry from our table + * + * Determine if we already have a protocol stack with the same structure as the user asks for. + * If so, return that entry, otherwise return NULL + * + * @param [in] transport The RTA transport + * @param [in] transportConfig the configuration the user is asking for + * + * @return non-NULL The existing protocol stack holder + * @return NULL Configuration does not exist + */ +static _StackEntry * +_rtaTransport_GetProtocolStackEntry(RTATransport *transport, CCNxTransportConfig *transportConfig) +{ + PARCHashCode hash = ccnxStackConfig_HashCode(ccnxTransportConfig_GetStackConfig(transportConfig)); + + _StackEntry *stack = _rtaTransport_GetStack(transport, hash); + return stack; +} + +/** + * Add a protocol stack + * + * Adds an entry to our local table of Config -> stack_id mapping and sends a + * command over the command socket to create the protocol stack. + * + * @param [in] transport The RTA transport + * @param [in] transportConfig the user specified configuration + * + * @return non-NULL The holder of the protocol stack mapping + * @return NULL An error + */ +static _StackEntry * +_rtaTransport_AddProtocolStackEntry(RTATransport *transport, const CCNxTransportConfig *transportConfig) +{ + CCNxStackConfig *stackConfig = ccnxTransportConfig_GetStackConfig(transportConfig); + + _StackEntry *stack = _rtaTransport_AddStack(transport, stackConfig); + + RtaCommandCreateProtocolStack *createStack = rtaCommandCreateProtocolStack_Create(stack->stack_id, stackConfig); + + // request for a new protocol stack, create it + + // now actually create the protocol stack by writing a command over the thread boundary + // using the Command socket. + RtaCommand *command = rtaCommand_CreateCreateProtocolStack(createStack); + _rtaTransport_SendCommandToFramework(transport, command); + + rtaCommand_Release(&command); + rtaCommandCreateProtocolStack_Release(&createStack); + + return stack; +} + +/** + * Create a new connection + * + * We have resolved that a matching protocol stack exists, and is represented by + * protocolStackHashEntry. We now want to send a command over the command socket to + * create a connection in that stack. + * + * @param [in] transport The RTA transport + * @param [in] transportConfig The user requested configuration + * @param [in] protocolStackHashEntry The protocol stack holder + * @param [in] pair A _RTASocketPair representing the queue of data between the API and the transport stack. + */ +static void +_rtaTransport_CreateConnection(RTATransport *transport, CCNxTransportConfig *transportConfig, _StackEntry *stack, _RTASocketPair pair) +{ + RtaCommandOpenConnection *openConnection = + rtaCommandOpenConnection_Create(stack->stack_id, + pair.up, + pair.down, + ccnxConnectionConfig_GetJson(ccnxTransportConfig_GetConnectionConfig(transportConfig))); + + RtaCommand *command = rtaCommand_CreateOpenConnection(openConnection); + _rtaTransport_SendCommandToFramework(transport, command); + + rtaCommand_Release(&command); + rtaCommandOpenConnection_Release(&openConnection); +} + +int +rtaTransport_Open(RTATransport *transport, CCNxTransportConfig *transportConfig) +{ + ccnxTransportConfig_OptionalAssertValid(transportConfig); + + assertNotNull(transport, "Parameter transport must be a valid RTATransport"); + + _RTASocketPair pair = _rtaTransport_CreateSocketPair(transport, sizeof(void *) * 128); + + parcDeque_Lock(transport->list); + { + _StackEntry *stack = _rtaTransport_GetProtocolStackEntry(transport, transportConfig); + if (stack == NULL) { + stack = _rtaTransport_AddProtocolStackEntry(transport, transportConfig); + } + assertNotNull(stack, "Got NULL hash entry from _rtaTransport_AddProtocolStackEntry"); + + _rtaTransport_CreateConnection(transport, transportConfig, stack, pair); + } + parcDeque_Unlock(transport->list); + + return pair.up; +} + +/** + * timeout is either NULL or a pointer to an unsigned integer containing the number of microseconds to wait for input. + * + * @return <0 An error occured + * @return 0 A timeout occurred waiting for the filedescriptor to have some output space available. + * @return >0 The filedescriptor has some output space available. + */ +static int +_rtaTransport_SendSelect(const int fd, const uint64_t *microSeconds) +{ + struct timeval timeval; + fd_set writeSet; + + FD_ZERO(&writeSet); // clear the set + FD_SET(fd, &writeSet); // add our file descriptor to the set + + struct timeval *timeout = NULL; + + if (microSeconds != NULL) { + timeval.tv_sec = (int) (*microSeconds / 1000000); + timeval.tv_usec = (int) (*microSeconds % 1000000); + timeout = &timeval; + } + + int selectResult = select(fd + 1, NULL, &writeSet, NULL, timeout); + + return selectResult; +} + +bool +rtaTransport_Send(RTATransport *transport, int queueId, const CCNxMetaMessage *message, const uint64_t *microSeconds) +{ + // Acquire a reference to the incoming CCNxMetaMessage so if the caller releases it immediately, + // a reference still exists for the transport. This reference is released once the + // message is processed lower in the stack. + CCNxMetaMessage *metaMessage = ccnxMetaMessage_Acquire(message); + + rta_transport_writes++; + + int selectResult = _rtaTransport_SendSelect(queueId, microSeconds); + if (selectResult < 0) { + // We couldn't send it. Release our reference and return signaling failure. + ccnxMetaMessage_Release(&metaMessage); + return false; + } else if (selectResult == 0) { + errno = EWOULDBLOCK; + ccnxMetaMessage_Release(&metaMessage); + return false; + } else if (selectResult > 0) { + ssize_t count = write(queueId, &metaMessage, sizeof(&metaMessage)); + if (count == sizeof(&metaMessage)) { + return true; + } + } + + // We couldn't send it. Release our reference and return signaling failure. + ccnxMetaMessage_Release(&metaMessage); + + return false; +} + +//#if 1 +/** + * @return -1 An error occured + * @return 0 A timeout occurred waiting for the filedescriptor to have some input available. + * @return >0 The filedescriptor has some input ready. + */ +static int +_rtaTransport_ReceiveSelect(const int fd, const uint64_t *microSeconds) +{ + fd_set readSet; + + FD_ZERO(&readSet); // clear the set + FD_SET(fd, &readSet); // add our file descriptor to the set + + struct timeval *timeout = NULL; + struct timeval timeval; + + if (microSeconds != NULL) { + timeval.tv_sec = (int) (*microSeconds / 1000000); + timeval.tv_usec = (int) (*microSeconds % 1000000); + timeout = &timeval; + } + int selectResult = select(fd + 1, &readSet, NULL, NULL, (struct timeval *) timeout); + + return selectResult; +} + +TransportIOStatus +rtaTransport_Recv(RTATransport *transport, const int queueId, CCNxMetaMessage **msgPtr, const uint64_t *microSeconds) +{ + // The effect here is to transfer the reference to the CCNxMetaMessage to the application-side thread. + // Thus, no acquire or release here as the caller is responsible for releasing the CCNxMetaMessage + + int selectResult = _rtaTransport_ReceiveSelect(queueId, microSeconds); + + if (selectResult == -1) { + // errno should have been set by the select(2) system call. + return TransportIOStatus_Error; + } else if (selectResult == 0) { + // errno = EWOULDBLOCK; + errno = ENOMSG; + return TransportIOStatus_Timeout; + } + + size_t remaining = sizeof(&*msgPtr); + uint8_t *bytes = (uint8_t *) msgPtr; + + do { + ssize_t nread = read(queueId, &bytes[sizeof(&*msgPtr) - remaining], remaining); + if (nread == -1 && errno != EINTR) { + return TransportIOStatus_Error; + } + if (nread == 0) { + rta_transport_read_spin++; + } + remaining -= nread; + } while (remaining > 0); + + rta_transport_reads++; + + errno = 0; + return TransportIOStatus_Success; +} +//#else +///** +// * @return -1 An error occured +// * @return 0 A timeout occurred waiting for the filedescriptor to have some input available. +// * @return >0 The filedescriptor has some input ready. +// */ +//static int +//_rtaTransport_Select(const int fd, const struct timeval *restrict timeout) +//{ +// fd_set readSet; +// +// FD_ZERO(&readSet); // clear the set +// FD_SET(fd, &readSet); // add our file descriptor to the set +// +// int selectResult = select(fd + 1, &readSet, NULL, NULL, (struct timeval *) timeout); +// +// return selectResult; +//} +// +//TransportIOStatus +//rtaTransport_Recv(RTATransport *transport, const int queueId, CCNxMetaMessage **msgPtr, const struct timeval *restrict timeout) +//{ +// // The effect here is to transfer the reference to the CCNxMetaMessage to the application-side thread. +// // Thus, no acquire or release here as the caller is responsible for releasing the CCNxMetaMessage +// +// int selectResult = _rtaTransport_Select(queueId, timeout); +// +// if (selectResult == -1) { +// // errno should have been set by the select(2) system call. +// return TransportIOStatus_Error; +// } else if (selectResult == 0) { +//// errno = EWOULDBLOCK; +// errno = ENOMSG; +// return TransportIOStatus_Timeout; +// } +// +// size_t remaining = sizeof(&*msgPtr); +// uint8_t *bytes = (uint8_t *) msgPtr; +// +// do { +// ssize_t nread = read(queueId, &bytes[sizeof(&*msgPtr) - remaining], remaining); +// if (nread == -1 && errno != EINTR) { +// return TransportIOStatus_Error; +// } +// if (nread == 0) { +// rta_transport_read_spin++; +// } +// remaining -= nread; +// } while (remaining > 0); +// +// rta_transport_reads++; +// +// errno = 0; +// return TransportIOStatus_Success; +//} +//#endif + +int +rtaTransport_Close(RTATransport *transport, int api_fd) +{ + RtaCommandCloseConnection *commandClose = rtaCommandCloseConnection_Create(api_fd); + RtaCommand *command = rtaCommand_CreateCloseConnection(commandClose); + rtaCommandCloseConnection_Release(&commandClose); + + _rtaTransport_SendCommandToFramework(transport, command); + + rtaCommand_Release(&command); + + return 0; +} + +int +rtaTransport_PassCommand(RTATransport *transport, const RtaCommand *rtacommand) +{ + _rtaTransport_SendCommandToFramework(transport, rtacommand); + + return 0; +} |