aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c127
1 files changed, 127 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c
new file mode 100644
index 00000000..4be0c085
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/core/rta_Component.c
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_EventBuffer.h>
+
+#include <ccnx/transport/common/transport_Message.h>
+#include <ccnx/transport/transport_rta/rta_Transport.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+PARCEventQueue *
+rtaComponent_GetOutputQueue(RtaConnection *conn,
+ RtaComponents component,
+ RtaDirection direction)
+{
+ RtaProtocolStack *stack;
+
+ assertNotNull(conn, "called with null connection\n");
+
+ stack = rtaConnection_GetStack(conn);
+ assertNotNull(stack, "resolved null stack\n");
+
+ return rtaProtocolStack_GetPutQueue(stack, component, direction);
+}
+
+int
+rtaComponent_PutMessage(PARCEventQueue *queue, TransportMessage *tm)
+{
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ assertNotNull(conn, "Got null connection from transport message\n");
+
+ if (rtaConnection_GetState(conn) != CONN_CLOSED) {
+ PARCEventBuffer *out = parcEventBuffer_GetQueueBufferOutput(queue);
+ int res;
+
+ rtaConnection_IncrementMessagesInQueue(conn);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s queue %-12s tm %p\n",
+ __func__,
+ rtaProtocolStack_GetQueueName(rtaConnection_GetStack(conn), queue),
+ (void *) tm);
+ }
+
+ res = parcEventBuffer_Append(out, (void *) &tm, sizeof(&tm));
+ assertTrue(res == 0, "%s parcEventBuffer_Append returned error\n", __func__);
+ parcEventBuffer_Destroy(&out);
+ return 1;
+ } else {
+ // drop
+ transportMessage_Destroy(&tm);
+
+ return 0;
+ }
+}
+
+TransportMessage *
+rtaComponent_GetMessage(PARCEventQueue *queue)
+{
+ PARCEventBuffer *in = parcEventBuffer_GetQueueBufferInput(queue);
+
+ while (parcEventBuffer_GetLength(in) >= sizeof(TransportMessage *)) {
+ ssize_t len;
+ TransportMessage *tm;
+ RtaConnection *conn;
+
+ len = parcEventBuffer_Read(in, (void *) &tm, sizeof(&tm));
+
+ assertTrue(len == sizeof(TransportMessage *),
+ "parcEventBuffer_Read returned error");
+
+ // Is the transport message for an open connection?
+ conn = rtaConnection_GetFromTransport(tm);
+ assertNotNull(conn, "%s GetInfo returnd null connection\n", __func__);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s queue %-12s tm %p\n",
+ __func__,
+ rtaProtocolStack_GetQueueName(rtaConnection_GetStack(conn), queue),
+ (void *) tm);
+ }
+
+ (void) rtaConnection_DecrementMessagesInQueue(conn);
+
+ if (rtaConnection_GetState(conn) != CONN_CLOSED) {
+ parcEventBuffer_Destroy(&in);
+ return tm;
+ }
+
+ // it's a closed connection
+
+ if (DEBUG_OUTPUT) {
+ printf("%s clearing connection %p reference in transport\n",
+ __func__, (void *) conn);
+ }
+ //drop
+ transportMessage_Destroy(&tm);
+ }
+
+ parcEventBuffer_Destroy(&in);
+ return NULL;
+}