summaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c673
1 files changed, 673 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c
new file mode 100644
index 00000000..70bcec63
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c
@@ -0,0 +1,673 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Source code layout:
+// - component_Vegas.c: the component wrapper and session multiplexing
+// - vegas_Session.c: code for a specific basename session
+// - vegas_Segment.c: code for specific segment operations
+
+/**
+ * Component behavior
+ * ===================
+ * This component provides flow-controlled in-order delivery of segmented
+ * content objects using a sequential segment number in the last component
+ * of the object name.
+ *
+ * The state machine described here is within a single RtaConnection. Separate
+ * connections are independent.
+ *
+ * Down Stack Behavior
+ * ------------------------
+ * When an interest comes down the stack, it will initiate a flow-controlled
+ * session. If the last component of the interest name is a segment number,
+ * that is the starting segment number. Otherwise, we assume the interest
+ * name is the base name for a segmented object, including the version number.
+ *
+ * Other types of messages coming down the stack (e.g. control or content objects)
+ * are passed down the stack unaltered.
+ *
+ * If an interest comes down that represents a subset of an existing flow (i.e.
+ * it has a segment number beyond the current starting segment of the flow contol
+ * window), the window is advanced to that segment number and any un-delivered
+ * content objects are dropped.
+ *
+ * If an interest comes down that represents a superset of an existing flow
+ * (i.e. it has a starting segment number less than the current window), the
+ * current flow control sessions is re-wound to the lower sequence number
+ * and continues from there.
+ *
+ * Up Stack Behavior
+ * ------------------------
+ * Non-content objects (e.g. control and interests) are passed up the stack unmodified.
+ *
+ * A content object that matches a flow control session is managed by the session.
+ * They are only passed up the stack in-order, and will be dropped if they are outside
+ * the window.
+ *
+ * A content object that does not match a flow control session is dropped. That's because
+ * the only interests we send down the stack are our own for flow controlled sessions, so
+ * no content object should go up the stack unless its part of a flow controlled session.
+ *
+ * Control Messages
+ * ------------------------
+ * The API may cancel flow control sessions in several ways:
+ *
+ * 1) Close the Connection. This will cancel all in progress sessions and drop
+ * any un-delivered objects.
+ *
+ * 2) Send a Control message down the stack with the base name to cancel. The
+ * name is considered the base name of the flow and does not depend on the
+ * starting segment number.
+ *
+ * { "CPI_CANCEL_FLOW" : { "FLOW_NAME" : <base name w/o segment number> } }
+ *
+ * Implementation Notes
+ * =========================
+ * For each RtaConnection, there's a {@code struct fc_connection_state}. This
+ * contains a list of in-progress sessions indexed by the hash of the base name
+ * (name up to but not including final segment). Right now, it's a linked list
+ * but should be implemented as a hash table.
+ *
+ * Each session is represented by a {@code struct fc_session}.
+ *
+ * Each entry in the flow control window is a {@code fc_window_entry}.
+ *
+ * session->window_head and session->window_tail define the limits of the
+ * congestion window. Everything in the interval [head, tail) is expressed
+ * as an interest. The size of that interval may be larger than the
+ * congestion window cwnd if we're decreaed the window. We never decrease
+ * tail, only the cwnd.
+ *
+ *
+ * Flow Control Algorithm
+ * =========================
+ * Based on TCP Vegas. Please read the Vegas paper. We use similar
+ * variable names to the paper. Code looks quite a bit like the linux
+ * tcp_vegas.c too.
+ *
+ * Here's the differences. In CCN, an Interest is like an ACK token, it
+ * gives the network permission to send. The node issuing Interests needs
+ * to pace them to not exceed the network capacity. This is done by
+ * observing the delay of Content Objects. If the delay grows too quickly,
+ * then we back off linearly. If the delay is not much above what we expected
+ * based on the minimum observed delay, we increase linearly.
+ *
+ * During slow start, the interest window (still called "cwnd") doubles
+ * every other RTT until we exceed the slow_start_threshold or the delay
+ * increases too much.
+ *
+ * The RTT is calculated every RTT based on the observed minimum RTT during
+ * the previous period.
+ *
+ * We use RFC6298 Retransmission Timeout (RTO) calculation methods per
+ * flow control session (object basename).
+ *
+ * Just to be clear, there are two timers working. The RTO timer is for
+ * retransmitting interests if the flow as stalled out. The Vegas RTT
+ * calculation is for congestion window calculations.
+ *
+ * We we receive an out-of-order content object, we'll check the earlier
+ * segments to see if they have passed the Vegas RTT. If so, we'll
+ * re-express the interests.
+ *
+ * Each time we re-express an Interest, we might decrese the congestion
+ * window. If the last time the interest was sent was more recent than
+ * the last time we decreased the congestion window, we'll decrease the
+ * congestion window. If the last expression of the interest was before
+ * the most recent window decrease, the window is left alone. This means
+ * we'll only decreae the window once per re-expression.
+ */
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <limits.h>
+#include <sys/queue.h>
+#include <stdbool.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+
+#include <parc/algol/parc_EventQueue.h>
+
+#include <ccnx/transport/common/transport_Message.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h>
+#include <ccnx/transport/test_tools/traffic_tools.h>
+
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+#include "vegas_private.h"
+
+#include <parc/logging/parc_LogLevel.h>
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+// ===========================================================
+
+typedef struct fc_session_holder {
+ uint64_t basename_hash;
+ CCNxName *basename;
+ VegasSession *session;
+
+ // used by fc_connection_state to hold these
+ // Should change to hashtable on the hash
+ TAILQ_ENTRY(fc_session_holder) list;
+} FcSessionHolder;
+
+/**
+ * This is the per-connection state. It allows us to have multiple
+ * flow control session on one connection for different names
+ */
+struct vegas_connection_state {
+ RtaConnection *parent_connection;
+ RtaFramework *parent_framework;
+
+ TAILQ_HEAD(, fc_session_holder) sessions_head;
+};
+
+
+// ===========================================================
+
+static int component_Fc_Vegas_Init(RtaProtocolStack *stack);
+static int component_Fc_Vegas_Opener(RtaConnection *conn);
+static void component_Fc_Vegas_Upcall_Read(PARCEventQueue *, PARCEventType event, void *conn);
+static void component_Fc_Vegas_Downcall_Read(PARCEventQueue *, PARCEventType event, void *conn);
+static int component_Fc_Vegas_Closer(RtaConnection *conn);
+static int component_Fc_Vegas_Release(RtaProtocolStack *stack);
+static void component_Fc_Vegas_StateChange(RtaConnection *conn);
+
+// Function structs for component variations
+RtaComponentOperations flow_vegas_ops = {
+ .init = component_Fc_Vegas_Init,
+ .open = component_Fc_Vegas_Opener,
+ .upcallRead = component_Fc_Vegas_Upcall_Read,
+ .upcallEvent = NULL,
+ .downcallRead = component_Fc_Vegas_Downcall_Read,
+ .downcallEvent = NULL,
+ .close = component_Fc_Vegas_Closer,
+ .release = component_Fc_Vegas_Release,
+ .stateChange = component_Fc_Vegas_StateChange
+};
+
+
+// ======
+// Session related functions
+static int vegas_HandleInterest(RtaConnection *conn, TransportMessage *tm);
+static FcSessionHolder *vegas_LookupSession(VegasConnectionState *fc, TransportMessage *tm);
+static FcSessionHolder *vegas_LookupSessionByName(VegasConnectionState *fc, CCNxName *name);
+
+static FcSessionHolder *vegas_CreateSessionHolder(VegasConnectionState *fc, RtaConnection *conn,
+ CCNxName *basename, uint64_t name_hash);
+
+static bool vegas_HandleControl(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue);
+
+// ================================================
+
+static int
+component_Fc_Vegas_Init(RtaProtocolStack *stack)
+{
+ // we don't do any stack-wide initialization
+ return 0;
+}
+
+static int
+component_Fc_Vegas_Opener(RtaConnection *conn)
+{
+ struct vegas_connection_state *fcConnState = parcMemory_AllocateAndClear(sizeof(struct vegas_connection_state));
+ assertNotNull(fcConnState, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(struct vegas_connection_state));
+
+ fcConnState->parent_connection = rtaConnection_Copy(conn);
+ fcConnState->parent_framework = rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn));
+
+ TAILQ_INIT(&fcConnState->sessions_head);
+
+ rtaConnection_SetPrivateData(conn, FC_VEGAS, fcConnState);
+ rtaComponentStats_Increment(rtaConnection_GetStats(conn, FC_VEGAS), STATS_OPENS);
+
+ return 0;
+}
+
+/*
+ * Read from below.
+ * These should only be content objects associated with our stream.
+ *
+ * Non-content objects are passed up the stack.
+ */
+static void
+component_Fc_Vegas_Upcall_Read(PARCEventQueue *in, PARCEventType event, void *stack_ptr)
+{
+ TransportMessage *tm;
+
+ while ((tm = rtaComponent_GetMessage(in)) != NULL) {
+ struct timeval delay = transportMessage_GetDelay(tm);
+
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS);
+
+ rtaComponentStats_Increment(stats, STATS_UPCALL_IN);
+
+ if (transportMessage_IsControl(tm)) {
+ PARCEventQueue *out = rtaComponent_GetOutputQueue(conn, FC_VEGAS, RTA_UP);
+
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ } else {
+ //TODO
+ }
+ } else if (transportMessage_IsContentObject(tm)) {
+ // this takes ownership of the transport message
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ FcSessionHolder *holder = vegas_LookupSession(fc, tm);
+
+ // it's quite possible that we get content objects for sessions that
+ // no longer exist. They are dropped.
+ if (holder != NULL) {
+ vegasSession_ReceiveContentObject(holder->session, tm);
+ } else {
+ transportMessage_Destroy(&tm);
+ }
+ } else {
+ PARCEventQueue *out = rtaComponent_GetOutputQueue(conn, FC_VEGAS, RTA_UP);
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ } else {
+ //TODO
+ }
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%s total upcall reads in %" PRIu64 " out %" PRIu64 " last delay %.6f\n",
+ __func__,
+ rtaComponentStats_Get(stats, STATS_UPCALL_IN),
+ rtaComponentStats_Get(stats, STATS_UPCALL_OUT),
+ delay.tv_sec + delay.tv_usec * 1E-6);
+ }
+ }
+}
+
+static void
+component_Fc_Vegas_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *conn)
+{
+ RtaProtocolStack *stack = (RtaProtocolStack *) conn;
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FC_VEGAS, RTA_DOWN);
+ TransportMessage *tm;
+
+// printf("%s reading from queue %p\n", __func__, in);
+
+ while ((tm = rtaComponent_GetMessage(in)) != NULL) {
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS);
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN);
+
+ if (transportMessage_IsControl(tm)) {
+ CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(tm);
+ if (ccnxControlFacade_IsCPI(controlDictionary) && vegas_HandleControl(conn, controlDictionary, in)) {
+ transportMessage_Destroy(&tm);
+ } else {
+ // we did not consume the message, so forward it down
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+ }
+ } else if (transportMessage_IsInterest(tm)) {
+ vegas_HandleInterest(conn, tm);
+
+ // The flow controller consumes Interests going down the stack and will
+ // start issuing its own interests instead.
+ transportMessage_Destroy(&tm);
+ } else {
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+ }
+
+ if (DEBUG_OUTPUT) {
+ struct timeval delay = tm ? transportMessage_GetDelay(tm) : (struct timeval) { 0, 0 };
+ printf("%s total downcall reads in %" PRIu64 " out %" PRIu64 " last delay %.6f\n",
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT),
+ delay.tv_sec + delay.tv_usec * 1E-6);
+ }
+ }
+}
+
+static int
+component_Fc_Vegas_Closer(RtaConnection *conn)
+{
+ VegasConnectionState *fcConnState;
+
+ assertNotNull(conn, "Got null connection\n");
+ if (conn == NULL) {
+ return -1;
+ }
+
+ fcConnState = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+
+ assertNotNull(fcConnState, "could not retrieve private data for FC_VEGAS on connid %u\n",
+ rtaConnection_GetConnectionId(conn));
+ if (fcConnState == NULL) {
+ return -1;
+ }
+
+ rtaConnection_Destroy(&fcConnState->parent_connection);
+
+ rtaComponentStats_Increment(rtaConnection_GetStats(conn, FC_VEGAS), STATS_CLOSES);
+
+ // close down all the sessions
+ while (!TAILQ_EMPTY(&fcConnState->sessions_head)) {
+ FcSessionHolder *holder = TAILQ_FIRST(&fcConnState->sessions_head);
+
+ vegasSession_Destroy(&holder->session);
+
+ TAILQ_REMOVE(&fcConnState->sessions_head, holder, list);
+ parcMemory_Deallocate((void **) &holder);
+ }
+
+ parcMemory_Deallocate((void **) &fcConnState);
+
+ return 0;
+}
+
+static int
+component_Fc_Vegas_Release(RtaProtocolStack *stack)
+{
+ // no stack-wide memory
+ return 0;
+}
+
+static void
+component_Fc_Vegas_StateChange(RtaConnection *conn)
+{
+ assertNotNull(conn, "Got null connection\n");
+
+ VegasConnectionState *fcConnState = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ assertNotNull(fcConnState, "could not retrieve private data for FC_VEGAS on connid %u\n",
+ rtaConnection_GetConnectionId(conn));
+
+ // should replace this with a hash table
+ FcSessionHolder *holder;
+ TAILQ_FOREACH(holder, &fcConnState->sessions_head, list)
+ {
+ if (vegasSession_GetConnectionId(holder->session) == rtaConnection_GetConnectionId(conn)) {
+ vegasSession_StateChanged(holder->session);
+ }
+ }
+}
+
+// =======================================================================
+
+/**
+ * If the last component is a segment number, it is ignored
+ */
+static FcSessionHolder *
+vegas_LookupSessionByName(VegasConnectionState *fc, CCNxName *name)
+{
+ uint64_t hash;
+ FcSessionHolder *holder;
+ int trim_segnum = 0;
+
+ assertNotNull(name, "Name is null\n");
+ if (name == NULL) {
+ return NULL;
+ }
+
+ size_t segmentCount = ccnxName_GetSegmentCount(name);
+ assertTrue(segmentCount > 1,
+ "expected name with at least 2 components, but only got %zu, name = '%s'\n",
+ segmentCount,
+ ccnxName_ToString(name));
+
+
+ if (segmentCount > 0) {
+ CCNxNameSegment *segment = ccnxName_GetSegment(name, segmentCount - 1);
+ if (ccnxNameSegment_GetType(segment) == CCNxNameLabelType_CHUNK) {
+ trim_segnum = 1;
+ }
+ }
+
+ hash = ccnxName_LeftMostHashCode(name, segmentCount - trim_segnum);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s name %p hash %16" PRIX64 "\n", __func__, (void *) name, hash);
+ ccnxName_Display(name, 0);
+ }
+
+ // should replace this with a hash table
+ TAILQ_FOREACH(holder, &fc->sessions_head, list)
+ {
+ if (holder->basename_hash == hash) {
+ return holder;
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * Precondition: only called for Content Objects.
+ * If the last component is a segment number, it is ignored
+ *
+ * Match the name of the content object to an active flow control session,
+ * or return NULL if not found.
+ */
+static FcSessionHolder *
+vegas_LookupSession(VegasConnectionState *fc, TransportMessage *tm)
+{
+ assertTrue(transportMessage_IsContentObject(tm),
+ "Transport message is not a ContentObject\n");
+
+ CCNxTlvDictionary *contentObjectDictionary = transportMessage_GetDictionary(tm);
+ CCNxName *name = ccnxContentObject_GetName(contentObjectDictionary);
+
+ return vegas_LookupSessionByName(fc, name);
+}
+
+// =============================================
+
+/*
+ * Precondition: it's an interest
+ */
+static int
+vegas_HandleInterest(RtaConnection *conn, TransportMessage *tm)
+{
+ assertTrue(transportMessage_IsInterest(tm), "Transport message is not an interest");
+
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ CCNxTlvDictionary *interestDictionary = transportMessage_GetDictionary(tm);
+
+ // we do not modify or destroy this name
+ CCNxName *original_name = ccnxInterest_GetName(interestDictionary);
+
+ CCNxName *basename = ccnxName_Copy(original_name);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s orig name %p basename %p\n", __func__, (void *) original_name, (void *) basename);
+ }
+
+ // can we decode the last component as a segment number?
+ uint64_t segnum = 0;
+ bool segnum_found = trafficTools_GetObjectSegmentFromName(basename, &segnum);
+ if (segnum_found) {
+ // it is a segment number
+ ccnxName_Trim(basename, 1);
+ }
+
+ FcSessionHolder *holder = vegas_LookupSessionByName(fc, basename);
+
+ if (holder == NULL) {
+ // create a new session
+ // This takes ownership of the basename
+ uint64_t name_hash = ccnxName_HashCode(basename);
+ holder = vegas_CreateSessionHolder(fc, conn, basename, name_hash);
+
+ CCNxInterestInterface *interestImpl = ccnxInterestInterface_GetInterface(interestDictionary);
+
+ uint32_t lifetime = ccnxInterest_GetLifetime(interestDictionary);
+
+ PARCBuffer *keyIdRestriction = ccnxInterest_GetKeyIdRestriction(interestDictionary); // might be NULL
+
+ holder->session = vegasSession_Create(fc, conn, basename, segnum,
+ interestImpl, lifetime, keyIdRestriction);
+
+ vegasSession_Start(holder->session);
+
+ rtaConnection_SendStatus(conn,
+ FC_VEGAS,
+ RTA_UP,
+ notifyStatusCode_FLOW_CONTROL_STARTED,
+ original_name,
+ NULL);
+ } else {
+ assertTrue(segnum_found, "Duplicate interest w/o segnum for existing session");
+
+ if (segnum_found) {
+ vegasSession_Seek(holder->session, segnum);
+ }
+
+ ccnxName_Release(&basename);
+ }
+
+ return 0;
+}
+
+static FcSessionHolder *
+vegas_CreateSessionHolder(VegasConnectionState *fc, RtaConnection *conn, CCNxName *basename, uint64_t name_hash)
+{
+ FcSessionHolder *holder = parcMemory_AllocateAndClear(sizeof(FcSessionHolder));
+ assertNotNull(holder, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(FcSessionHolder));
+ holder->basename_hash = name_hash;
+ holder->basename = basename;
+ holder->session = NULL;
+
+ TAILQ_INSERT_TAIL(&fc->sessions_head, holder, list);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s created holder %p hash %016" PRIX64 "\n", __func__, (void *) holder, holder->basename_hash);
+ }
+ return holder;
+}
+
+/**
+ * This is called by a session when it is done
+ */
+void
+vegas_EndSession(VegasConnectionState *fc, VegasSession *session)
+{
+ FcSessionHolder *holder;
+
+ // should replace this with a hash table
+ TAILQ_FOREACH(holder, &fc->sessions_head, list)
+ {
+ if (holder->session == session) {
+ TAILQ_REMOVE(&fc->sessions_head, holder, list);
+ break;
+ }
+ }
+
+ assertNotNull(holder, "invalid state, got null holder");
+
+ rtaConnection_SendStatus(fc->parent_connection,
+ FC_VEGAS,
+ RTA_UP,
+ notifyStatusCode_FLOW_CONTROL_FINISHED,
+ holder->basename,
+ NULL);
+
+ vegasSession_Destroy(&holder->session);
+ parcMemory_Deallocate((void **) &holder);
+}
+
+static void
+vegas_SendControlPlaneResponse(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue)
+{
+ TransportMessage *tm = transportMessage_CreateFromDictionary(controlDictionary);
+
+ transportMessage_SetInfo(tm, rtaConnection_Copy(conn), rtaConnection_FreeFunc);
+
+ if (rtaComponent_PutMessage(outputQueue, tm)) {
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS);
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+}
+
+/**
+ * @function vegas_HandleControl
+ * @abstract Process CPI reqeusts
+ * @discussion
+ * <#Discussion#>
+ *
+ * @param <#param1#>
+ * @return true if we consumed the message, false if it should go down the stack
+ */
+static bool
+vegas_HandleControl(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue)
+{
+ bool success = false;
+
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary);
+ if (cpi_getCPIOperation2(json) == CPI_CANCEL_FLOW) {
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ CCNxName *name = cpiCancelFlow_GetFlowName(json);
+
+ PARCJSON *reply = NULL;
+ FcSessionHolder *holder = vegas_LookupSessionByName(fc, name);
+ if (holder != NULL) {
+ if (DEBUG_OUTPUT) {
+ char *string = ccnxName_ToString(name);
+ printf("%s Cancelling flow %s\n", __func__, string);
+ parcMemory_Deallocate((void **) &string);
+ }
+
+ TAILQ_REMOVE(&fc->sessions_head, holder, list);
+ vegasSession_Destroy(&holder->session);
+ parcMemory_Deallocate((void **) &holder);
+
+ reply = cpiAcks_CreateAck(json);
+ } else {
+ if (DEBUG_OUTPUT) {
+ char *string = ccnxName_ToString(name);
+ printf("%s got request to cancel unknown flow %s\n", __func__, string);
+ parcMemory_Deallocate((void **) &string);
+ }
+
+ reply = cpiAcks_CreateNack(json);
+ }
+ CCNxTlvDictionary *response = ccnxControlFacade_CreateCPI(reply);
+ vegas_SendControlPlaneResponse(conn, response, outputQueue);
+ ccnxTlvDictionary_Release(&response);
+
+ parcJSON_Release(&reply);
+ ccnxName_Release(&name);
+
+ // we consume it
+ success = true;
+ }
+ }
+ return success;
+}