diff options
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c')
-rw-r--r-- | libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c | 1712 |
1 files changed, 1712 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c new file mode 100644 index 00000000..59ad1dcb --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/connector_Forwarder_Metis.c @@ -0,0 +1,1712 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The metis connector does the following per connection: + * - Opens a TCP socket to Metis + * - Creates an "event" for the socket, does not use the buffer to avoid doing extra copy. + * - On read events, uses direct socket operations to read in data + * + * - DOES NOT HANDLE FRAMING ERRORS. If somehow metis and the connector get + * out of whack (technical term), there is no recovery. + * + * - The connection to metis is started in the Opener, but may not complete by the time + * the user sends data down in the Downcall_Read. We should not process the Downcall_Read + * until we get the Upcall_Event of connected. When we finally get the connected event, + * we should make the Downcall_Read pending again (or just call it) to flush the pending + * user data out to metis. + * + * - Because of how we get scheduled, there might be a large batch of messages waiting at the + * forwarder. We don't want to put a giant blob up the stack. So, we keep a deque of TransportMessage + * and only feed a few at a time up. + * + * - Accepts both a PARCBuffer or a CCNxCodecNetworkBufferIoVec as the wire format in the DOWN direction. + * - The UP direction is always a PARCBuffer right now + * + */ + +#include <config.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <pthread.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> +#include <arpa/inet.h> +#include <signal.h> +#include <sys/socket.h> +#include <sys/ioctl.h> +#include <netdb.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_Deque.h> +#include <parc/algol/parc_EventBuffer.h> +#include <parc/algol/parc_EventTimer.h> +#include <parc/algol/parc_Network.h> + +#include <ccnx/transport/common/transport_Message.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> + +#include "connector_Forwarder.h" + +#include <ccnx/transport/transport_rta/config/config_Forwarder_Metis.h> + +#include <ccnx/api/control/controlPlaneInterface.h> +#include <ccnx/api/control/cpi_ControlFacade.h> + +#include <ccnx/common/codec/ccnxCodec_TlvEncoder.h> +#include <ccnx/common/codec/ccnxCodec_TlvDecoder.h> +#include <ccnx/common/internal/ccnx_TlvDictionary.h> + +#include <ccnx/common/codec/ccnxCodec_TlvPacket.h> +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_FixedHeader.h> +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_Types.h> + +#include <ccnx/common/ccnx_WireFormatMessage.h> + +#define MINIMUM_READ_LENGTH 8 + +// The message type for a Metis control packet +#define METIS_CONTROL_TYPE 0xA4 + +// at most 10MB, this is used as the output buffer down to metis +#define METIS_OUTPUT_QUEUE_BYTES (10 * 1024 * 1024) + +// How big should we try to make the output socket size? +#define METIS_SEND_SOCKET_BUFFER 65536 + +// Maximum input backlog in messages, not bytes +#define METIS_INPUT_QUEUE_MESSAGES 100 + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +static int connector_Fwd_Metis_Init(RtaProtocolStack *stack); +static int connector_Fwd_Metis_Opener(RtaConnection *conn); + +static void _eventCallback(int fd, PARCEventType what, void *connectionVoid); +static void connector_Fwd_Metis_Dequeue(int fd, PARCEventType which_event, void *metisStateVoid); + +static void connector_Fwd_Metis_Downcall_Read(PARCEventQueue *, PARCEventType, void *conn); +static int connector_Fwd_Metis_Closer(RtaConnection *conn); +static int connector_Fwd_Metis_Release(RtaProtocolStack *stack); +static void connector_Fwd_Metis_StateChange(RtaConnection *conn); + +RtaComponentOperations fwd_metis_ops = { + .init = connector_Fwd_Metis_Init, + .open = connector_Fwd_Metis_Opener, + .upcallRead = NULL, + .upcallEvent = NULL, + .downcallRead = connector_Fwd_Metis_Downcall_Read, + .downcallEvent = NULL, + .close = connector_Fwd_Metis_Closer, + .release = connector_Fwd_Metis_Release, + .stateChange = connector_Fwd_Metis_StateChange +}; + +typedef enum { + PacketType_Interest, + PacketType_ContentObject, + PacketType_Control, + PacketType_InterestReturn, + PacketType_Unknown +} _PacketType; + +typedef struct metis_connector_stats { + unsigned countUpcallReads; + unsigned countUpcallWriteDataOk; + unsigned countUpcallWriteDataError; + unsigned countUpcallWriteDataBlocked; + unsigned countUpcallWriteDataQueueFull; + + unsigned countUpcallWriteControlOk; + unsigned countUpcallWriteControlError; + + unsigned countDowncallReads; + unsigned countDowncallWrites; + unsigned countDowncallControl; +} _MetisConnectorStats; + +/** + * This structure holds the read-ahead data for the next message being read based + * on its fixed header + */ +typedef struct next_message_header { + // this is how we frame received messages on a stream connection. We + // wait until we read a complete fixed header, then we can set the length + // of that message and keep waiting until we receive at least that many bytes. + size_t length; + + // at the time when we parse out the message length from the fixed header, + // we also parse out the TLV message type from the fixed header + _PacketType packetType; + uint8_t version; + + // we will read bytes into this structure + union _hdr { + CCNxCodecSchemaV1FixedHeader v1; + uint8_t buffer[MINIMUM_READ_LENGTH]; + } fixedHeader; + + uint8_t *readLocation; + size_t remainingReadLength; + + // The whole message + PARCBuffer *packet; +} NextMessage; + +typedef struct fwd_metis_state { + uint16_t port; + int fd; + + // separate events for read and write on fd so we can individually enable them + PARCEvent *readEvent; + PARCEvent *writeEvent; + + bool isConnected; + + // This is our read-ahead of the next message fixed header + NextMessage nextMessage; + + // the transportMessageQueueEvent is used to dequeue from the queue. + // we make sure its scheduled so long as there's messages in the queue, even if there's + // nothing else being read + PARCDeque *transportMessageQueue; + PARCEventTimer *transportMessageQueueEvent; + + // This buffer is the queue of stuff we need to send to the network + PARCEventBuffer *metisOutputQueue; + + _MetisConnectorStats stats; +} FwdMetisState; + +/** + * @typedef PacketData + * @brief Used to pass a record between reading a packet and sending it up the stack + * @discussion Used internally to pass data between functions + */ +typedef struct packet_data { + FwdMetisState *fwd_state; + RtaConnection *conn; + PARCEventQueue *out; + RtaComponentStats *stats; +} PacketData; + + +// for debugging +static unsigned fwd_metis_references_queued = 0; +static unsigned fwd_metis_references_dequeued = 0; +static unsigned fwd_metis_references_notqueued = 0; + + +typedef enum { + ReadReturnCode_Finished, // read all needed bytes + ReadReturnCode_PartialRead, // still need some bytes + ReadReturnCode_Closed, // the socket is closed + ReadReturnCode_Error, // An error on the socket +} ReadReturnCode; + +// ================================ + +static void +_nextMessage_Display(const NextMessage *next, unsigned indent) +{ + printf("NextMessage %p length %zu type %d version %u readLocation %p remaining %zu\n", + (void *) next, next->length, next->packetType, next->version, (void *) next->readLocation, next->remainingReadLength); + + printf("fixedHeader\n"); + longBowDebug_MemoryDump((const char *) next->fixedHeader.buffer, MINIMUM_READ_LENGTH); + + if (next->packet) { + parcBuffer_Display(next->packet, 3); + } +} + +static int +connector_Fwd_Metis_Init(RtaProtocolStack *stack) +{ + struct sigaction ignore_action; + ignore_action.sa_handler = SIG_IGN; + sigemptyset(&ignore_action.sa_mask); + ignore_action.sa_flags = 0; + sigaction(SIGPIPE, &ignore_action, NULL); + + return 0; +} + + +/** + * Setup the NextMessage structure to begin reading a fixed header + * + * All fields are zeroed and the readLocation is set to the first byte of the fixedHeader. + * The remainingReadLength is set to the size of the fixedHeader. + * + * @param [in] next An allocated NextMessage to initialize + * + * Example: + * @code + * { + * NextMessage nextMessage; + * _initializeNextMessage(&nextMessage); + * } + * @endcode + */ +static void +_initializeNextMessage(NextMessage *next) +{ + memset(next, 0, sizeof(NextMessage)); + next->version = 0xFF; + next->packetType = PacketType_Unknown; + next->readLocation = next->fixedHeader.buffer; + next->remainingReadLength = MINIMUM_READ_LENGTH; +} + +static FwdMetisState * +connector_Fwd_Metis_CreateConnectionState(PARCEventScheduler *scheduler) +{ + FwdMetisState *fwd_state = parcMemory_Allocate(sizeof(FwdMetisState)); + assertNotNull(fwd_state, "parcMemory_Allocate(%zu) returned NULL", sizeof(FwdMetisState)); + + memset(fwd_state, 0, sizeof(FwdMetisState)); + _initializeNextMessage(&fwd_state->nextMessage); + + fwd_state->fd = 0; + fwd_state->readEvent = NULL; + fwd_state->writeEvent = NULL; + fwd_state->transportMessageQueue = parcDeque_Create(); + fwd_state->transportMessageQueueEvent = parcEventTimer_Create(scheduler, 0, connector_Fwd_Metis_Dequeue, fwd_state); + fwd_state->isConnected = false; + fwd_state->metisOutputQueue = parcEventBuffer_Create(); + + return fwd_state; +} + +static bool +_openSocket(FwdMetisState *fwd_state, uint16_t port) +{ + fwd_state->port = port; + fwd_state->fd = socket(PF_INET, SOCK_STREAM, 0); + + if (fwd_state->fd < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to open PF_INET SOCK_STREAM socket: (%d) %s\n", + ' ', __func__, errno, strerror(errno)); + } + return false; + } + + if (DEBUG_OUTPUT) { + printf("%9c %s create socket %d port %u\n", + ' ', __func__, fwd_state->fd, fwd_state->port); + } + + return true; +} + +/** + * @function connector_Fwd_Metis_SetupSocket + * @abstract Creates the socket and sets the port, but does not call connect + * @discussion + * Creates and sets up the socket descriptor. makes it non-blocking. + * Sets the port in FwdMetisState. + * + * This is a full PF_INET socket, not forced to PF_LOCAL. + * + * The sendbuffer size is set to METIS_OUTPUT_QUEUE_BYTES + * + * precondition: called _openSocket + * + * @param <#param1#> + * @return <#return#> + */ +static bool +_setupSocket(FwdMetisState *fwd_state) +{ + trapUnexpectedStateIf(fwd_state->fd < 1, "Invalid socket %d", fwd_state->fd); + + // Set non-blocking flag + int flags = fcntl(fwd_state->fd, F_GETFL, NULL); + assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno); + int res = fcntl(fwd_state->fd, F_SETFL, flags | O_NONBLOCK); + + if (res < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to make socket non-blocking: (%d) %s\n", + ' ', __func__, errno, strerror(errno)); + } + + close(fwd_state->fd); + return false; + } + + const int sendBufferSize = METIS_SEND_SOCKET_BUFFER; + res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_SNDBUF, &sendBufferSize, sizeof(int)); + if (res < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to set SO_SNDBUF to %d: (%d) %s\n", + ' ', __func__, sendBufferSize, errno, strerror(errno)); + } + // This is a non-fatal error + } + +#if defined(SO_NOSIGPIPE) + // turn off SIGPIPE, return EPIPE + const int on = 1; + res = setsockopt(fwd_state->fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)); + if (res < 0) { + if (DEBUG_OUTPUT) { + printf("%9c %s failed to set SO_NOSIGPIPE to %d: (%d) %s\n", + ' ', __func__, sendBufferSize, errno, strerror(errno)); + } + // this is not a fatal error, so keep going + } +#endif + + return true; +} + +/** + * @function connector_Fwd_Metis_SetupConnectionBuffer + * @abstract Creates the connection buffer and adds it to libevent + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + */ +static bool +_setupSocketEvents(FwdMetisState *fwd_state, RtaConnection *conn) +{ + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventScheduler *scheduler = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack)); + + // the connect() call will be asynchrnous because the socket is non-blocking, so we + // need ET_WRITE to trigger a callback when the socket becomes writable (i.e. connected). + // If there's an error on connect it will be an ET_READ | ET_WRITE event with an error on the socket. + fwd_state->readEvent = parcEvent_Create(scheduler, fwd_state->fd, PARCEventType_Read | PARCEventType_Persist | PARCEventType_EdgeTriggered, _eventCallback, conn); + assertNotNull(fwd_state->readEvent, "Got a null readEvent for socket %d", fwd_state->fd); + + fwd_state->writeEvent = parcEvent_Create(scheduler, fwd_state->fd, PARCEventType_Write | PARCEventType_Persist | PARCEventType_EdgeTriggered, _eventCallback, conn); + assertNotNull(fwd_state->writeEvent, "Got a null readEvent for socket %d", fwd_state->fd); + + // Start the write event. It will be signaled on a connect error or when we are connected. + // The read event is not enabled until after connect. + + int failure = parcEvent_Start(fwd_state->writeEvent); + assertFalse(failure < 0, "Error starting writeEvent event %p: (%d) %s", (void *) fwd_state->writeEvent, errno, strerror(errno)); + + return true; +} + +/** + * The connection to the forwarder succeeded, step the state machine + * + * Change the state of the connection to connected and notify the user that it's ready. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_connectionSucceeded(FwdMetisState *fwd_state, RtaConnection *conn) +{ + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s Connection %p connected fd %d\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn, fwd_state->fd); + } + + fwd_state->isConnected = true; + + // enable read events + parcEvent_Start(fwd_state->readEvent); + + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_OPEN, NULL, NULL); +} + +static void +_readInEnvironmentConnectionSpecification(struct sockaddr_in *addr_in) +{ + char *forwarderIpEnv = getenv(FORWARDER_CONNECTION_ENV); + if (forwarderIpEnv == NULL) { + return; + } + + char forwarderIpAddress[NI_MAXHOST] = { 0 }; + in_port_t forwarderIpPort = 0; + + // Currently, we only support tcp control connections to the forwarder + sscanf(forwarderIpEnv, "tcp://%[^:]:%hu", forwarderIpAddress, &forwarderIpPort); + + // If provided, use the specified address in a canonical form + if (forwarderIpAddress[0] != '\0') { + // Normalize the provided hostname + struct sockaddr_in *addr = (struct sockaddr_in *) parcNetwork_SockAddress(forwarderIpAddress, forwarderIpPort); + char *ipAddress = inet_ntoa(addr->sin_addr); + parcMemory_Deallocate(&addr); + if (ipAddress) { + addr_in->sin_addr.s_addr = inet_addr(ipAddress); + } else { + addr_in->sin_addr.s_addr = inet_addr(forwarderIpAddress); + } + } + + // If provided, use the specified port + if (forwarderIpPort != 0) { + addr_in->sin_port = htons(forwarderIpPort); + } +} + +/** + * @function connector_Fwd_Metis_BeginConnect + * @abstract Begins the non-blocking connect() call to 127.0.0.1 on the port in FwdMetisState + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + */ +static bool +connector_Fwd_Metis_BeginConnect(FwdMetisState *fwd_state, RtaConnection *conn) +{ + bool success = false; + + struct sockaddr_in addr_in; + memset(&addr_in, 0, sizeof(addr_in)); + addr_in.sin_port = htons(fwd_state->port); + addr_in.sin_family = AF_INET; + addr_in.sin_addr.s_addr = inet_addr("127.0.0.1"); + + // Override defaults if specified + _readInEnvironmentConnectionSpecification(&addr_in); + + if (DEBUG_OUTPUT) { + char inetAddress[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(addr_in.sin_addr), inetAddress, INET_ADDRSTRLEN); + printf("%9" PRIu64 " %s beginning connect socket %d to port %d on %s\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + fwd_state->fd, + fwd_state->port, + inetAddress); + } + + // This will deliver a PARCEventType_Write event on connect success + int res = connect(fwd_state->fd, (struct sockaddr*) &addr_in, (socklen_t) sizeof(addr_in)); + + if (res == 0) { + // connect succeded immediately + _connectionSucceeded(fwd_state, conn); + success = true; + } else if (errno == EINPROGRESS) { + // connection is deferred + success = true; + } else { + // a hard error + printf("Error connecting: (%d) %s\n", errno, strerror(errno)); + } + + return success; +} + +/** + * We maintain an input queue going up the stack and only dequeue a small number of packets + * with each call from the dispatch loop. THis is to avoid bursting a bunch of packets up the stack. + */ +static void +connector_Fwd_Metis_Dequeue(int fd, PARCEventType which_event, void *metisStateVoid) +{ + FwdMetisState *fwd_state = (FwdMetisState *) metisStateVoid; + + // random small number. What is right value for this? + unsigned max_loops = 6; + + if (DEBUG_OUTPUT) { + printf("%9d %s deque size %zu\n", + 0, + __func__, + parcDeque_Size(fwd_state->transportMessageQueue)); + } + + while (max_loops > 0 && !parcDeque_IsEmpty(fwd_state->transportMessageQueue)) { + max_loops--; + TransportMessage *tm = parcDeque_RemoveFirst(fwd_state->transportMessageQueue); + + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + + if (rtaComponent_PutMessage(out, tm)) { + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } + } + + // If there are still messages in there, re-schedule + if (!parcDeque_IsEmpty(fwd_state->transportMessageQueue)) { + if (DEBUG_OUTPUT) { + printf("%9d %s rescheduling output queue timer %p\n", + 0, + __func__, + (void *) fwd_state->transportMessageQueueEvent); + } + + struct timeval immediateTimeout = { 0, 0 }; + parcEventTimer_Start(fwd_state->transportMessageQueueEvent, &immediateTimeout); + } +} + +/** + * Create a TCP socket + * Set it non-blocking + * Wrap it in a buffer event + * Set Read and Event callbacks + * + * Return 0 success, -1 failure + */ +static int +connector_Fwd_Metis_Opener(RtaConnection *conn) +{ + bool success = false; + + uint16_t port = metisForwarder_GetPortFromConfig(rtaConnection_GetParameters(conn)); + + PARCEventScheduler *scheduler = rtaFramework_GetEventScheduler(rtaConnection_GetFramework(conn)); + FwdMetisState *fwd_state = connector_Fwd_Metis_CreateConnectionState(scheduler); + + if (_openSocket(fwd_state, port)) { + if (_setupSocket(fwd_state)) { + if (_setupSocketEvents(fwd_state, conn)) { + if (connector_Fwd_Metis_BeginConnect(fwd_state, conn)) { + // stash it away in the per-connection cubby hole + rtaConnection_SetPrivateData(conn, FWD_METIS, fwd_state); + success = true; + } + } + } + } + + if (!success) { + if (fwd_state->fd) { + close(fwd_state->fd); + } + if (fwd_state->readEvent) { + parcEvent_Destroy(&(fwd_state->readEvent)); + } + if (fwd_state->writeEvent) { + parcEvent_Destroy(&(fwd_state->writeEvent)); + } + parcMemory_Deallocate((void **) &fwd_state); + return -1; + } + + // Socket will be ready for use once we get PARCEventQueue_Connected + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s open conn %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + + return 0; +} + +/** + * We received a Metis control packet. Translate it to a control packet and send it up the stack. + */ +static void +receiveControlMessage(PacketData *data) +{ + CCNxTlvDictionary *packetDictionary = + ccnxWireFormatMessage_FromControlPacketType(data->fwd_state->nextMessage.version, data->fwd_state->nextMessage.packet); + + bool success = ccnxCodecTlvPacket_BufferDecode(data->fwd_state->nextMessage.packet, packetDictionary); + + if (success) { + TransportMessage *tm = transportMessage_CreateFromDictionary(packetDictionary); + transportMessage_SetInfo(tm, rtaConnection_Copy(data->conn), rtaConnection_FreeFunc); + + // send it up the stack + if (rtaComponent_PutMessage(data->out, tm)) { + rtaComponentStats_Increment(data->stats, STATS_UPCALL_OUT); + data->fwd_state->stats.countUpcallWriteControlOk++; + } else { + data->fwd_state->stats.countUpcallWriteControlError++; + } + } else { + assertTrue(success, "Error decoding a Metis control packet\n") + { + parcBuffer_Display(data->fwd_state->nextMessage.packet, 3); + } + } + + // we are now done with our references + ccnxTlvDictionary_Release(&packetDictionary); +} + + +static void +_queueNonControl(PacketData *data) +{ + CCNxTlvDictionary *packetDictionary = ccnxWireFormatMessage_Create(data->fwd_state->nextMessage.packet); + + assertNotNull(packetDictionary, "Got a null packet decode") + { + parcBuffer_Display(data->fwd_state->nextMessage.packet, 3); + } + + TransportMessage *tm = transportMessage_CreateFromDictionary(packetDictionary); + + // add the connection info to the transport message before sending up stack + transportMessage_SetInfo(tm, rtaConnection_Copy(data->conn), rtaConnection_FreeFunc); + + parcDeque_Append(data->fwd_state->transportMessageQueue, tm); + + // start if went from emtpy to 1 + if (parcDeque_Size(data->fwd_state->transportMessageQueue) == 1) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u schedule dequeue event %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))), + __func__, + rtaConnection_GetConnectionId(data->conn), + (void *) data->fwd_state->transportMessageQueueEvent); + } + + struct timeval immediateTimeout = { 0, 0 }; + parcEventTimer_Start(data->fwd_state->transportMessageQueueEvent, &immediateTimeout); + } + + // we are now done with our references + ccnxTlvDictionary_Release(&packetDictionary); +} + +/** + * Receive a non-control packet + * + * Non-control messages may be dropped due to lack of input buffer space. + * If the connection has state Block Up or the up queue's length is + * too many messages deep, the non-control message will be dropped. + * + * precondition: the caller knows the message is not a control message + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_receiveNonControl(PacketData *data) +{ + if (rtaConnection_BlockedUp(data->conn)) { + data->fwd_state->stats.countUpcallWriteDataBlocked++; + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u blocked up, drop wireFormat %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))), + __func__, + rtaConnection_GetConnectionId(data->conn), + (void *) data->fwd_state->nextMessage.packet); + } + } else { + if (parcDeque_Size(data->fwd_state->transportMessageQueue) < METIS_INPUT_QUEUE_MESSAGES) { + _queueNonControl(data); + data->fwd_state->stats.countUpcallWriteDataOk++; + } else { + data->fwd_state->stats.countUpcallWriteDataQueueFull++; + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u input buffer full, drop wireFormat %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(data->conn))), + __func__, + rtaConnection_GetConnectionId(data->conn), + (void *) data->fwd_state->nextMessage.packet); + } + } + } +} + +/** + * We received an entire packet, send it up the stack in a Transport message. + * + * If its a control message, we make it a CCNxControlMessage here for symmetry with us + * encoding the control messages at this level + */ +static void +connector_Fwd_Metis_SendUpStack(PacketData *data) +{ + // Always send control messages up the stack + if (data->fwd_state->nextMessage.packetType == PacketType_Control) { + receiveControlMessage(data); + } else { + _receiveNonControl(data); + } +} + +/** + * Return the SO_ERROR value for the given socket + * + * If getsockopt returns an error, the return code could be the error from getsockopt. + * + * Typically you will get ECONNREFUSED when you cannot connect and one of the many getsockopt + * errors if there's a problem with the actual socket. + * + * @param [in] fd The socket + * + * @return errno An errno value + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static int +_getSocketError(int fd) +{ + int value; + socklen_t valueLength = sizeof(value); + int res = getsockopt(fd, SOL_SOCKET, SO_ERROR, &value, &valueLength); + if (res < 0) { + value = res; + } + return value; +} + +/** + * Received an event on a socket we have marked as not yet connected + * + * Ether it's ready to go or there's an error. We will receive a PARCEventType_Read and the socket + * will have an SO_ERROR of 0 if it's now connected. If the SO_ERROR is non-zero, there + * was an error on connect. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_disconnectedEventHandler(FwdMetisState *fwd_state, RtaConnection *conn, PARCEventType what) +{ + if (what & PARCEventType_Read) { + int socketError = _getSocketError(fwd_state->fd); + if (socketError == 0) { + // I don't think these happen, they will be write events + _connectionSucceeded(fwd_state, conn); + } else { + // error on connect + printf("%9" PRIu64 " %s Connection %p got error on SOCK_STREAM, fd %d: %s\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn, + fwd_state->fd, + strerror(errno)); + + // make the event non-pending + parcEvent_Stop(fwd_state->readEvent); + parcEvent_Stop(fwd_state->writeEvent); + + rtaConnection_SetBlockedDown(conn); + + // at least tell the API whats going on + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_FORWARDER_NOT_AVAILABLE, NULL, NULL); + } + } + + if (what & PARCEventType_Write) { + int socketError = _getSocketError(fwd_state->fd); + if (socketError == 0) { + _connectionSucceeded(fwd_state, conn); + } + } +} + +static void +_setupNextPacketV1(FwdMetisState *fwd_state) +{ + switch (fwd_state->nextMessage.fixedHeader.v1.packetType) { + case CCNxCodecSchemaV1Types_PacketType_Interest: + fwd_state->nextMessage.packetType = PacketType_Interest; + break; + case CCNxCodecSchemaV1Types_PacketType_ContentObject: + fwd_state->nextMessage.packetType = PacketType_ContentObject; + break; + case CCNxCodecSchemaV1Types_PacketType_Control: + fwd_state->nextMessage.packetType = PacketType_Control; + break; + case CCNxCodecSchemaV1Types_PacketType_InterestReturn: + fwd_state->nextMessage.packetType = PacketType_InterestReturn; + break; + default: + fwd_state->nextMessage.packetType = PacketType_Unknown; + break; + } + + size_t fixedHeaderLength = sizeof(CCNxCodecSchemaV1FixedHeader); + fwd_state->nextMessage.length = htons(fwd_state->nextMessage.fixedHeader.v1.packetLength); + + fwd_state->nextMessage.packet = parcBuffer_Allocate(fwd_state->nextMessage.length); + assertNotNull(fwd_state->nextMessage.packet, "Could not allocate packet of size %zu", fwd_state->nextMessage.length); + + // finally copy in the fixed header as we have already read that in + parcBuffer_PutArray(fwd_state->nextMessage.packet, fixedHeaderLength, fwd_state->nextMessage.fixedHeader.buffer); +} + +/** + * Called after reading whole FixedHeader, will setup the packet buffer + * + * After reading the fixed header, we need to allocate a PARCBuffer for the packet. Setup that + * buffer and copy the FixedHeader in to it. Remaining reads will go in to this buffer. + * + * After this function completes, the parsed version, packetType, and length of the nextMessage will + * be filled in, the packet buffer allocated and the fixedHeader copied to that packet buffer. + * + * precondition: forwarder->nextMessage.remainingReadLength == 0 && fwd_state->nextMessage.packet == NULL + * + * @param [in] fwd_state An allocated forwarder connection state that has read in the fixed header + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_setupNextPacket(FwdMetisState *fwd_state) +{ + trapUnexpectedStateIf(fwd_state->nextMessage.packet != NULL, "Calling _setupNextPacket but the packet field is not NULL"); + + fwd_state->nextMessage.version = fwd_state->nextMessage.fixedHeader.buffer[0]; + + switch (fwd_state->nextMessage.version) { + case 1: + _setupNextPacketV1(fwd_state); + break; + + default: + trapUnexpectedState("Illegal packet version %d", fwd_state->nextMessage.version) + { + _nextMessage_Display(&fwd_state->nextMessage, 0); + } + break; + } +} + +/** + * Reads the FixedHeader. If full read will setup the next packet buffer. + * + * Reads up to FixedHeader length bytes. If read whole header will allocate the next packet + * buffer to right size and copy the Fixed Header in to the buffer. + * + * preconditions: + * - fwd_state->nextMessage.packet should be NULL + * - fwd_state->nextMessage.remainingReadLength should be the remaining bytes to read of the Fixed Header + * - fwd_state->nextMessage.readLocation should point to the location in the FixedHeader to start reading + * + * postconditions: + * - fwd_state->nextMessage.remainingReadLength will be decremented by the amount read + * - If remainingReadLength is decremented to 0, will allocate fwd_state->nextMessage.packet and copy in the FixedHeader + * - The fields in fwd_state->nextMessage (length, packetType, version) will be set based on the fixed header + * + * @param [in] fwd_state An allocated forwarder connection state + * + * @retval ReadReturnCode_Finished one entire packet is ready in the buffer + * @retval ReadReturnCode_PartialRead need more bytes + * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error) + * @retval ReadReturnCode_Error An error occured on the socket to metis + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static ReadReturnCode +_readPacketHeader(FwdMetisState *fwd_state) +{ + ReadReturnCode returnCode = ReadReturnCode_Error; + + // This could be switched to MSG_PEEK instead of copying later, but I don't think it makes any significant change. + ssize_t nread = recv(fwd_state->fd, fwd_state->nextMessage.readLocation, fwd_state->nextMessage.remainingReadLength, 0); + if (nread > 0) { + // recv will always runturn at most fwd_state->nextMessage.remainingReadLength, so this won't wrap around to negative. + fwd_state->nextMessage.remainingReadLength -= nread; + + if (fwd_state->nextMessage.remainingReadLength == 0) { + returnCode = ReadReturnCode_Finished; + _setupNextPacket(fwd_state); + } else { + fwd_state->nextMessage.readLocation += nread; + returnCode = ReadReturnCode_PartialRead; + } + } else if (nread == 0) { + // the connection is closed + returnCode = ReadReturnCode_Closed; + } else { + switch (errno) { + case EAGAIN: + // call would block. These can happen becasue _readMessage is in a while loop and we detect + // the end of the loop because we cannot read another fixed header. + returnCode = ReadReturnCode_PartialRead; + break; + + default: + // an error. I think all errors will be hard errors and we close the connection + if (DEBUG_OUTPUT) { + printf("%9c %s socket %d recv error: (%d) %s\n", + ' ', __func__, fwd_state->fd, errno, strerror(errno)); + } + returnCode = ReadReturnCode_Error; + break; + } + } + + return returnCode; +} + + +/** + * We have finished reading the fixed header, reading the message body + * + * Will modify the nextMessage.packet buffer. When the buffer has 0 remaining, the whole packet has been read + * + * precondition: _readHeaderFromMetis read the header and allocated the packet buffer + * + * @param [in] fwd_state An allocated forwarder connection state + * + * @retval ReadReturnCode_Finished one entire packet is ready in the buffer + * @retval ReadReturnCode_PartialRead need more bytes + * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error) + * @retval ReadReturnCode_Error An error occured on the socket to metis + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static ReadReturnCode +_readPacketBody(FwdMetisState *fwd_state) +{ + ReadReturnCode returnCode = ReadReturnCode_Error; + + trapUnexpectedStateIf(fwd_state->nextMessage.packet == NULL, "Trying to read a message with a null packet buffer"); + + size_t remaining = parcBuffer_Remaining(fwd_state->nextMessage.packet); + + if (DEBUG_OUTPUT) { + printf("%9c %s socket %d read up to %zu bytes\n", + ' ', __func__, fwd_state->fd, remaining); + } + + void *overlay = parcBuffer_Overlay(fwd_state->nextMessage.packet, 0); + ssize_t nread = recv(fwd_state->fd, overlay, remaining, 0); + + if (nread > 0) { + // good read + parcBuffer_SetPosition(fwd_state->nextMessage.packet, parcBuffer_Position(fwd_state->nextMessage.packet) + nread); + + if (nread == remaining) { + returnCode = ReadReturnCode_Finished; + } else { + returnCode = ReadReturnCode_PartialRead; + } + } else if (nread == 0) { + // connection closed + returnCode = ReadReturnCode_Closed; + } else { + switch (errno) { + case EAGAIN: + // call would block. These can happen becasue _readMessage is in a while loop and we detect + // the end of the loop because we cannot read the entire message body. + returnCode = ReadReturnCode_PartialRead; + break; + + default: + // an error. I think all errors will be hard errors and we close the connection + if (DEBUG_OUTPUT) { + printf("%9c %s socket %d recv error: (%d) %s\n", + ' ', __func__, fwd_state->fd, errno, strerror(errno)); + } + returnCode = ReadReturnCode_Error; + } + } + + + if (DEBUG_OUTPUT) { + printf("%9c %s socket %u msg_length %zu read_length %zd remaining %zu\n", + ' ', + __func__, + fwd_state->fd, + fwd_state->nextMessage.length, + nread, + parcBuffer_Remaining(fwd_state->nextMessage.packet)); + } + + return returnCode; +} + +/** + * Read packet from metis + * + * Reads the fixed heder. Once fixed header is done, begins reading the packet body. Keeps + * all the incremental state to do partial reads. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @retval ReadReturnCode_Finished one entire packet is ready in the buffer + * @retval ReadReturnCode_PartialRead need more bytes + * @retval ReadRetrunCode_Closed The socket to metis is closed (a special case of Error) + * @retval ReadReturnCode_Error An error occured on the socket to metis + * + * Example: + * @code + * <#example#> + * @endcode + */ +static ReadReturnCode +_readPacket(FwdMetisState *fwd_state) +{ + ReadReturnCode returnCode = ReadReturnCode_PartialRead; + + // are we still reading the header? + if (fwd_state->nextMessage.remainingReadLength > 0) { + returnCode = _readPacketHeader(fwd_state); + } else { + returnCode = ReadReturnCode_Finished; + } + + // After reading the header, it may be possible to read the body too + if (returnCode == ReadReturnCode_Finished && fwd_state->nextMessage.remainingReadLength == 0) { + returnCode = _readPacketBody(fwd_state); + } + + return returnCode; +} + +/** + * Read as many packets as we can from Metis + * + * Will read the stream socket from metis until we get a PartialRead return code from + * either the attempt to read the header or the body. + * + * On read error, will send a notification message the connection is closed up to + * the API and will disable read and write events. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_readFromMetis(FwdMetisState *fwd_state, RtaConnection *conn) +{ + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + + ReadReturnCode readCode; + while ((readCode = _readPacket(fwd_state)) == ReadReturnCode_Finished) { + rtaComponentStats_Increment(stats, STATS_UPCALL_IN); + fwd_state->stats.countUpcallReads++; + + // setup the buffer for reading + parcBuffer_Flip(fwd_state->nextMessage.packet); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s sending packet buffer %p up stack length %zu\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state->nextMessage.packet, + parcBuffer_Remaining(fwd_state->nextMessage.packet)); + } + + // this is just to make the signature of connector_Fwd_Metis_SendUpStack tractable, PacketData + // is not exposed outside this scope. + + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP); + PacketData data = { + .fwd_state = fwd_state, + .conn = conn, + .out = out, + .stats = stats, + }; + + connector_Fwd_Metis_SendUpStack(&data); + + // done with the packet buffer. Release our hold on it. If it was sent up the stack + // another reference count was made. + parcBuffer_Release(&fwd_state->nextMessage.packet); + + // now setup for next packet + _initializeNextMessage(&fwd_state->nextMessage); + } + + if (readCode == ReadReturnCode_Closed) { + fwd_state->isConnected = false; + parcEvent_Stop(fwd_state->readEvent); + parcEvent_Stop(fwd_state->writeEvent); + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_CLOSED, NULL, "Socket operation returned closed by remote"); + } else if (readCode == ReadReturnCode_Error) { + fwd_state->isConnected = false; + parcEvent_Stop(fwd_state->readEvent); + parcEvent_Stop(fwd_state->writeEvent); + rtaConnection_SendStatus(conn, FWD_METIS, RTA_UP, notifyStatusCode_CONNECTION_CLOSED, NULL, "Socket operation returned error"); + } + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s total upcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_UPCALL_IN), + rtaComponentStats_Get(stats, STATS_UPCALL_OUT)); + } +} + +/** + * Append a vector to the buffer + * + * @param [in] wireFormat The wire format packet, assumes current position is start of packet + * @param [in] fwd_output The libevent buffer to add the memory reference to + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_queueIoVecMessageToMetis(CCNxCodecNetworkBufferIoVec *vec, PARCEventBuffer *fwd_output) +{ + fwd_metis_references_queued++; + + int iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(vec); + const struct iovec *array = ccnxCodecNetworkBufferIoVec_GetArray(vec); + + for (int i = 0; i < iovcnt; i++) { + if (parcEventBuffer_Append(fwd_output, array[i].iov_base, array[i].iov_len) < 0) { + trapUnrecoverableState("%s error writing to bev_local", __func__); + } + } +} + +/** + * Append to the buffer + * + * @param [in] wireFormat The wire format packet, assumes current position is start of packet + * @param [in] fwd_output The libevent buffer to add the memory reference to + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_queueBufferMessageToMetis(PARCBuffer *wireFormat, PARCEventBuffer *fwd_output) +{ + fwd_metis_references_queued++; + + void *overlay = parcBuffer_Overlay(wireFormat, 0); + size_t length = parcBuffer_Remaining(wireFormat); + + if (parcEventBuffer_Append(fwd_output, overlay, length) < 0) { + trapUnrecoverableState("%s error writing to bev_local", __func__); + } +} + +/** + * Write as much as possible from the output buffer to metis + * + * Write as much as we can to metis. If there is nothing left, deactivate the write event. + * If there is still bytes left in the output buffer, activate the write event. + * + * postconditions: + * - Write as many bytes as possible from the output buffer to metis + * - If there are still bytes remaining, enable the write event + * - If there are no bytes remaining, disable the write event. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_dequeueMessagesToMetis(FwdMetisState *fwdConnState) +{ + // if we try to write a 0 length buffer, write will return -1 like an error + if (parcEventBuffer_GetLength(fwdConnState->metisOutputQueue) > 0) { + fwdConnState->stats.countDowncallWrites++; + int nwritten = parcEventBuffer_WriteToFileDescriptor(fwdConnState->metisOutputQueue, fwdConnState->fd, -1); + if (nwritten < 0) { + // an error + trapNotImplemented("Bugzid: 2194"); + } + + if (DEBUG_OUTPUT) { + printf("%9c %s wrote %d bytes to socket %d, %zu bytes remaining\n", + ' ', + __func__, + nwritten, + fwdConnState->fd, + parcEventBuffer_GetLength(fwdConnState->metisOutputQueue)); + } + + // if we could not write the whole buffer, make sure we have a write event pending + if (parcEventBuffer_GetLength(fwdConnState->metisOutputQueue) > 0) { + parcEvent_Start(fwdConnState->writeEvent); + if (DEBUG_OUTPUT) { + printf("%9c %s enabled write event\n", ' ', __func__); + } + } else { + parcEvent_Stop(fwdConnState->writeEvent); + if (DEBUG_OUTPUT) { + printf("%9c %s disabled write event\n", ' ', __func__); + } + } + } +} + + +/** + * Called when we get an event on a socket we believe is connected + * + * libevent will call this with an PARCEventType_Read on connection close too (the read length will be 0). + * + * @param [in] fwd_state An allocated forwarder connection state + * @param [in] conn The corresponding RTA connection + * @param [in] what The Libevent set of events + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_connectedEventHandler(FwdMetisState *fwd_state, RtaConnection *conn, short what) +{ + if (what & PARCEventType_Read) { + _readFromMetis(fwd_state, conn); + } + + if (what & PARCEventType_Write) { + _dequeueMessagesToMetis(fwd_state); + } +} + +/** + * Called for any activity on the socket. Maybe in either connected or disconnected state. + */ +static void +_eventCallback(int fd, PARCEventType what, void *connectionVoid) +{ + RtaConnection *conn = (RtaConnection *) connectionVoid; + FwdMetisState *fwd_state = (FwdMetisState *) rtaConnection_GetPrivateData(conn, FWD_METIS);; + + if (!fwd_state->isConnected) { + _disconnectedEventHandler(fwd_state, conn, what); + + // once we connect, we should try a read immediately too + } + + if (fwd_state->isConnected) { + _connectedEventHandler(fwd_state, conn, what); + } +} + +/** + * Updates the connections's Blocked Down state + * + * If the bytes in our output buffer are greater than METIS_OUTPUT_QUEUE_BYTES, then + * we will set the Blocked Down condition on the connection. This will prevent the + * API connector from accepting more messages. + * + * Messages already in the connection queue will still be processed. + * + * @param [in] fwd_output The libevent buffer to check the backlog + * @param [in] conn The RtaConnection the set or clear the blocked down condition + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_updateBlockedDownState(PARCEventBuffer *fwd_output, RtaConnection *conn) +{ + size_t queue_bytes = parcEventBuffer_GetLength(fwd_output); + if (queue_bytes > METIS_OUTPUT_QUEUE_BYTES) { + // block down + + if (!rtaConnection_BlockedDown(conn)) { + rtaConnection_SetBlockedDown(conn); + } + + // note that we continue execution and put the packet we have in hand on the queue + // setting the blocked down state only affects the API connector. Packets already in the system + // will keep flowing down to us + } else { + // if it is blocked, unblock it + if (rtaConnection_BlockedDown(conn)) { + rtaConnection_ClearBlockedDown(conn); + } + } +} + +static void +connector_Fwd_Metis_Downcall_HandleConnected(FwdMetisState *fwdConnState, TransportMessage *tm, RtaConnection *conn, RtaComponentStats *stats) +{ + _updateBlockedDownState(fwdConnState->metisOutputQueue, conn); + + CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm); + + bool queued = false; + + CCNxCodecNetworkBufferIoVec *vec = ccnxWireFormatMessage_GetIoVec(dictionary); + if (vec != NULL) { + _queueIoVecMessageToMetis(vec, fwdConnState->metisOutputQueue); + queued = true; + } else { + PARCBuffer *wireFormat = ccnxWireFormatMessage_GetWireFormatBuffer(dictionary); + if (wireFormat != NULL) { + _queueBufferMessageToMetis(wireFormat, fwdConnState->metisOutputQueue); + queued = true; + } + } + + if (queued) { + rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT); + + if (DEBUG_OUTPUT) { + struct timeval delay = transportMessage_GetDelay(tm); + printf("%9" PRIu64 " %s total downcall reads %" PRIu64 " references queued %u dequeued %u not queued %u last delay %.6f\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN), + fwd_metis_references_queued, + fwd_metis_references_dequeued, + fwd_metis_references_notqueued, + delay.tv_sec + delay.tv_usec * 1E-6); + } + } else { + fwd_metis_references_notqueued++; + } + + // The transport message is destroyed in connector_Fwd_Metis_Downcall_Read() +} + +static void +_ackRequest(RtaConnection *conn, PARCJSON *request) +{ + PARCJSON *response = cpiAcks_CreateAck(request); + CCNxTlvDictionary *ackDict = ccnxControlFacade_CreateCPI(response); + + TransportMessage *tm_ack = transportMessage_CreateFromDictionary(ackDict); + ccnxTlvDictionary_Release(&ackDict); + parcJSON_Release(&response); + + transportMessage_SetInfo(tm_ack, rtaConnection_Copy(conn), rtaConnection_FreeFunc); + + RtaProtocolStack *stack = rtaConnection_GetStack(conn); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FWD_METIS, RTA_UP); + if (rtaComponent_PutMessage(out, tm_ack)) { + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } +} + +static bool +_handleDownControl(FwdMetisState *fwdConnState, RtaConnection *conn, TransportMessage *tm) +{ + bool consumedMessage = false; + + CCNxTlvDictionary *dict = transportMessage_GetDictionary(tm); + if (ccnxTlvDictionary_IsControl(dict)) { + if (ccnxControlFacade_IsCPI(dict)) { + PARCJSON *json = ccnxControlFacade_GetJson(dict); + if (controlPlaneInterface_GetCPIMessageType(json) == CPI_REQUEST) { + if (cpi_getCPIOperation2(json) == CPI_PAUSE) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p recieved PAUSE\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + _ackRequest(conn, json); + consumedMessage = true; + } + + if (cpi_getCPIOperation2(json) == CPI_FLUSH) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s conn %p recieved FLUSH\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) conn); + } + _ackRequest(conn, json); + consumedMessage = true; + } + } + } + } + + if (consumedMessage) { + fwdConnState->stats.countDowncallControl++; + } + + return consumedMessage; +} + +/** + * send raw packet from codec to forwarder. We are passed the ProtocolStack on the ptr. + */ +static void +connector_Fwd_Metis_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *ptr) +{ + TransportMessage *tm; + + while ((tm = rtaComponent_GetMessage(in)) != NULL) { + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + FwdMetisState *fwdConnState = rtaConnection_GetPrivateData(conn, FWD_METIS); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN); + fwdConnState->stats.countDowncallReads++; + + bool consumedControl = _handleDownControl(fwdConnState, conn, tm); + if (!consumedControl) { + // we did not consume the message as a control packet for the metis connector + + if (fwdConnState->isConnected) { + // If the socket is connected, this will "do the right thing" and consume the transport message. + connector_Fwd_Metis_Downcall_HandleConnected(fwdConnState, tm, conn, stats); + } else { + // Oops, got a packet before we're connected. + printf("\nConnection %p transport message %p on fd %d that's not open\n", (void *) conn, (void *) tm, fwdConnState->fd); + } + + // now attempt to write to the network + _dequeueMessagesToMetis(fwdConnState); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s total downcall reads in %" PRIu64 " out %" PRIu64 "\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN), + rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT)); + } + } + + transportMessage_Destroy(&tm); + } +} + +/** + * Destroy the FwdMetisState object. + * + * Destroys any packets waiting in queue, frees the libevent structures used by the connection to Metis. + * Frees the FwdMetisState object and will NULL *fwdStatePtr. + * + * @param [in,out] fwdStatePtr Double pointer to the allocated state. Will be NULL'd on output. + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +_fwdMetisState_Release(FwdMetisState **fwdStatePtr) +{ + FwdMetisState *fwd_state = *fwdStatePtr; + + while (!parcDeque_IsEmpty(fwd_state->transportMessageQueue)) { + TransportMessage *tm = parcDeque_RemoveFirst(fwd_state->transportMessageQueue); + transportMessage_Destroy(&tm); + } + + parcDeque_Release(&fwd_state->transportMessageQueue); + + if (fwd_state->readEvent) { + parcEvent_Destroy(&(fwd_state->readEvent)); + } + + if (fwd_state->writeEvent) { + parcEvent_Destroy(&(fwd_state->writeEvent)); + } + + parcEventTimer_Destroy(&(fwd_state->transportMessageQueueEvent)); + + if (fwd_state->metisOutputQueue) { + parcEventBuffer_Destroy(&(fwd_state->metisOutputQueue)); + } + + if (fwd_state->nextMessage.packet) { + parcBuffer_Release(&fwd_state->nextMessage.packet); + } + + close(fwd_state->fd); + + parcMemory_Deallocate((void **) &fwd_state); + *fwdStatePtr = NULL; +} + +static int +connector_Fwd_Metis_Closer(RtaConnection *conn) +{ + FwdMetisState *fwd_state = rtaConnection_GetPrivateData(conn, FWD_METIS); + rtaConnection_SetPrivateData(conn, FWD_METIS, NULL); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s called on fwd_state %p\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), __func__, (void *) fwd_state); + } + + RtaComponentStats *stats = rtaConnection_GetStats(conn, FWD_METIS); + rtaComponentStats_Increment(stats, STATS_CLOSES); + + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s closed fwd_state %p deque length %zu\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state, + parcDeque_Size(fwd_state->transportMessageQueue)); + + printf("%9" PRIu64 " %s closed fwd_state %p stats: up { reads %u wok %u werr %u wblk %u wfull %u wctrlok %u wctrlerr %u }\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state, + fwd_state->stats.countUpcallReads, fwd_state->stats.countUpcallWriteDataOk, fwd_state->stats.countUpcallWriteDataError, + fwd_state->stats.countUpcallWriteDataBlocked, fwd_state->stats.countUpcallWriteDataQueueFull, + fwd_state->stats.countUpcallWriteControlOk, fwd_state->stats.countUpcallWriteControlError); + + printf("%9" PRIu64 " %s closed fwd_state %p stats: dn { reads %u wok %u wctrlok %u }\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + (void *) fwd_state, + fwd_state->stats.countDowncallReads, fwd_state->stats.countDowncallWrites, fwd_state->stats.countDowncallControl); + } + + _fwdMetisState_Release(&fwd_state); + + return 0; +} + +static int +connector_Fwd_Metis_Release(RtaProtocolStack *stack) +{ + return 0; +} + +/** + * Enable to disable the read event based on the Blocked Up state + * + * If we receive a Blocked Up state change and the read event is pending, make it + * not pending. If we receive a not blocked up state change and the read event is not + * pending, make it pending. + * + * @param [<#in#> | <#out#> | <#in,out#>] <#name#> <#description#> + * + * Example: + * @code + * { + * <#example#> + * } + * @endcode + */ +static void +connector_Fwd_Metis_StateChange(RtaConnection *conn) +{ + struct fwd_metis_state *fwd_state = rtaConnection_GetPrivateData(conn, FWD_METIS); + + int isReadPending = parcEvent_Poll(fwd_state->readEvent, PARCEventType_Read); + + + // If we are blocked in the UP direction, disable events on the read queue + if (rtaConnection_BlockedUp(conn)) { + // we only disable it and log it if it was active + if (isReadPending) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u blocked up, disable PARCEventType_Read\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaConnection_GetConnectionId(conn)); + } + + parcEvent_Stop(fwd_state->readEvent); + } + } else { + if ((!isReadPending) && fwd_state->isConnected) { + if (DEBUG_OUTPUT) { + printf("%9" PRIu64 " %s connection %u unblocked up, enable PARCEventType_Read\n", + rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))), + __func__, + rtaConnection_GetConnectionId(conn)); + } + parcEvent_Start(fwd_state->readEvent); + } + } + + // We do not need to do anything with DOWN direction, becasue we're the component sending + // those block down messages. +} |