aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c634
1 files changed, 634 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c
new file mode 100644
index 00000000..4e5ea48f
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/connectors/rta_ApiConnection.c
@@ -0,0 +1,634 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implements the API connector. The API connector is a event based component to manage the socket
+ * to the API.
+ *
+ * The API Connector's job is to manage the socket to the API between the RTA Framework and the
+ * API. It does this by using an event directly to manage that socket. It uses the same
+ * event scheduler base as the RTA framework, so its all part of the same event dispatcher.
+ *
+ * The RTA Transport now only speaks CCNxTlvDictionary messages. If we receive old timey Interest,
+ * ContentObject, etc., we translate them to the Dictionary format. The TransportMessage and CCNxMessage
+ * will both go away.
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <fcntl.h>
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <errno.h>
+
+#include <parc/algol/parc_EventBuffer.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <LongBow/runtime.h>
+
+#include <ccnx/transport/transport_rta/connectors/rta_ApiConnection.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+#include <ccnx/transport/transport_rta/config/config_Codec_Tlv.h>
+
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_TlvDictionary.h>
+
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+#define PAIR_TRANSPORT 0
+#define PAIR_OTHER 1
+
+// we are only putting an 8-byte pointer on the queue, so
+// this should be 50 messages
+#define MAX_API_QUEUE_BYTES 400
+
+
+unsigned api_upcall_writes = 0;
+unsigned api_downcall_reads = 0;
+extern unsigned rta_transport_reads;
+
+// per connection state
+struct rta_api_connection {
+ // A reference to our connection
+ RtaConnection *connection;
+
+ // event queue for socketpair to API
+ PARCEventQueue *bev_api;
+
+ // these are assingned to us by the Transport
+ int api_fd;
+ int transport_fd;
+};
+
+// ==========================================================================================
+// STATIC PROTOTYPES and their headerdoc
+
+/**
+ * PARCEvent calls this when the API queue falls below the watermark
+ *
+ * We watermark the write queue at MAX_API_QUEUE_BYTES bytes. When a write takes
+ * the queue backlog below that amount, PARCEvent calls this.
+ *
+ * @param [in] connVoid Void pointer to the RtaConnection
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_WriteCallback(PARCEventQueue *queue, PARCEventType type, void *conn);
+
+/**
+ * PARCEvent calls this when there's a message from the API
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_Downcall_Read(PARCEventQueue *bev, PARCEventType type, void *conn);
+
+/**
+ * PARCEvent calls this when there's a non-read/write event on the API's socket
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_Downcall_Event(PARCEventQueue *, PARCEventQueueEventType events, void *conn);
+
+
+/**
+ * Drains the input queue and output queue of a connection to the API
+ *
+ * The input queue and output queue contain pointers to CCNxMessages. On close,
+ * we need to drain these queues and release all the messages.
+ *
+ * The API Connector is responsible for only draining its input queue. The output
+ * queue up to the API is drained by the RTA Framework.
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_DrainApiConnection(RtaApiConnection *apiConnection);
+
+/**
+ * Writes a message to the API
+ *
+ * Takes ownership of the message which is passed up to the API
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void rtaApiConnection_WriteMessageToApi(RtaApiConnection *apiConnection, CCNxMetaMessage *msg);
+
+// ==========================================================================================
+// Public API
+
+static void
+rtaApiConnection_SetupSocket(RtaApiConnection *apiConnection, RtaConnection *connection)
+{
+ RtaProtocolStack *stack = rtaConnection_GetStack(connection);
+ PARCEventScheduler *base = rtaFramework_GetEventScheduler(rtaProtocolStack_GetFramework(stack));
+ int error;
+
+ // Set non-blocking flag
+ int flags = fcntl(apiConnection->transport_fd, F_GETFL, NULL);
+ assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+ int failure = fcntl(apiConnection->transport_fd, F_SETFL, flags | O_NONBLOCK);
+ assertFalse(failure, "fcntl failed to set socket non-blocking(%d) %s\n", errno, strerror(errno));
+
+ apiConnection->bev_api = parcEventQueue_Create(base, apiConnection->transport_fd, 0);
+ assertNotNull(apiConnection->bev_api, "Got null result from parcEventQueue_Create");
+
+ // Set buffer size
+ int sendbuff = 1000 * 8;
+
+ error = setsockopt(rtaConnection_GetTransportFd(connection), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
+ assertTrue(error == 0, "Got error setting SO_SNDBUF: %s", strerror(errno));
+
+ parcEventQueue_SetWatermark(apiConnection->bev_api, PARCEventType_Write, MAX_API_QUEUE_BYTES, 0);
+ parcEventQueue_SetCallbacks(apiConnection->bev_api,
+ rtaApiConnection_Downcall_Read,
+ rtaApiConnection_WriteCallback,
+ rtaApiConnection_Downcall_Event,
+ (void *) connection);
+
+ parcEventQueue_Enable(apiConnection->bev_api, PARCEventType_Read | PARCEventType_Write);
+}
+
+RtaApiConnection *
+rtaApiConnection_Create(RtaConnection *connection)
+{
+ RtaApiConnection *apiConnection = parcMemory_AllocateAndClear(sizeof(RtaApiConnection));
+ assertNotNull(apiConnection, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(RtaApiConnection));
+
+ apiConnection->connection = rtaConnection_Copy(connection);
+ apiConnection->api_fd = rtaConnection_GetApiFd(connection);
+ apiConnection->transport_fd = rtaConnection_GetTransportFd(connection);
+ rtaApiConnection_SetupSocket(apiConnection, connection);
+
+ return apiConnection;
+}
+
+void
+rtaApiConnection_Destroy(RtaApiConnection **apiConnectionPtr)
+{
+ assertNotNull(apiConnectionPtr, "Parameter apiConnecitonPtr must be non-null");
+ assertNotNull(*apiConnectionPtr, "Parameter apiConnecitonPtr must dereference to non-null");
+ RtaApiConnection *apiConnection = *apiConnectionPtr;
+
+
+ // Send all the outbound messages up to the API. This at least gets them out
+ // of our output queue on to the API's socket.
+ parcEventQueue_Finished(apiConnection->bev_api, PARCEventType_Write);
+ rtaApiConnection_DrainApiConnection(apiConnection);
+
+ parcEventQueue_Destroy(&(apiConnection->bev_api));
+
+ rtaConnection_Destroy(&apiConnection->connection);
+
+ parcMemory_Deallocate((void **) &apiConnection);
+
+ *apiConnectionPtr = NULL;
+}
+
+static void
+rtaApiConnection_SendToApiAsDictionary(RtaApiConnection *apiConnection, TransportMessage *tm)
+{
+ CCNxMetaMessage *msg = ccnxMetaMessage_Acquire(transportMessage_GetDictionary(tm));
+ rtaApiConnection_WriteMessageToApi(apiConnection, msg);
+}
+
+static CCNxName *
+rtaApiConnection_GetNameFromTransportMessage(TransportMessage *tm)
+{
+ CCNxName *name = NULL;
+ CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm);
+ switch (ccnxTlvDictionary_GetSchemaVersion(dictionary)) {
+ case CCNxTlvDictionary_SchemaVersion_V1:
+ name = ccnxTlvDictionary_GetName(dictionary, CCNxCodecSchemaV1TlvDictionary_MessageFastArray_NAME);
+ break;
+
+ default:
+ break;
+ }
+ return name;
+}
+
+/**
+ * Writes the CCNxMessage inside the transport message up to the API.
+ * Its possible that if there's no space in the socket the write will block
+ * and return an error.
+ *
+ * @return true if written to API, false if not (most likely would block)
+ */
+bool
+rtaApiConnection_SendToApi(RtaApiConnection *apiConnection, TransportMessage *tm, RtaComponentStats *stats)
+{
+ assertNotNull(apiConnection, "Parameter apiConnection must be non-null");
+
+ if (DEBUG_OUTPUT) {
+ CCNxName *name = rtaApiConnection_GetNameFromTransportMessage(tm);
+ char *nameString = NULL;
+ if (name) {
+ nameString = ccnxName_ToString(name);
+ }
+
+ struct timeval delay = transportMessage_GetDelay(tm);
+ printf("%9" PRIu64 " %s putting transport msg %p to user fd %d delay %.6f name %s\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) tm,
+ apiConnection->api_fd,
+ delay.tv_sec + delay.tv_usec * 1E-6,
+ nameString);
+
+ if (nameString) {
+ parcMemory_Deallocate((void **) &nameString);
+ }
+ }
+
+ rtaApiConnection_SendToApiAsDictionary(apiConnection, tm);
+
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p fd_out %d state %p upcalls %u reads %u\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) apiConnection->connection,
+ apiConnection->transport_fd,
+ (void *) apiConnection,
+ api_upcall_writes,
+ rta_transport_reads);
+ }
+
+ return true;
+}
+
+void
+rtaApiConnection_BlockDown(RtaApiConnection *apiConnection)
+{
+ assertNotNull(apiConnection, "Parameter apiConnection must be non-null");
+ PARCEventType enabled_events = parcEventQueue_GetEnabled(apiConnection->bev_api);
+
+ // we only disable it and log it if it was active
+ if (enabled_events & PARCEventType_Read) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u blocked down, disable PARCEventType_Read\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ rtaConnection_GetConnectionId(apiConnection->connection));
+ }
+
+ parcEventQueue_Disable(apiConnection->bev_api, PARCEventType_Read);
+ }
+}
+
+void
+rtaApiConnection_UnblockDown(RtaApiConnection *apiConnection)
+{
+ assertNotNull(apiConnection, "Parameter apiConnection must be non-null");
+ PARCEventType enabled_events = parcEventQueue_GetEnabled(apiConnection->bev_api);
+
+ if (!(enabled_events & PARCEventType_Read)) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u unblocked down, enable PARCEventType_Read\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ rtaConnection_GetConnectionId(apiConnection->connection));
+ }
+ parcEventQueue_Enable(apiConnection->bev_api, PARCEventType_Read);
+ }
+}
+
+// ==========================================================================================
+// Internal implementation
+
+static void
+rtaApiConnection_WriteMessageToApi(RtaApiConnection *apiConnection, CCNxMetaMessage *msg)
+{
+ assertNotNull(msg, "Parameter msg must be non-null");
+
+ int error = parcEventQueue_Write(apiConnection->bev_api, &msg, sizeof(&msg));
+ assertTrue(error == 0,
+ "write to transport_fd %d write error: (%d) %s",
+ apiConnection->transport_fd, errno, strerror(errno));
+
+ // debugging tracking
+ api_upcall_writes++;
+}
+
+static void
+_rtaAPIConnection_ProcessCPIRequest(RtaConnection *conn, PARCJSON *json)
+{
+ // Is it a request type we know about?
+
+ switch (cpi_getCPIOperation2(json)) {
+ case CPI_PAUSE: {
+ RtaConnectionStateType oldstate = rtaConnection_GetState(conn);
+ if (oldstate == CONN_OPEN) {
+ rtaConnection_SetState(conn, CONN_PAUSED);
+ }
+ break;
+ }
+
+ default:
+ // do nothing, don't know about this message type
+ break;
+ }
+}
+
+static void
+connector_Api_ProcessCpiMessage(RtaConnection *conn, CCNxTlvDictionary *controlDictionary)
+{
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary);
+ switch (controlPlaneInterface_GetCPIMessageType(json)) {
+ case CPI_REQUEST: {
+ _rtaAPIConnection_ProcessCPIRequest(conn, json);
+ break;
+ }
+
+ case CPI_RESPONSE:
+ break;
+
+ case CPI_ACK:
+ break;
+
+ default:
+ assertTrue(0, "Got unknown CPI message type: %d", controlPlaneInterface_GetCPIMessageType(json));
+ }
+ }
+}
+
+static void
+rtaApiConnection_ProcessControlFromApi(RtaApiConnection *apiConnection, RtaProtocolStack *stack, CCNxTlvDictionary *controlDictionary)
+{
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ connector_Api_ProcessCpiMessage(apiConnection->connection, controlDictionary);
+ }
+}
+
+static void
+rtaApiConnection_Downcall_ProcessDictionary(RtaApiConnection *apiConnection, RtaProtocolStack *stack,
+ PARCEventQueue *queue_out, RtaComponentStats *stats, CCNxTlvDictionary *messageDictionary)
+{
+ // Look at the control message before checking for the connection closed
+ if (ccnxTlvDictionary_IsControl(messageDictionary)) {
+ rtaApiConnection_ProcessControlFromApi(apiConnection, stack, messageDictionary);
+ }
+
+ // In paused or closed state, we only pass control messages
+ if ((rtaConnection_GetState(apiConnection->connection) == CONN_OPEN) || (ccnxTlvDictionary_IsControl(messageDictionary))) {
+ TransportMessage *tm = transportMessage_CreateFromDictionary(messageDictionary);
+
+ // Set the auxiliary information to the message's connection
+ transportMessage_SetInfo(tm, rtaConnection_Copy(apiConnection->connection), rtaConnection_FreeFunc);
+
+ if (DEBUG_OUTPUT) {
+ CCNxName *name = NULL;
+ if (ccnxTlvDictionary_IsInterest(messageDictionary)) {
+ name = ccnxInterest_GetName(messageDictionary);
+ } else if (ccnxTlvDictionary_IsContentObject(messageDictionary)) {
+ name = ccnxContentObject_GetName(messageDictionary);
+ }
+
+ char *noname = "NONAME";
+ char *nameString = noname;
+ if (name) {
+ nameString = ccnxName_ToString(name);
+ }
+
+ printf("%9" PRIu64 " %s putting transport msg %p from user fd %d: %s\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) tm, apiConnection->api_fd,
+ nameString);
+
+ if (nameString != noname) {
+ parcMemory_Deallocate((void **) &nameString);
+ }
+
+ //ccnxTlvDictionary_Display(0, messageDictionary);
+ }
+
+ // send down the stack. If it fails, it destroys the message.
+ if (rtaComponent_PutMessage(queue_out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+ }
+}
+
+static void
+rtaApiConnection_Downcall_ProcessMessage(RtaApiConnection *apiConnection, RtaProtocolStack *stack, PARCEventBuffer *eb_in,
+ PARCEventQueue *queue_out, RtaComponentStats *stats)
+{
+ api_downcall_reads++;
+ CCNxMetaMessage *msg;
+
+ int bytesRemoved = parcEventBuffer_Read(eb_in, &msg, sizeof(CCNxMetaMessage *));
+ assertTrue(bytesRemoved == sizeof(CCNxMetaMessage *),
+ "Error, did not remove an entire pointer, expected %zu got %d",
+ sizeof(CCNxMetaMessage *),
+ bytesRemoved);
+
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN);
+
+ // This will save its own reference to the messageDictionary
+ rtaApiConnection_Downcall_ProcessDictionary(apiConnection, stack, queue_out, stats, msg);
+
+ // At this point, the CCNxMetaMessage passed in by the application thread has been
+ // acquired rtaApiConnection_Downcall_ProcessDictionary(), so we can Release the reference we
+ // acquired in rtaTransport_Send().
+ ccnxMetaMessage_Release(&msg);
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p total downcall reads in %" PRIu64 " out %" PRIu64 "\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) apiConnection->connection,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT));
+ }
+}
+
+
+/*
+ * Called by PARCEvent when there's a message to read from the API
+ * Read a message from the API.
+ * rtaConnectionVoid is the RtaConnection associated with the api descriptor
+ */
+static void
+rtaApiConnection_Downcall_Read(PARCEventQueue *bev, PARCEventType type, void *rtaConnectionVoid)
+{
+ RtaConnection *conn = (RtaConnection *) rtaConnectionVoid;
+
+ assertNotNull(rtaConnectionVoid, "Parameter must be a non-null void *");
+
+ RtaProtocolStack *stack = rtaConnection_GetStack(conn);
+ assertNotNull(stack, "rtaConnection_GetStack returned null");
+
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, API_CONNECTOR);
+ assertNotNull(stats, "rtaConnection_GetStats returned null");
+
+ RtaApiConnection *apiConnection = rtaConnection_GetPrivateData(conn, API_CONNECTOR);
+ assertNotNull(apiConnection, "rtaConnection_GetPrivateData got null");
+
+ PARCEventBuffer *eb_in = parcEventBuffer_GetQueueBufferInput(bev);
+
+ PARCEventQueue *queue_out = rtaComponent_GetOutputQueue(conn, API_CONNECTOR, RTA_DOWN);
+ assertNotNull(queue_out, "component_GetOutputQueue returned null");
+
+ while (parcEventBuffer_GetLength(eb_in) >= sizeof(TransportMessage *)) {
+ rtaApiConnection_Downcall_ProcessMessage(apiConnection, stack, eb_in, queue_out, stats);
+ }
+ parcEventBuffer_Destroy(&eb_in);
+}
+
+/*
+ * This is used on the connection to the API out of the transport box
+ */
+static void
+rtaApiConnection_Downcall_Event(PARCEventQueue *bev, PARCEventQueueEventType events, void *ptr)
+{
+}
+
+/**
+ * Drains all the CCNxMessages off an event buffer and destroys them
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void
+drainBuffer(PARCEventBuffer *buffer, RtaConnection *conn)
+{
+ size_t length;
+
+ while ((length = parcEventBuffer_GetLength(buffer)) > 0) {
+ CCNxMetaMessage *msg;
+ ssize_t len;
+
+ len = parcEventBuffer_Read(buffer, &msg, sizeof(CCNxMetaMessage *));
+ assertTrue(len == sizeof(CCNxMetaMessage *),
+ "Removed incorrect length, expected %zu got %zd: (%d) %s",
+ sizeof(CCNxMetaMessage *),
+ len,
+ errno,
+ strerror(errno));
+
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s conn %p drained message %p\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ (void *) conn,
+ (void *) msg);
+ }
+ ccnxMetaMessage_Release(&msg);
+ }
+}
+
+/**
+ * Called on Destroy to clear our input buffer. This does not
+ * drain the output (to API) buffer, that is done by the RTA Framework
+ */
+static void
+rtaApiConnection_DrainApiConnection(RtaApiConnection *apiConnection)
+{
+ // drain and free the transport_fd
+ parcEventQueue_Disable(apiConnection->bev_api, PARCEventType_Read);
+
+ PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(apiConnection->bev_api);
+ drainBuffer(in, apiConnection->connection);
+ parcEventBuffer_Destroy(&in);
+
+ // There may be some messages in the output buffer that
+ // have not actually been written to the kernel socket.
+ // Drain those too, as the API will never see them
+
+ if (DEBUG_OUTPUT) {
+ PARCEventBuffer *out = parcEventBuffer_GetQueueBufferOutput(apiConnection->bev_api);
+ printf("%9" PRIu64 " %s conn %p output buffer has %zu bytes\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(apiConnection->connection))),
+ __func__,
+ (void *) apiConnection->connection,
+ parcEventBuffer_GetLength(out));
+ parcEventBuffer_Destroy(&out);
+ }
+}
+
+/**
+ * Called by PARCEvent when we cross below the write watermark
+ */
+static void
+rtaApiConnection_WriteCallback(PARCEventQueue *queue, PARCEventType type, void *connVoid)
+{
+ // we dropped below the write watermark, unblock the connection in the UP direction
+ RtaConnection *conn = (RtaConnection *) connVoid;
+ if (rtaConnection_BlockedUp(conn)) {
+ if (DEBUG_OUTPUT) {
+ printf("%9" PRIu64 " %s connection %u output fell below watermark, unblocking UP\n",
+ rtaFramework_GetTicks(rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn))),
+ __func__,
+ rtaConnection_GetConnectionId(conn));
+ }
+
+ rtaConnection_ClearBlockedUp(conn);
+ }
+}