aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas')
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c673
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_component_Vegas.c696
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_vegas_Session.c672
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_Session.c1379
-rw-r--r--libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_private.h222
5 files changed, 3642 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c
new file mode 100644
index 00000000..70bcec63
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c
@@ -0,0 +1,673 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Source code layout:
+// - component_Vegas.c: the component wrapper and session multiplexing
+// - vegas_Session.c: code for a specific basename session
+// - vegas_Segment.c: code for specific segment operations
+
+/**
+ * Component behavior
+ * ===================
+ * This component provides flow-controlled in-order delivery of segmented
+ * content objects using a sequential segment number in the last component
+ * of the object name.
+ *
+ * The state machine described here is within a single RtaConnection. Separate
+ * connections are independent.
+ *
+ * Down Stack Behavior
+ * ------------------------
+ * When an interest comes down the stack, it will initiate a flow-controlled
+ * session. If the last component of the interest name is a segment number,
+ * that is the starting segment number. Otherwise, we assume the interest
+ * name is the base name for a segmented object, including the version number.
+ *
+ * Other types of messages coming down the stack (e.g. control or content objects)
+ * are passed down the stack unaltered.
+ *
+ * If an interest comes down that represents a subset of an existing flow (i.e.
+ * it has a segment number beyond the current starting segment of the flow contol
+ * window), the window is advanced to that segment number and any un-delivered
+ * content objects are dropped.
+ *
+ * If an interest comes down that represents a superset of an existing flow
+ * (i.e. it has a starting segment number less than the current window), the
+ * current flow control sessions is re-wound to the lower sequence number
+ * and continues from there.
+ *
+ * Up Stack Behavior
+ * ------------------------
+ * Non-content objects (e.g. control and interests) are passed up the stack unmodified.
+ *
+ * A content object that matches a flow control session is managed by the session.
+ * They are only passed up the stack in-order, and will be dropped if they are outside
+ * the window.
+ *
+ * A content object that does not match a flow control session is dropped. That's because
+ * the only interests we send down the stack are our own for flow controlled sessions, so
+ * no content object should go up the stack unless its part of a flow controlled session.
+ *
+ * Control Messages
+ * ------------------------
+ * The API may cancel flow control sessions in several ways:
+ *
+ * 1) Close the Connection. This will cancel all in progress sessions and drop
+ * any un-delivered objects.
+ *
+ * 2) Send a Control message down the stack with the base name to cancel. The
+ * name is considered the base name of the flow and does not depend on the
+ * starting segment number.
+ *
+ * { "CPI_CANCEL_FLOW" : { "FLOW_NAME" : <base name w/o segment number> } }
+ *
+ * Implementation Notes
+ * =========================
+ * For each RtaConnection, there's a {@code struct fc_connection_state}. This
+ * contains a list of in-progress sessions indexed by the hash of the base name
+ * (name up to but not including final segment). Right now, it's a linked list
+ * but should be implemented as a hash table.
+ *
+ * Each session is represented by a {@code struct fc_session}.
+ *
+ * Each entry in the flow control window is a {@code fc_window_entry}.
+ *
+ * session->window_head and session->window_tail define the limits of the
+ * congestion window. Everything in the interval [head, tail) is expressed
+ * as an interest. The size of that interval may be larger than the
+ * congestion window cwnd if we're decreaed the window. We never decrease
+ * tail, only the cwnd.
+ *
+ *
+ * Flow Control Algorithm
+ * =========================
+ * Based on TCP Vegas. Please read the Vegas paper. We use similar
+ * variable names to the paper. Code looks quite a bit like the linux
+ * tcp_vegas.c too.
+ *
+ * Here's the differences. In CCN, an Interest is like an ACK token, it
+ * gives the network permission to send. The node issuing Interests needs
+ * to pace them to not exceed the network capacity. This is done by
+ * observing the delay of Content Objects. If the delay grows too quickly,
+ * then we back off linearly. If the delay is not much above what we expected
+ * based on the minimum observed delay, we increase linearly.
+ *
+ * During slow start, the interest window (still called "cwnd") doubles
+ * every other RTT until we exceed the slow_start_threshold or the delay
+ * increases too much.
+ *
+ * The RTT is calculated every RTT based on the observed minimum RTT during
+ * the previous period.
+ *
+ * We use RFC6298 Retransmission Timeout (RTO) calculation methods per
+ * flow control session (object basename).
+ *
+ * Just to be clear, there are two timers working. The RTO timer is for
+ * retransmitting interests if the flow as stalled out. The Vegas RTT
+ * calculation is for congestion window calculations.
+ *
+ * We we receive an out-of-order content object, we'll check the earlier
+ * segments to see if they have passed the Vegas RTT. If so, we'll
+ * re-express the interests.
+ *
+ * Each time we re-express an Interest, we might decrese the congestion
+ * window. If the last time the interest was sent was more recent than
+ * the last time we decreased the congestion window, we'll decrease the
+ * congestion window. If the last expression of the interest was before
+ * the most recent window decrease, the window is left alone. This means
+ * we'll only decreae the window once per re-expression.
+ */
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <limits.h>
+#include <sys/queue.h>
+#include <stdbool.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+
+#include <parc/algol/parc_EventQueue.h>
+
+#include <ccnx/transport/common/transport_Message.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h>
+#include <ccnx/transport/test_tools/traffic_tools.h>
+
+#include <ccnx/api/control/controlPlaneInterface.h>
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+#include "vegas_private.h"
+
+#include <parc/logging/parc_LogLevel.h>
+
+#ifndef DEBUG_OUTPUT
+#define DEBUG_OUTPUT 0
+#endif
+
+// ===========================================================
+
+typedef struct fc_session_holder {
+ uint64_t basename_hash;
+ CCNxName *basename;
+ VegasSession *session;
+
+ // used by fc_connection_state to hold these
+ // Should change to hashtable on the hash
+ TAILQ_ENTRY(fc_session_holder) list;
+} FcSessionHolder;
+
+/**
+ * This is the per-connection state. It allows us to have multiple
+ * flow control session on one connection for different names
+ */
+struct vegas_connection_state {
+ RtaConnection *parent_connection;
+ RtaFramework *parent_framework;
+
+ TAILQ_HEAD(, fc_session_holder) sessions_head;
+};
+
+
+// ===========================================================
+
+static int component_Fc_Vegas_Init(RtaProtocolStack *stack);
+static int component_Fc_Vegas_Opener(RtaConnection *conn);
+static void component_Fc_Vegas_Upcall_Read(PARCEventQueue *, PARCEventType event, void *conn);
+static void component_Fc_Vegas_Downcall_Read(PARCEventQueue *, PARCEventType event, void *conn);
+static int component_Fc_Vegas_Closer(RtaConnection *conn);
+static int component_Fc_Vegas_Release(RtaProtocolStack *stack);
+static void component_Fc_Vegas_StateChange(RtaConnection *conn);
+
+// Function structs for component variations
+RtaComponentOperations flow_vegas_ops = {
+ .init = component_Fc_Vegas_Init,
+ .open = component_Fc_Vegas_Opener,
+ .upcallRead = component_Fc_Vegas_Upcall_Read,
+ .upcallEvent = NULL,
+ .downcallRead = component_Fc_Vegas_Downcall_Read,
+ .downcallEvent = NULL,
+ .close = component_Fc_Vegas_Closer,
+ .release = component_Fc_Vegas_Release,
+ .stateChange = component_Fc_Vegas_StateChange
+};
+
+
+// ======
+// Session related functions
+static int vegas_HandleInterest(RtaConnection *conn, TransportMessage *tm);
+static FcSessionHolder *vegas_LookupSession(VegasConnectionState *fc, TransportMessage *tm);
+static FcSessionHolder *vegas_LookupSessionByName(VegasConnectionState *fc, CCNxName *name);
+
+static FcSessionHolder *vegas_CreateSessionHolder(VegasConnectionState *fc, RtaConnection *conn,
+ CCNxName *basename, uint64_t name_hash);
+
+static bool vegas_HandleControl(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue);
+
+// ================================================
+
+static int
+component_Fc_Vegas_Init(RtaProtocolStack *stack)
+{
+ // we don't do any stack-wide initialization
+ return 0;
+}
+
+static int
+component_Fc_Vegas_Opener(RtaConnection *conn)
+{
+ struct vegas_connection_state *fcConnState = parcMemory_AllocateAndClear(sizeof(struct vegas_connection_state));
+ assertNotNull(fcConnState, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(struct vegas_connection_state));
+
+ fcConnState->parent_connection = rtaConnection_Copy(conn);
+ fcConnState->parent_framework = rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn));
+
+ TAILQ_INIT(&fcConnState->sessions_head);
+
+ rtaConnection_SetPrivateData(conn, FC_VEGAS, fcConnState);
+ rtaComponentStats_Increment(rtaConnection_GetStats(conn, FC_VEGAS), STATS_OPENS);
+
+ return 0;
+}
+
+/*
+ * Read from below.
+ * These should only be content objects associated with our stream.
+ *
+ * Non-content objects are passed up the stack.
+ */
+static void
+component_Fc_Vegas_Upcall_Read(PARCEventQueue *in, PARCEventType event, void *stack_ptr)
+{
+ TransportMessage *tm;
+
+ while ((tm = rtaComponent_GetMessage(in)) != NULL) {
+ struct timeval delay = transportMessage_GetDelay(tm);
+
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS);
+
+ rtaComponentStats_Increment(stats, STATS_UPCALL_IN);
+
+ if (transportMessage_IsControl(tm)) {
+ PARCEventQueue *out = rtaComponent_GetOutputQueue(conn, FC_VEGAS, RTA_UP);
+
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ } else {
+ //TODO
+ }
+ } else if (transportMessage_IsContentObject(tm)) {
+ // this takes ownership of the transport message
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ FcSessionHolder *holder = vegas_LookupSession(fc, tm);
+
+ // it's quite possible that we get content objects for sessions that
+ // no longer exist. They are dropped.
+ if (holder != NULL) {
+ vegasSession_ReceiveContentObject(holder->session, tm);
+ } else {
+ transportMessage_Destroy(&tm);
+ }
+ } else {
+ PARCEventQueue *out = rtaComponent_GetOutputQueue(conn, FC_VEGAS, RTA_UP);
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ } else {
+ //TODO
+ }
+ }
+
+ if (DEBUG_OUTPUT) {
+ printf("%s total upcall reads in %" PRIu64 " out %" PRIu64 " last delay %.6f\n",
+ __func__,
+ rtaComponentStats_Get(stats, STATS_UPCALL_IN),
+ rtaComponentStats_Get(stats, STATS_UPCALL_OUT),
+ delay.tv_sec + delay.tv_usec * 1E-6);
+ }
+ }
+}
+
+static void
+component_Fc_Vegas_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *conn)
+{
+ RtaProtocolStack *stack = (RtaProtocolStack *) conn;
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FC_VEGAS, RTA_DOWN);
+ TransportMessage *tm;
+
+// printf("%s reading from queue %p\n", __func__, in);
+
+ while ((tm = rtaComponent_GetMessage(in)) != NULL) {
+ RtaConnection *conn = rtaConnection_GetFromTransport(tm);
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS);
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN);
+
+ if (transportMessage_IsControl(tm)) {
+ CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(tm);
+ if (ccnxControlFacade_IsCPI(controlDictionary) && vegas_HandleControl(conn, controlDictionary, in)) {
+ transportMessage_Destroy(&tm);
+ } else {
+ // we did not consume the message, so forward it down
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+ }
+ } else if (transportMessage_IsInterest(tm)) {
+ vegas_HandleInterest(conn, tm);
+
+ // The flow controller consumes Interests going down the stack and will
+ // start issuing its own interests instead.
+ transportMessage_Destroy(&tm);
+ } else {
+ if (rtaComponent_PutMessage(out, tm)) {
+ rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT);
+ }
+ }
+
+ if (DEBUG_OUTPUT) {
+ struct timeval delay = tm ? transportMessage_GetDelay(tm) : (struct timeval) { 0, 0 };
+ printf("%s total downcall reads in %" PRIu64 " out %" PRIu64 " last delay %.6f\n",
+ __func__,
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_IN),
+ rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT),
+ delay.tv_sec + delay.tv_usec * 1E-6);
+ }
+ }
+}
+
+static int
+component_Fc_Vegas_Closer(RtaConnection *conn)
+{
+ VegasConnectionState *fcConnState;
+
+ assertNotNull(conn, "Got null connection\n");
+ if (conn == NULL) {
+ return -1;
+ }
+
+ fcConnState = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+
+ assertNotNull(fcConnState, "could not retrieve private data for FC_VEGAS on connid %u\n",
+ rtaConnection_GetConnectionId(conn));
+ if (fcConnState == NULL) {
+ return -1;
+ }
+
+ rtaConnection_Destroy(&fcConnState->parent_connection);
+
+ rtaComponentStats_Increment(rtaConnection_GetStats(conn, FC_VEGAS), STATS_CLOSES);
+
+ // close down all the sessions
+ while (!TAILQ_EMPTY(&fcConnState->sessions_head)) {
+ FcSessionHolder *holder = TAILQ_FIRST(&fcConnState->sessions_head);
+
+ vegasSession_Destroy(&holder->session);
+
+ TAILQ_REMOVE(&fcConnState->sessions_head, holder, list);
+ parcMemory_Deallocate((void **) &holder);
+ }
+
+ parcMemory_Deallocate((void **) &fcConnState);
+
+ return 0;
+}
+
+static int
+component_Fc_Vegas_Release(RtaProtocolStack *stack)
+{
+ // no stack-wide memory
+ return 0;
+}
+
+static void
+component_Fc_Vegas_StateChange(RtaConnection *conn)
+{
+ assertNotNull(conn, "Got null connection\n");
+
+ VegasConnectionState *fcConnState = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ assertNotNull(fcConnState, "could not retrieve private data for FC_VEGAS on connid %u\n",
+ rtaConnection_GetConnectionId(conn));
+
+ // should replace this with a hash table
+ FcSessionHolder *holder;
+ TAILQ_FOREACH(holder, &fcConnState->sessions_head, list)
+ {
+ if (vegasSession_GetConnectionId(holder->session) == rtaConnection_GetConnectionId(conn)) {
+ vegasSession_StateChanged(holder->session);
+ }
+ }
+}
+
+// =======================================================================
+
+/**
+ * If the last component is a segment number, it is ignored
+ */
+static FcSessionHolder *
+vegas_LookupSessionByName(VegasConnectionState *fc, CCNxName *name)
+{
+ uint64_t hash;
+ FcSessionHolder *holder;
+ int trim_segnum = 0;
+
+ assertNotNull(name, "Name is null\n");
+ if (name == NULL) {
+ return NULL;
+ }
+
+ size_t segmentCount = ccnxName_GetSegmentCount(name);
+ assertTrue(segmentCount > 1,
+ "expected name with at least 2 components, but only got %zu, name = '%s'\n",
+ segmentCount,
+ ccnxName_ToString(name));
+
+
+ if (segmentCount > 0) {
+ CCNxNameSegment *segment = ccnxName_GetSegment(name, segmentCount - 1);
+ if (ccnxNameSegment_GetType(segment) == CCNxNameLabelType_CHUNK) {
+ trim_segnum = 1;
+ }
+ }
+
+ hash = ccnxName_LeftMostHashCode(name, segmentCount - trim_segnum);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s name %p hash %16" PRIX64 "\n", __func__, (void *) name, hash);
+ ccnxName_Display(name, 0);
+ }
+
+ // should replace this with a hash table
+ TAILQ_FOREACH(holder, &fc->sessions_head, list)
+ {
+ if (holder->basename_hash == hash) {
+ return holder;
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * Precondition: only called for Content Objects.
+ * If the last component is a segment number, it is ignored
+ *
+ * Match the name of the content object to an active flow control session,
+ * or return NULL if not found.
+ */
+static FcSessionHolder *
+vegas_LookupSession(VegasConnectionState *fc, TransportMessage *tm)
+{
+ assertTrue(transportMessage_IsContentObject(tm),
+ "Transport message is not a ContentObject\n");
+
+ CCNxTlvDictionary *contentObjectDictionary = transportMessage_GetDictionary(tm);
+ CCNxName *name = ccnxContentObject_GetName(contentObjectDictionary);
+
+ return vegas_LookupSessionByName(fc, name);
+}
+
+// =============================================
+
+/*
+ * Precondition: it's an interest
+ */
+static int
+vegas_HandleInterest(RtaConnection *conn, TransportMessage *tm)
+{
+ assertTrue(transportMessage_IsInterest(tm), "Transport message is not an interest");
+
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ CCNxTlvDictionary *interestDictionary = transportMessage_GetDictionary(tm);
+
+ // we do not modify or destroy this name
+ CCNxName *original_name = ccnxInterest_GetName(interestDictionary);
+
+ CCNxName *basename = ccnxName_Copy(original_name);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s orig name %p basename %p\n", __func__, (void *) original_name, (void *) basename);
+ }
+
+ // can we decode the last component as a segment number?
+ uint64_t segnum = 0;
+ bool segnum_found = trafficTools_GetObjectSegmentFromName(basename, &segnum);
+ if (segnum_found) {
+ // it is a segment number
+ ccnxName_Trim(basename, 1);
+ }
+
+ FcSessionHolder *holder = vegas_LookupSessionByName(fc, basename);
+
+ if (holder == NULL) {
+ // create a new session
+ // This takes ownership of the basename
+ uint64_t name_hash = ccnxName_HashCode(basename);
+ holder = vegas_CreateSessionHolder(fc, conn, basename, name_hash);
+
+ CCNxInterestInterface *interestImpl = ccnxInterestInterface_GetInterface(interestDictionary);
+
+ uint32_t lifetime = ccnxInterest_GetLifetime(interestDictionary);
+
+ PARCBuffer *keyIdRestriction = ccnxInterest_GetKeyIdRestriction(interestDictionary); // might be NULL
+
+ holder->session = vegasSession_Create(fc, conn, basename, segnum,
+ interestImpl, lifetime, keyIdRestriction);
+
+ vegasSession_Start(holder->session);
+
+ rtaConnection_SendStatus(conn,
+ FC_VEGAS,
+ RTA_UP,
+ notifyStatusCode_FLOW_CONTROL_STARTED,
+ original_name,
+ NULL);
+ } else {
+ assertTrue(segnum_found, "Duplicate interest w/o segnum for existing session");
+
+ if (segnum_found) {
+ vegasSession_Seek(holder->session, segnum);
+ }
+
+ ccnxName_Release(&basename);
+ }
+
+ return 0;
+}
+
+static FcSessionHolder *
+vegas_CreateSessionHolder(VegasConnectionState *fc, RtaConnection *conn, CCNxName *basename, uint64_t name_hash)
+{
+ FcSessionHolder *holder = parcMemory_AllocateAndClear(sizeof(FcSessionHolder));
+ assertNotNull(holder, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(FcSessionHolder));
+ holder->basename_hash = name_hash;
+ holder->basename = basename;
+ holder->session = NULL;
+
+ TAILQ_INSERT_TAIL(&fc->sessions_head, holder, list);
+
+ if (DEBUG_OUTPUT) {
+ printf("%s created holder %p hash %016" PRIX64 "\n", __func__, (void *) holder, holder->basename_hash);
+ }
+ return holder;
+}
+
+/**
+ * This is called by a session when it is done
+ */
+void
+vegas_EndSession(VegasConnectionState *fc, VegasSession *session)
+{
+ FcSessionHolder *holder;
+
+ // should replace this with a hash table
+ TAILQ_FOREACH(holder, &fc->sessions_head, list)
+ {
+ if (holder->session == session) {
+ TAILQ_REMOVE(&fc->sessions_head, holder, list);
+ break;
+ }
+ }
+
+ assertNotNull(holder, "invalid state, got null holder");
+
+ rtaConnection_SendStatus(fc->parent_connection,
+ FC_VEGAS,
+ RTA_UP,
+ notifyStatusCode_FLOW_CONTROL_FINISHED,
+ holder->basename,
+ NULL);
+
+ vegasSession_Destroy(&holder->session);
+ parcMemory_Deallocate((void **) &holder);
+}
+
+static void
+vegas_SendControlPlaneResponse(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue)
+{
+ TransportMessage *tm = transportMessage_CreateFromDictionary(controlDictionary);
+
+ transportMessage_SetInfo(tm, rtaConnection_Copy(conn), rtaConnection_FreeFunc);
+
+ if (rtaComponent_PutMessage(outputQueue, tm)) {
+ RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS);
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+}
+
+/**
+ * @function vegas_HandleControl
+ * @abstract Process CPI reqeusts
+ * @discussion
+ * <#Discussion#>
+ *
+ * @param <#param1#>
+ * @return true if we consumed the message, false if it should go down the stack
+ */
+static bool
+vegas_HandleControl(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue)
+{
+ bool success = false;
+
+ if (ccnxControlFacade_IsCPI(controlDictionary)) {
+ PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary);
+ if (cpi_getCPIOperation2(json) == CPI_CANCEL_FLOW) {
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS);
+ CCNxName *name = cpiCancelFlow_GetFlowName(json);
+
+ PARCJSON *reply = NULL;
+ FcSessionHolder *holder = vegas_LookupSessionByName(fc, name);
+ if (holder != NULL) {
+ if (DEBUG_OUTPUT) {
+ char *string = ccnxName_ToString(name);
+ printf("%s Cancelling flow %s\n", __func__, string);
+ parcMemory_Deallocate((void **) &string);
+ }
+
+ TAILQ_REMOVE(&fc->sessions_head, holder, list);
+ vegasSession_Destroy(&holder->session);
+ parcMemory_Deallocate((void **) &holder);
+
+ reply = cpiAcks_CreateAck(json);
+ } else {
+ if (DEBUG_OUTPUT) {
+ char *string = ccnxName_ToString(name);
+ printf("%s got request to cancel unknown flow %s\n", __func__, string);
+ parcMemory_Deallocate((void **) &string);
+ }
+
+ reply = cpiAcks_CreateNack(json);
+ }
+ CCNxTlvDictionary *response = ccnxControlFacade_CreateCPI(reply);
+ vegas_SendControlPlaneResponse(conn, response, outputQueue);
+ ccnxTlvDictionary_Release(&response);
+
+ parcJSON_Release(&reply);
+ ccnxName_Release(&name);
+
+ // we consume it
+ success = true;
+ }
+ }
+ return success;
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_component_Vegas.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_component_Vegas.c
new file mode 100644
index 00000000..2a1cfe73
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_component_Vegas.c
@@ -0,0 +1,696 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define DEBUG_OUTPUT 0
+
+#include "../component_Vegas.c"
+#include "../vegas_Session.c"
+
+#include <sys/un.h>
+#include <strings.h>
+#include <sys/queue.h>
+
+#include <LongBow/unit-test.h>
+#include <LongBow/runtime.h>
+
+#include <parc/security/parc_Security.h>
+#include <parc/security/parc_PublicKeySignerPkcs12Store.h>
+#include <parc/algol/parc_SafeMemory.h>
+
+#include <ccnx/api/notify/notify_Status.h>
+
+#include <ccnx/transport/test_tools/traffic_tools.h>
+#include <ccnx/common/ccnx_ContentObject.h>
+#include <ccnx/common/ccnx_NameSegmentNumber.h>
+
+#include <ccnx/common/internal/ccnx_ValidationFacadeV1.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.c>
+#include <ccnx/transport/transport_rta/core/rta_Connection.c>
+#include <ccnx/transport/transport_rta/config/config_All.h>
+#include <ccnx/transport/test_tools/traffic_tools.h>
+
+#include "../../test/testrig_MockFramework.c"
+
+#ifndef MAXPATH
+#define MAXPATH 1024
+#endif
+
+// file descriptor for random numbers, part of Fixture
+static int randomFd;
+
+typedef struct test_data {
+ MockFramework *mock;
+ char keystore_filename[MAXPATH];
+ char keystore_password[MAXPATH];
+} TestData;
+
+static CCNxTransportConfig *
+createParams(const char *keystore_name, const char *keystore_passwd)
+{
+ assertNotNull(keystore_name, "Got null keystore name\n");
+ assertNotNull(keystore_passwd, "Got null keystore passwd\n");
+
+ CCNxStackConfig *stackConfig = apiConnector_ProtocolStackConfig(
+ testingUpper_ProtocolStackConfig(
+ vegasFlowController_ProtocolStackConfig(
+ testingLower_ProtocolStackConfig(
+ protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(),
+ apiConnector_GetName(),
+ testingUpper_GetName(),
+ vegasFlowController_GetName(),
+ testingLower_GetName(),
+ NULL)))));
+
+ CCNxConnectionConfig *connConfig = apiConnector_ConnectionConfig(
+ testingUpper_ConnectionConfig(
+ vegasFlowController_ConnectionConfig(
+ tlvCodec_ConnectionConfig(
+ testingLower_ConnectionConfig(ccnxConnectionConfig_Create())))));
+
+ publicKeySignerPkcs12Store_ConnectionConfig(connConfig, keystore_name, keystore_passwd);
+
+ CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig);
+ ccnxStackConfig_Release(&stackConfig);
+ return result;
+}
+
+static TestData *
+_commonSetup(const char *name)
+{
+ parcSecurity_Init();
+
+ TestData *data = parcMemory_AllocateAndClear(sizeof(TestData));
+ assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData));
+
+ sprintf(data->keystore_filename, "/tmp/keystore_%s_%d.p12", name, getpid());
+ sprintf(data->keystore_password, "12345");
+
+ unlink(data->keystore_filename);
+
+ CCNxTransportConfig *config = createParams(data->keystore_filename, data->keystore_password);
+ data->mock = mockFramework_Create(config);
+ ccnxTransportConfig_Destroy(&config);
+ return data;
+}
+
+static void
+_commonTeardown(TestData *data)
+{
+ mockFramework_Destroy(&data->mock);
+ unlink(data->keystore_filename);
+ parcMemory_Deallocate((void **) &data);
+
+ parcSecurity_Fini();
+}
+
+// ======================================================
+
+LONGBOW_TEST_RUNNER(Fc_Vegas)
+{
+ LONGBOW_RUN_TEST_FIXTURE(Component);
+}
+
+LONGBOW_TEST_RUNNER_SETUP(Fc_Vegas)
+{
+ parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory);
+
+ randomFd = open("/dev/urandom", O_RDONLY);
+
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_RUNNER_TEARDOWN(Fc_Vegas)
+{
+ close(randomFd);
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// ==============================================================
+
+LONGBOW_TEST_FIXTURE(Local)
+{
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None);
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases);
+
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetSegnumFromObject);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(Local)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase)));
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(Local)
+{
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+ uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO);
+ if (outstandingAllocations != 0) {
+ printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations);
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+static CCNxTlvDictionary *
+createSignedContentObject(void)
+{
+ CCNxName *name = ccnxName_CreateFromCString("lci:/some/name");
+ PARCBuffer *payload = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 11, (uint8_t *) "the payload"));
+ CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, payload);
+ parcBuffer_Release(&payload);
+ ccnxName_Release(&name);
+
+ PARCBuffer *keyid = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 5, (uint8_t *) "keyid"));
+ ccnxValidationRsaSha256_Set(contentObject, keyid, NULL);
+ parcBuffer_Release(&keyid);
+
+ PARCBuffer *sigbits = parcBuffer_WrapCString("the signature");
+ PARCSignature *signature = parcSignature_Create(PARCSigningAlgorithm_RSA, PARCCryptoHashType_SHA256, parcBuffer_Flip(sigbits));
+ ccnxContentObject_SetSignature(contentObject, keyid, signature, NULL);
+
+ parcSignature_Release(&signature);
+ parcBuffer_Release(&sigbits);
+
+ return contentObject;
+}
+
+static CCNxTlvDictionary *
+createSignedContentObjectWithFinalBlockId(uint64_t fbid)
+{
+ CCNxTlvDictionary *obj = createSignedContentObject();
+ ccnxContentObject_SetFinalChunkNumber(obj, fbid);
+
+ return obj;
+}
+
+LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None)
+{
+ CCNxTlvDictionary *contentObjectDictionary = createSignedContentObject();
+ bool success = vegasSession_GetFinalBlockIdFromContentObject(contentObjectDictionary, NULL);
+ assertFalse(success, "Should have failed getting FBID from content object");
+ ccnxTlvDictionary_Release(&contentObjectDictionary);
+}
+
+LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases)
+{
+ struct test_struct {
+ uint64_t value;
+ size_t encodedBytes;
+ uint8_t *encoded;
+ } test_vector[] = {
+ { .value = 0x0000000000000000ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x00 } },
+ { .value = 0x0000000000000001ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x01 } },
+ { .value = 0x00000000000000FFULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0xFF } },
+ { .value = 0x0000000000000100ULL, .encodedBytes = 2, .encoded = (uint8_t[2]) { 0x01, 0x00} },
+ { .value = 0x0100000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} },
+ { .value = 0x8000000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} },
+ { .value = 0xFFFFFFFFFFFFFFFFULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} },
+ { .value = 0, .encodedBytes = 0, .encoded = NULL }
+ };
+
+ for (int i = 0; test_vector[i].encoded != NULL; i++) {
+ CCNxTlvDictionary *signed_with_fbid = createSignedContentObjectWithFinalBlockId(test_vector[i].value);
+
+ uint64_t testvalue = -1;
+ bool success = vegasSession_GetFinalBlockIdFromContentObject(signed_with_fbid, &testvalue);
+ assertTrue(success, "Failed to get FBID from content object index %d value %016" PRIx64 "\n",
+ i,
+ test_vector[i].value)
+ {
+ ccnxTlvDictionary_Display(signed_with_fbid, 0);
+ }
+
+
+ assertTrue(testvalue == test_vector[i].value,
+ "Segment number does not match index %d value %016" PRIx64 ": got %" PRIx64 "\n",
+ i,
+ test_vector[i].value,
+ testvalue);
+
+ ccnxTlvDictionary_Release(&signed_with_fbid);
+ }
+}
+
+LONGBOW_TEST_CASE(Local, vegasSession_GetSegnumFromObject)
+{
+ struct test_struct {
+ bool valid;
+ uint64_t segnum;
+ char *uri;
+ } test_vectors[] = {
+ { .valid = false, .segnum = 0, .uri = "lci:/foo/bar" },
+ { .valid = true, .segnum = 0, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=%00" },
+ { .valid = true, .segnum = 0x1020, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=%10%20" },
+ { .valid = true, .segnum = 0x6162, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=ab" },
+ { .valid = true, .segnum = 0x616263, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abc" },
+ { .valid = true, .segnum = 0x61626364, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcd" },
+ { .valid = true, .segnum = 0x6162636465, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcde" },
+ { .valid = true, .segnum = 0x616263646566, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcdef" },
+ { .valid = true, .segnum = 0x61626364656667, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcdefg" },
+ { .valid = true, .segnum = 0x6162636465666768, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcdefgh" },
+ { .valid = false, .segnum = 0, .uri = NULL }
+ };
+
+ for (int i = 0; test_vectors[i].uri != NULL; i++) {
+ CCNxName *name = ccnxName_CreateFromCString(test_vectors[i].uri);
+ CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, NULL);
+
+ uint64_t testSeqnum = -1;
+ int failure = vegasSession_GetSegnumFromObject(contentObject, &testSeqnum);
+
+
+
+ if (test_vectors[i].valid) {
+ assertFalse(failure, "Incorrect success index %d: got %d expected %d",
+ i, failure, test_vectors[i].valid);
+
+ assertTrue(testSeqnum == test_vectors[i].segnum, "Incorrect segnum index %d, got %" PRIu64 " expected %" PRIu64,
+ i, testSeqnum, test_vectors[i].segnum);
+ } else {
+ assertTrue(failure, "Incorrect success index %d: got %d expected %d",
+ i, failure, test_vectors[i].valid);
+ }
+
+ ccnxName_Release(&name);
+ ccnxTlvDictionary_Release(&contentObject);
+ }
+}
+
+// ==============================================================
+
+LONGBOW_TEST_FIXTURE(Component)
+{
+ LONGBOW_RUN_TEST_CASE(Component, open_close);
+
+ // these should all be pass through
+ LONGBOW_RUN_TEST_CASE(Component, content_object_down);
+ LONGBOW_RUN_TEST_CASE(Component, control_msg_down);
+ LONGBOW_RUN_TEST_CASE(Component, interest_up);
+ LONGBOW_RUN_TEST_CASE(Component, control_msg_up);
+ LONGBOW_RUN_TEST_CASE(Component, cancel_flow);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(Component)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase)));
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(Component)
+{
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+
+ uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO);
+ if (outstandingAllocations != 0) {
+ printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations);
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// ============================================
+
+LONGBOW_TEST_CASE(Component, open_close)
+{
+ // dont actually do anything. make sure no memory leaks in setup and teardown.
+}
+
+
+// ============================================
+// Passthrough messages
+
+LONGBOW_TEST_CASE(Component, content_object_down)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithSignedContentObject(data->mock->connection);
+
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+
+ TransportMessage *test_tm = rtaComponent_GetMessage(out);
+
+ assertTrue(test_tm == truth_tm,
+ "Got wrong transport message pointer, got %p expected %p",
+ (void *) test_tm,
+ (void *) truth_tm);
+
+ transportMessage_Destroy(&truth_tm);
+}
+
+LONGBOW_TEST_CASE(Component, control_msg_down)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithControlMessage(data->mock->connection);
+
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ TransportMessage *test_tm = rtaComponent_GetMessage(out);
+
+ assertTrue(test_tm == truth_tm,
+ "Got wrong transport message pointer, got %p expected %p",
+ (void *) test_tm,
+ (void *) truth_tm);
+
+ transportMessage_Destroy(&truth_tm);
+}
+
+LONGBOW_TEST_CASE(Component, interest_up)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection);
+
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_DOWN);
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ flow_vegas_ops.upcallRead(read, PARCEventType_Read, (void *) data->mock->stack);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ TransportMessage *test_tm = rtaComponent_GetMessage(out);
+
+ assertTrue(test_tm == truth_tm,
+ "Got wrong transport message pointer, got %p expected %p",
+ (void *) test_tm,
+ (void *) truth_tm);
+
+ transportMessage_Destroy(&truth_tm);
+}
+
+LONGBOW_TEST_CASE(Component, control_msg_up)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithControlMessage(data->mock->connection);
+
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_DOWN);
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ flow_vegas_ops.upcallRead(read, PARCEventType_Read, (void *) data->mock->stack);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ TransportMessage *test_tm = rtaComponent_GetMessage(out);
+
+ assertTrue(test_tm == truth_tm,
+ "Got wrong transport message pointer, got %p expected %p",
+ (void *) test_tm,
+ (void *) truth_tm);
+
+ transportMessage_Destroy(&test_tm);
+}
+
+// ============================================
+// These should start a flow control session
+
+/**
+ * Creates an interest w/o a segment number
+ * Sends it down the stack to the flow controller
+ * Flow controller should append segment number 0 to the interest and send that down the stack
+ */
+LONGBOW_TEST_CASE(Component, interest_down)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection);
+
+ // If we can, add a payload to the Interest. Why not.
+ PARCBuffer *payload = NULL;
+ CCNxInterest *interest = transportMessage_GetDictionary(truth_tm);
+ CCNxInterestInterface *impl = ccnxInterestInterface_GetInterface(interest);
+ if (impl != NULL && impl != &CCNxInterestFacadeV1_Implementation) {
+ // V1 or greater should support Interest payloads.
+ payload = parcBuffer_WrapCString("This is a payload.");
+ impl->setPayload(interest, payload);
+ }
+
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+
+ // we should see a status message up the stack and interests
+ // going down the stack.
+
+
+ TransportMessage *test_tm = rtaComponent_GetMessage(in);
+ assertNotNull(test_tm, "got null transport message back up the queue, expecting status\n");
+
+ assertTrue(transportMessage_IsControl(test_tm),
+ "Transport message is not a control object")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0);
+ }
+
+ CCNxTlvDictionary *test_dict = transportMessage_GetDictionary(test_tm);
+
+ PARCJSON *json = ccnxControlFacade_GetJson(test_dict);
+
+ NotifyStatus *status = notifyStatus_ParseJSON(json);
+
+ assertNotNull(status, "Could not parse NotifyStatus JSON message");
+ assertTrue(notifyStatus_GetFiledes(status) == data->mock->connection->api_fd,
+ "Expected file descriptor %d, actual %d\n", data->mock->connection->api_fd, notifyStatus_GetFiledes(status));
+
+ assertTrue(notifyStatus_IsFlowControlStarted(status),
+ "Expected notifyStatus_IsFlowControlStarted to be true, actual code %d", notifyStatus_GetStatusCode(status));
+
+ notifyStatus_Release(&status);
+
+ transportMessage_Destroy(&test_tm);
+
+ // Read segment 0 interest
+ trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(transportMessage_GetDictionary(truth_tm)), 0, payload);
+
+ // Now read segment 1
+ trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(transportMessage_GetDictionary(truth_tm)), 1, payload);
+
+ if (payload != NULL) {
+ parcBuffer_Release(&payload);
+ }
+
+ transportMessage_Destroy(&truth_tm);
+}
+
+
+LONGBOW_TEST_CASE(Component, interest_down_slow_retransmit)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection);
+
+ VegasConnectionState *fc;
+ FcSessionHolder *holder;
+
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+
+ // --------------------------------------
+ // Read segment 0 interest
+ CCNxTlvDictionary *interest = transportMessage_GetDictionary(truth_tm);
+
+ trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 0, NULL);
+
+ // Now read segment 1
+ trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 1, NULL);
+
+ // --------------------------------------
+ // now bump the time and see what happens.
+ // these are normally set in the timer sallback
+ fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS);
+ holder = TAILQ_FIRST(&fc->sessions_head);
+ assertNotNull(holder, "got null session holder");
+
+ printf("*** bump time\n");
+
+ data->mock->framework->clock_ticks += 1001;
+
+ // RTO timeout will be 1 second
+ vegasSession_TimerCallback(-1, PARCEventType_Timeout, holder->session);
+ trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 0, NULL);
+
+ transportMessage_Destroy(&truth_tm);
+}
+
+
+LONGBOW_TEST_CASE(Component, interest_down_fast_retransmit)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection);
+
+ CCNxName *basename, *segmentname;
+ VegasConnectionState *fc;
+ FcSessionHolder *holder;
+
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+ PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP);
+ PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack);
+ rtaFramework_NonThreadedStep(data->mock->framework);
+
+ // --------------------------------------
+ // Read segment 0 interest
+ CCNxTlvDictionary *interest = transportMessage_GetDictionary(truth_tm);
+
+ trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 0, NULL);
+
+ // Now read segment 1
+ trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 1, NULL);
+
+ // --------------------------------------
+ // now bump the time and see what happens.
+ // these are normally set in the timer sallback
+ fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS);
+ holder = TAILQ_FIRST(&fc->sessions_head);
+ assertNotNull(holder, "got null session holder");
+
+
+ data->mock->framework->clock_ticks += 20;
+ printf("*** bump time %" PRIu64 "\n", data->mock->framework->clock_ticks);
+ vegasSession_TimerCallback(-1, PARCEventType_Timeout, holder->session);
+
+ // --------------------------------------
+ // send an out-of-order content object, should see a fast retransmit
+
+ basename = ccnxName_Copy(ccnxInterest_GetName(interest));
+ segmentname = ccnxName_Copy(basename);
+
+ CCNxNameSegment *segment = ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, 1);
+ ccnxName_Append(segmentname, segment);
+ ccnxNameSegment_Release(&segment);
+
+ transportMessage_Destroy(&truth_tm);
+
+ // this takes ownership of segment name
+ TransportMessage *reply =
+ trafficTools_CreateTransportMessageWithSignedContentObjectWithName(data->mock->connection,
+ segmentname, data->keystore_filename, data->keystore_password);
+
+ rtaComponent_PutMessage(out, reply);
+
+ data->mock->framework->clock_ticks += 40;
+ printf("*** bump time %" PRIu64 "\n", data->mock->framework->clock_ticks);
+ rtaFramework_NonThreadedStepCount(data->mock->framework, 5);
+ vegasSession_TimerCallback(-1, PARCEventType_Timeout, holder->session);
+
+ trafficTools_ReadAndVerifySegment(out, basename, 0, NULL);
+
+ ccnxName_Release(&segmentname);
+ ccnxName_Release(&basename);
+}
+
+/**
+ * Send an interest down the stack to start a flow controller, then send
+ * a control message to cancel it.
+ */
+LONGBOW_TEST_CASE(Component, cancel_flow)
+{
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+
+ TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection);
+
+ PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+
+ CCNxName *flowName = ccnxName_Acquire(ccnxInterest_GetName(transportMessage_GetDictionary(truth_tm)));
+
+ // ================================
+ // This will signal to the flow controller that it should start a flow
+ // We give up ownership of "truth_tm" at this point
+ rtaComponent_PutMessage(in, truth_tm);
+ rtaFramework_NonThreadedStepCount(data->mock->framework, 5);
+
+ // ================================
+ // we should see a status message up the stack
+
+ TransportMessage *test_tm = rtaComponent_GetMessage(in);
+ assertNotNull(test_tm, "got null transport message back up the queue, expecting status\n");
+
+ assertTrue(transportMessage_IsControl(test_tm), "Transport message is not a Control")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0);
+ }
+
+ CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(test_tm);
+
+ PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary);
+
+ NotifyStatus *status = notifyStatus_ParseJSON(json);
+ assertTrue(notifyStatus_IsFlowControlStarted(status),
+ "Expected notifyStatus_IsFlowControlStarted to be true. Actual code %d\n", notifyStatus_GetStatusCode(status));
+ notifyStatus_Release(&status);
+
+ // ================================
+ // After the notification, the flow is "started" and we can cancel it
+
+ // Now that its started, send a cancel
+ PARCJSON *cancelFlow = cpiCancelFlow_Create(flowName);
+ CCNxTlvDictionary *cancelDictionary = ccnxControlFacade_CreateCPI(cancelFlow);
+ parcJSON_Release(&cancelFlow);
+
+ TransportMessage *cancelTm = transportMessage_CreateFromDictionary(cancelDictionary);
+ transportMessage_SetInfo(cancelTm, rtaConnection_Copy(data->mock->connection), rtaConnection_FreeFunc);
+ rtaComponent_PutMessage(in, cancelTm);
+ rtaFramework_NonThreadedStepCount(data->mock->framework, 5);
+
+ // now verify that its gone
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS);
+ FcSessionHolder *holder = TAILQ_FIRST(&fc->sessions_head);
+ assertNull(holder, "The session list is not empty!");
+
+ ccnxTlvDictionary_Release(&cancelDictionary);
+ transportMessage_Destroy(&test_tm);
+ ccnxName_Release(&flowName);
+}
+
+int
+main(int argc, char *argv[])
+{
+ LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(Fc_Vegas);
+ exit(longBowMain(argc, argv, testRunner, NULL));
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_vegas_Session.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_vegas_Session.c
new file mode 100644
index 00000000..b48e9a8a
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_vegas_Session.c
@@ -0,0 +1,672 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define DEBUG_OUTPUT 0
+
+#include "../component_Vegas.c"
+#include "../vegas_Session.c"
+
+#include <sys/un.h>
+#include <strings.h>
+#include <sys/queue.h>
+
+#include <LongBow/unit-test.h>
+#include <LongBow/runtime.h>
+
+#include <ccnx/transport/transport_rta/core/rta_Framework.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.h>
+
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.c>
+#include <ccnx/transport/transport_rta/core/rta_Connection.c>
+
+#include <parc/security/parc_Security.h>
+#include <parc/security/parc_PublicKeySignerPkcs12Store.h>
+#include <ccnx/transport/transport_rta/config/config_All.h>
+
+#include <ccnx/api/notify/notify_Status.h>
+
+#include <ccnx/transport/test_tools/traffic_tools.h>
+
+#include <ccnx/common/ccnx_ContentObject.h>
+#include <ccnx/common/internal/ccnx_ValidationFacadeV1.h>
+
+#include "../../test/testrig_MockFramework.c"
+
+#ifndef MAXPATH
+#define MAXPATH 1024
+#endif
+
+// file descriptor for random numbers, part of Fixture
+static int randomFd;
+
+typedef struct test_data {
+ MockFramework *mock;
+ char keystore_filename[MAXPATH];
+ char keystore_password[MAXPATH];
+} TestData;
+
+static CCNxTransportConfig *
+createParams(const char *keystore_name, const char *keystore_passwd)
+{
+ assertNotNull(keystore_name, "Got null keystore name\n");
+ assertNotNull(keystore_passwd, "Got null keystore passwd\n");
+
+ CCNxStackConfig *stackConfig = apiConnector_ProtocolStackConfig(
+ testingUpper_ProtocolStackConfig(
+ vegasFlowController_ProtocolStackConfig(
+ testingLower_ProtocolStackConfig(
+ protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(),
+ apiConnector_GetName(),
+ testingUpper_GetName(),
+ vegasFlowController_GetName(),
+ testingLower_GetName(),
+ NULL)))));
+
+ CCNxConnectionConfig *connConfig = apiConnector_ConnectionConfig(
+ testingUpper_ConnectionConfig(
+ vegasFlowController_ConnectionConfig(
+ testingLower_ConnectionConfig(ccnxConnectionConfig_Create()))));
+
+
+ publicKeySignerPkcs12Store_ConnectionConfig(connConfig, keystore_name, keystore_passwd);
+
+ CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig);
+ ccnxStackConfig_Release(&stackConfig);
+ return result;
+}
+
+static TestData *
+_commonSetup(const char *name)
+{
+ parcSecurity_Init();
+
+ TestData *data = parcMemory_Allocate(sizeof(TestData));
+
+ assertNotNull(data, "Got null memory from parcMemory_Allocate");
+
+ sprintf(data->keystore_filename, "/tmp/keystore_%s_%d.p12", name, getpid());
+ sprintf(data->keystore_password, "12345");
+
+ unlink(data->keystore_filename);
+
+ CCNxTransportConfig *config = createParams(data->keystore_filename, data->keystore_password);
+ data->mock = mockFramework_Create(config);
+ ccnxTransportConfig_Destroy(&config);
+ return data;
+}
+
+static void
+_commonTeardown(TestData *data)
+{
+ mockFramework_Destroy(&data->mock);
+ unlink(data->keystore_filename);
+
+ parcMemory_Deallocate((void **) &data);
+
+ parcSecurity_Fini();
+}
+
+
+
+// ======================================================
+
+LONGBOW_TEST_RUNNER(VegasSession)
+{
+ LONGBOW_RUN_TEST_FIXTURE(Local);
+ LONGBOW_RUN_TEST_FIXTURE(IterateFinalChunkNumber);
+}
+
+LONGBOW_TEST_RUNNER_SETUP(VegasSession)
+{
+ parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory);
+
+ randomFd = open("/dev/urandom", O_RDONLY);
+
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_RUNNER_TEARDOWN(VegasSession)
+{
+ close(randomFd);
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// ==============================================================
+
+LONGBOW_TEST_FIXTURE(Local)
+{
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None);
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases);
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetSegnumFromObject);
+
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_LastBlockSetsFinalId);
+ LONGBOW_RUN_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_FirstAndLastBlocksSetsFinalId);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(Local)
+{
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase)));
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(Local)
+{
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+ uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO);
+ if (outstandingAllocations != 0) {
+ printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations);
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+static CCNxTlvDictionary *
+createSignedContentObject(void)
+{
+ CCNxName *name = ccnxName_CreateFromCString("ccnx:/some/name");
+ PARCBuffer *payload = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 11, (uint8_t *) "the payload"));
+ CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, payload);
+ parcBuffer_Release(&payload);
+ ccnxName_Release(&name);
+
+ PARCBuffer *keyid = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 5, (uint8_t *) "keyid"));
+ ccnxValidationRsaSha256_Set(contentObject, keyid, NULL);
+ parcBuffer_Release(&keyid);
+
+ PARCBuffer *sigbits = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 13, (uint8_t *) "the signature"));
+
+ switch (ccnxTlvDictionary_GetSchemaVersion(contentObject)) {
+ case CCNxTlvDictionary_SchemaVersion_V1:
+ ccnxValidationFacadeV1_SetPayload(contentObject, sigbits);
+ break;
+ default:
+ trapNotImplemented("Unsupprted schema version in createSignedContentObject()");
+ break;
+ }
+
+ parcBuffer_Release(&sigbits);
+
+ return contentObject;
+}
+
+static CCNxTlvDictionary *
+createSignedContentObjectWithFinalBlockId(uint64_t fbid)
+{
+ CCNxTlvDictionary *obj = createSignedContentObject();
+ ccnxContentObject_SetFinalChunkNumber(obj, fbid);
+ return obj;
+}
+
+LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None)
+{
+ CCNxTlvDictionary *contentObjectDictionary = createSignedContentObject();
+ bool success = vegasSession_GetFinalBlockIdFromContentObject(contentObjectDictionary, NULL);
+ assertFalse(success, "Should have failed getting FBID from content object");
+ ccnxTlvDictionary_Release(&contentObjectDictionary);
+}
+
+LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases)
+{
+ struct test_struct {
+ uint64_t value;
+ size_t encodedBytes;
+ uint8_t *encoded;
+ } test_vector[] = {
+ { .value = 0x0000000000000000ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x00 } },
+ { .value = 0x0000000000000001ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x01 } },
+ { .value = 0x00000000000000FFULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0xFF } },
+ { .value = 0x0000000000000100ULL, .encodedBytes = 2, .encoded = (uint8_t[2]) { 0x01, 0x00} },
+ { .value = 0x0100000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} },
+ { .value = 0x8000000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} },
+ { .value = 0xFFFFFFFFFFFFFFFFULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} },
+ { .value = 0, .encodedBytes = 0, .encoded = NULL }
+ };
+
+ for (int i = 0; test_vector[i].encoded != NULL; i++) {
+ CCNxTlvDictionary *signed_with_fbid = createSignedContentObjectWithFinalBlockId(test_vector[i].value);
+
+ uint64_t testvalue = -1;
+ bool success = vegasSession_GetFinalBlockIdFromContentObject(signed_with_fbid, &testvalue);
+ assertTrue(success, "Failed to get FBID from content object index %d value %016" PRIx64 "\n",
+ i,
+ test_vector[i].value)
+ {
+ ccnxTlvDictionary_Display(signed_with_fbid, 0);
+ }
+
+
+ assertTrue(testvalue == test_vector[i].value,
+ "Segment number does not match index %d value %016" PRIx64 ": got %" PRIx64 "\n",
+ i,
+ test_vector[i].value,
+ testvalue);
+
+ ccnxTlvDictionary_Release(&signed_with_fbid);
+ }
+}
+
+LONGBOW_TEST_CASE(Local, vegasSession_GetSegnumFromObject)
+{
+ struct test_struct {
+ bool valid;
+ uint64_t segnum;
+ char *uri;
+ } test_vectors[] = {
+ { .valid = false, .segnum = 0, .uri = "ccnx:/foo/bar" },
+ { .valid = true, .segnum = 0, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=%00" },
+ { .valid = true, .segnum = 0x1020, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=%10%20" },
+ { .valid = true, .segnum = 0x6162, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=ab" },
+ { .valid = true, .segnum = 0x616263, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abc" },
+ { .valid = true, .segnum = 0x61626364, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcd" },
+ { .valid = true, .segnum = 0x6162636465, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcde" },
+ { .valid = true, .segnum = 0x616263646566, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcdef" },
+ { .valid = true, .segnum = 0x61626364656667, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcdefg" },
+ { .valid = true, .segnum = 0x6162636465666768, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcdefgh" },
+ { .valid = false, .segnum = 0, .uri = NULL }
+ };
+
+ for (int i = 0; test_vectors[i].uri != NULL; i++) {
+ CCNxName *name = ccnxName_CreateFromCString(test_vectors[i].uri);
+ CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, NULL);
+
+ uint64_t testSeqnum = -1;
+ int failure = vegasSession_GetSegnumFromObject(contentObject, &testSeqnum);
+
+
+
+ if (test_vectors[i].valid) {
+ assertFalse(failure, "Incorrect success index %d: got %d expected %d",
+ i, failure, test_vectors[i].valid);
+
+ assertTrue(testSeqnum == test_vectors[i].segnum, "Incorrect segnum index %d, got %" PRIu64 " expected %" PRIu64,
+ i, testSeqnum, test_vectors[i].segnum);
+ } else {
+ assertTrue(failure, "Incorrect success index %d: got %d expected %d",
+ i, failure, test_vectors[i].valid);
+ }
+
+ ccnxName_Release(&name);
+ ccnxTlvDictionary_Release(&contentObject);
+ }
+}
+
+
+// =================================================================
+// Tests related to the FinalBlockId and how the publisher sets it in
+// a stream of content objects
+
+#define DO_NOT_SET ((uint64_t) -1)
+#define SENTINEL ((uint64_t) -1)
+
+typedef struct test_vector {
+ uint64_t chunk;
+ uint64_t setFinalBlockId;
+ bool isLast;
+ bool interestReceived;
+ bool dataReceived;
+} TestVector;
+
+
+static void
+_verifyFlowStartNotification(TestData *data, TransportMessage *notify)
+{
+ assertNotNull(notify, "got null transport message back up the queue, expecting status\n");
+
+ assertTrue(transportMessage_IsControl(notify),
+ "Transport message is not a control object")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(notify), 0);
+ }
+
+ CCNxTlvDictionary *test_dict = transportMessage_GetDictionary(notify);
+
+ PARCJSON *json = ccnxControlFacade_GetJson(test_dict);
+
+ NotifyStatus *status = notifyStatus_ParseJSON(json);
+
+ assertNotNull(status, "Could not parse NotifyStatus JSON message");
+ assertTrue(notifyStatus_GetFiledes(status) == data->mock->connection->api_fd,
+ "Expected file descriptor %d, actual %d\n", data->mock->connection->api_fd, notifyStatus_GetFiledes(status));
+
+ assertTrue(notifyStatus_IsFlowControlStarted(status),
+ "Expected notifyStatus_IsFlowControlStarted to be true, actual code %d", notifyStatus_GetStatusCode(status));
+
+ notifyStatus_Release(&status);
+}
+
+
+static CCNxName *
+_startFlow(TestData *data)
+{
+ TransportMessage *downInterest = trafficTools_CreateTransportMessageWithInterest(data->mock->connection);
+ CCNxName *sessionName = ccnxName_Acquire(ccnxInterest_GetName(transportMessage_GetDictionary(downInterest)));
+ PARCEventQueue *upperQueue = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+
+ rtaComponent_PutMessage(upperQueue, downInterest);
+ rtaFramework_NonThreadedStepCount(data->mock->framework, 10);
+
+ // we should see a status message up the stack and interests
+ // going down the stack.
+
+ TransportMessage *notify = rtaComponent_GetMessage(upperQueue);
+ _verifyFlowStartNotification(data, notify);
+ transportMessage_Destroy(&notify);
+
+ return sessionName;
+}
+
+/*
+ * Caveat: this only works because we create a single session
+ */
+static VegasSession *
+_grabSession(TestData *data, CCNxName *name)
+{
+ VegasConnectionState *fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS);
+
+ FcSessionHolder *holder = vegas_LookupSessionByName(fc, name);
+
+ assertNotNull(holder, "Could not find the session holder in the flow controller");
+ return holder->session;
+}
+
+/*
+ * a tick is 1 milli-second, but it could be different depending on how
+ * the framework is started
+ */
+static void
+_bumpTime(TestData *data, unsigned ticks, CCNxName *name)
+{
+ data->mock->framework->clock_ticks += ticks;
+ vegasSession_TimerCallback(-1, PARCEventType_Timeout, _grabSession(data, name));
+}
+
+static uint64_t
+_getChunkNumberFromName(const CCNxName *name)
+{
+ size_t segmentCount = ccnxName_GetSegmentCount(name);
+ CCNxNameSegment *lastSegment = ccnxName_GetSegment(name, segmentCount - 1);
+ CCNxNameLabelType nameType = ccnxNameSegment_GetType(lastSegment);
+ assertTrue(nameType == CCNxNameLabelType_CHUNK, "Wrong segment type got %d expected %d", nameType, CCNxNameLabelType_CHUNK);
+ uint64_t chunkNumber = ccnxNameSegmentNumber_Value(lastSegment);
+ return chunkNumber;
+}
+
+static TestVector *
+_getVector(TestVector *vectors, uint64_t chunkNumber)
+{
+ // find the test vector for this chunk
+ for (int i = 0; vectors[i].chunk != SENTINEL; i++) {
+ if (vectors[i].chunk == chunkNumber) {
+ return &vectors[i];
+ }
+ }
+ trapIllegalValue(chunkNumber, "Could not find chunk number in test vector");
+}
+
+static TransportMessage *
+_createReponseContentObject(CCNxName *name, uint64_t finalBlockid)
+{
+ CCNxContentObject *obj = ccnxContentObject_CreateWithNameAndPayload(name, NULL);
+ assertNotNull(obj, "Got null content object.");
+
+ if (finalBlockid != DO_NOT_SET) {
+ bool success = ccnxContentObject_SetFinalChunkNumber(obj, finalBlockid);
+ assertTrue(success, "Failed to set final chunk number");
+ }
+
+ CCNxMetaMessage *message = ccnxMetaMessage_CreateFromContentObject(obj);
+ TransportMessage *response = transportMessage_CreateFromDictionary(message);
+
+ ccnxMetaMessage_Release(&message);
+ ccnxContentObject_Release(&obj);
+
+ return response;
+}
+
+/*
+ * Returns true if the unit test is finished
+ */
+static bool
+_respondToDownInterest(TestData *data, TestVector *vectors)
+{
+ PARCEventQueue *lowerQueue = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP);
+
+ bool finished = false;
+ TransportMessage *msg = rtaComponent_GetMessage(lowerQueue);
+ if (msg) {
+ // it should be an Interest with a chunk number
+ assertTrue(transportMessage_IsInterest(msg), "Got unexpected message")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(msg), 3);
+ }
+
+ CCNxTlvDictionary *interestDictionary = transportMessage_GetDictionary(msg);
+ CCNxName *name = ccnxInterest_GetName(interestDictionary);
+ uint64_t chunkNumber = _getChunkNumberFromName(name);
+
+ TestVector *vector = _getVector(vectors, chunkNumber);
+
+ vector->interestReceived = true;
+
+ // create a content object and set the FinalBlockId if vector says to
+ TransportMessage *response = _createReponseContentObject(name, vector->setFinalBlockId);
+ RtaConnection *connection = transportMessage_GetInfo(msg);
+ RtaConnection *connectionRef = rtaConnection_Copy(connection);
+ transportMessage_SetInfo(response, connectionRef, rtaConnection_FreeFunc);
+
+ rtaComponent_PutMessage(lowerQueue, response);
+
+ finished = vector->isLast;
+
+ transportMessage_Destroy(&msg);
+ }
+ return finished;
+}
+
+/*
+ * Returns true if received the last message
+ */
+static bool
+_consumeUpperContentObject(TestData *data, TestVector *vectors)
+{
+ PARCEventQueue *upperQueue = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN);
+
+ bool finished = false;
+ TransportMessage *msg = rtaComponent_GetMessage(upperQueue);
+ if (msg) {
+ // it should be a content object
+ assertTrue(transportMessage_IsContentObject(msg), "Got unexpected message")
+ {
+ ccnxTlvDictionary_Display(transportMessage_GetDictionary(msg), 3);
+ }
+
+ CCNxTlvDictionary *objectDictionary = transportMessage_GetDictionary(msg);
+ CCNxName *name = ccnxContentObject_GetName(objectDictionary);
+ uint64_t chunkNumber = _getChunkNumberFromName(name);
+
+ TestVector *vector = _getVector(vectors, chunkNumber);
+
+ // we should not have seen it before
+ assertFalse(vector->dataReceived, "Duplicate Content Object chunk %" PRIu64, chunkNumber)
+ {
+ ccnxName_Display(name, 3);
+ }
+
+ vector->dataReceived = true;
+
+ finished = vector->isLast;
+
+ transportMessage_Destroy(&msg);
+ }
+
+ return finished;
+}
+
+static void
+_runTestVector(TestData *data, TestVector vectors[])
+{
+ CCNxName *sessionName = _startFlow(data);
+
+ bool finished = false;
+
+ while (!finished) {
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ finished = _respondToDownInterest(data, vectors);
+
+ rtaFramework_NonThreadedStep(data->mock->framework);
+ finished &= _consumeUpperContentObject(data, vectors);
+
+ if (!finished) {
+ _bumpTime(data, 5, sessionName);
+ }
+ }
+
+ ccnxName_Release(&sessionName);
+}
+
+/*
+ * First chunk sets final block ID, last chunk does not. Should keep reading until
+ * the real last chunk set to itself.
+ */
+LONGBOW_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_FirstBlockSetsLastDoesNotFinalId)
+{
+ TestVector vectors[] = {
+ { .chunk = 0, .setFinalBlockId = 5, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 1, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 2, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 3, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 4, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 5, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 6, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 7, .setFinalBlockId = 7, .isLast = true, .interestReceived = false, .dataReceived = false },
+ { .chunk = SENTINEL, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ };
+
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ _runTestVector(data, vectors);
+}
+
+/*
+ * FinalBlockId unset until last chunk, which sets to itself
+ */
+LONGBOW_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_LastBlockSetsFinalId)
+{
+ TestVector vectors[] = {
+ { .chunk = 0, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 1, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 2, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 3, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 4, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 5, .setFinalBlockId = 5, .isLast = true, .interestReceived = false, .dataReceived = false },
+ { .chunk = SENTINEL, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ };
+
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ _runTestVector(data, vectors);
+}
+
+/*
+ * First chunk sets FinalBlockId and last chunks, and last chunk sets it to itself
+ */
+LONGBOW_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_FirstAndLastBlocksSetsFinalId)
+{
+ TestVector vectors[] = {
+ { .chunk = 0, .setFinalBlockId = 7, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 1, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 2, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 3, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 4, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 5, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 6, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ { .chunk = 7, .setFinalBlockId = 7, .isLast = true, .interestReceived = false, .dataReceived = false },
+ { .chunk = SENTINEL, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false },
+ };
+
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ _runTestVector(data, vectors);
+}
+
+// ============================================
+
+LONGBOW_TEST_FIXTURE(IterateFinalChunkNumber)
+{
+ LONGBOW_RUN_TEST_CASE(IterateFinalChunkNumber, vegasSession_ReceiveContentObject_InOrder_FirstSetsSecondIncreasesLastSetsFinalId);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(IterateFinalChunkNumber)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(IterateFinalChunkNumber)
+{
+ uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO);
+ if (outstandingAllocations != 0) {
+ printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations);
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+/*
+ * First chunk sets FinalBlockId, later chunk increases it by N, then final chunk set to self.
+ *
+ * In this test, we programmatically create the TestVector array so we can run different iterations of N.
+ */
+LONGBOW_TEST_CASE(IterateFinalChunkNumber, vegasSession_ReceiveContentObject_InOrder_FirstSetsSecondIncreasesLastSetsFinalId)
+{
+ const unsigned minsize = 5;
+ const unsigned maxsize = 20;
+
+ for (unsigned size = minsize; size < maxsize; size++) {
+ longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase)));
+
+ TestVector vectors[size];
+
+ // set initial state
+ for (int i = 0; i < size; i++) {
+ vectors[i] = (TestVector) { .chunk = i, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false };
+ }
+
+ // first vectors sets it to minsize
+ vectors[0].setFinalBlockId = minsize;
+
+ // minsize sets it to the end
+ vectors[minsize - 1].setFinalBlockId = size;
+
+ // last one sets it to itself
+ vectors[size - 1].setFinalBlockId = size;
+ vectors[size - 1].isLast = true;
+
+ TestData *data = longBowTestCase_GetClipBoardData(testCase);
+ _runTestVector(data, vectors);
+
+ _commonTeardown(longBowTestCase_GetClipBoardData(testCase));
+
+ uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO);
+ assertTrue(outstandingAllocations == 0, "Memory leak for size %u", size);
+ }
+}
+
+
+
+// ============================================
+
+int
+main(int argc, char *argv[])
+{
+ LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(VegasSession);
+ exit(longBowMain(argc, argv, testRunner, NULL));
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_Session.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_Session.c
new file mode 100644
index 00000000..a77adc34
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_Session.c
@@ -0,0 +1,1379 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * See additional comments in component_Vegas.c
+ *
+ * Flow Control Algorithm
+ * =========================
+ * Based on TCP Vegas. Please read the Vegas paper. We use similar
+ * variable names to the paper. Code looks quite a bit like the linux
+ * tcp_vegas.c too.
+ *
+ * Here's the differences. In CCN, an Interest is like an ACK token, it
+ * gives the network permission to send. The node issuing Interests needs
+ * to pace them to not exceed the network capacity. This is done by
+ * observing the delay of Content Objects. If the delay grows too quickly,
+ * then we back off linearly. If the delay is not much above what we expected
+ * based on the minimum observed delay, we increase linearly.
+ *
+ * During slow start, the interest window (still called "cwnd") doubles
+ * every other RTT until we exceed the slow_start_threshold or the delay
+ * increases too much.
+ *
+ * The RTT is calculated every RTT based on the observed minimum RTT during
+ * the previous period.
+ *
+ * We use RFC6298 Retransmission Timeout (RTO) calculation methods per
+ * flow control session (object basename).
+ *
+ * Just to be clear, there are two timers working. The RTO timer is for
+ * retransmitting interests if the flow as stalled out. The Vegas RTT
+ * calculation is for congestion window calculations.
+ *
+ * We we receive an out-of-order content object, we'll check the earlier
+ * segments to see if they have passed the Vegas RTT. If so, we'll
+ * re-express the interests.
+ *
+ * Each time we re-express an Interest, we might decrese the congestion
+ * window. If the last time the interest was sent was more recent than
+ * the last time we decreased the congestion window, we'll decrease the
+ * congestion window. If the last expression of the interest was before
+ * the most recent window decrease, the window is left alone. This means
+ * we'll only decreae the window once per re-expression.
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <limits.h>
+#include <sys/queue.h>
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <parc/algol/parc_SafeMemory.h>
+
+#include <parc/algol/parc_EventTimer.h>
+
+#include <ccnx/common/ccnx_NameSegmentNumber.h>
+#include <ccnx/common/ccnx_WireFormatMessage.h>
+
+#include <ccnx/transport/common/transport_Message.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework.h>
+#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h>
+#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h>
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h>
+#include "vegas_private.h"
+
+#include <ccnx/transport/test_tools/traffic_tools.h>
+
+#include <ccnx/common/internal/ccnx_InterestDefault.h>
+
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_TlvDictionary.h>
+
+
+#define USE_MIN_BASE_RTT 0
+
+
+// initial congestion window of 2 interests
+#define FC_INIT_CWND 2
+
+// maximum cwnd (at 8KB/object, makes this 128 MB)
+#define FC_MAX_CWND 16384
+
+#define FC_MAX_SSTHRESH FC_MAX_CWND
+
+// initial RTT in msec (100 msec)
+#define FC_INIT_RTT_MSEC 100
+
+// initial RTO in msec
+#define FC_INIT_RTO_MSEC 1000
+
+#define FC_MSS 8704
+#define min(a, b) ((a < b) ? a : b)
+#define max(a, b) ((a > b) ? a : b)
+
+// ===========================================================
+struct vegas_connection_state;
+
+struct fc_window_entry {
+ bool valid;
+ ticks t;
+ ticks t_first_request;
+ segnum_t segnum;
+
+ // set to true on the first interest request for
+ // the segment, false on subsequent requests
+ // Needed for Karn's algorithm on RTT sampling for RTO
+ bool first_request;
+
+ // Content Object read
+ TransportMessage *transport_msg;
+};
+
+struct vegas_session {
+ RtaConnection *parent_connection;
+ RtaFramework *parent_framework;
+ VegasConnectionState *parent_fc;
+
+ // next sampling time
+ ticks next_rtt_sample;
+
+ // minimum observed RTT
+ int64_t base_RTT; // absolute minimum observed
+ int64_t min_RTT; // minimum RTT in current sample
+ int cnt_RTT; // number of RTTs seen in current sample
+ int64_t sum_RTT; // sum of RTTs
+ int slow_start_threshold;
+
+ // the currently observed RTT
+ ticks current_rtt;
+
+ // we do one detailed sample per RTT
+ bool sample_in_progress;
+ ticks sample_start;
+ uint64_t sample_segnum;
+ uint64_t sample_bytes_recevied;
+
+ // Only adjust the cwnd every 2 RTTs. This
+ // indicates if we should adjust the RTT at the
+ // end of this sampling period
+ int do_fc_this_rtt;
+
+ // circular buffer for segments
+ // tail - head (mod FC_MAX_CWND) is how may outstanding interests
+ // are in-flight. If the cwnd has been reduced, it could be larger
+ // than current_cwnd.
+ uint64_t starting_segnum; // segnum of the head
+ int window_head; // window index to read from
+ int window_tail; // window index to insert at
+
+ uint32_t current_cwnd;
+ ticks last_cwnd_adjust;
+
+ uint64_t final_segnum; // if we know the final block ID
+
+ struct fc_window_entry window[FC_MAX_CWND];
+
+ PARCEventTimer *tick_event;
+
+ // we will generate Interests with the same version as was received to start the session.
+ // Will also use the same lifetime settings as the original Interest.
+
+ CCNxInterestInterface *interestInterface;
+ uint32_t lifetime;
+ PARCBuffer *keyIdRestriction;
+ CCNxName *basename;
+ uint64_t name_hash;
+
+ uint64_t cnt_old_segments;
+ uint64_t cnt_fast_reexpress;
+
+ // These are for RTO calculation
+ ticks SRTT;
+ ticks RTTVAR;
+ ticks RTO;
+ ticks next_rto; // when the next timer expires
+
+ PARCLogLevel logLevel;
+};
+
+// Control parameters, measured in segments (tcp) or objects (ccn)
+static int alpha = 2;
+static int beta = 32;
+static int _gamma = 1;
+
+// ===========================================================
+
+
+
+static void vegasSession_ExpressInterests(VegasSession *session);
+static int vegasSession_ExpressInterestForEntry(VegasSession *session, struct fc_window_entry *entry);
+
+static void vegasSession_FastReexpress(VegasSession *session, struct fc_window_entry *ack_entry);
+static void vegasSession_ForwardObjectsInOrder(VegasSession *session);
+
+static int vegasSession_GetSegnumFromObject(CCNxTlvDictionary *contentObjectDictionary, uint64_t *segnum);
+static struct fc_window_entry *
+vegasSession_GetWindowEntry(VegasSession *session, TransportMessage *tm, uint64_t segnum);
+
+static void vegasSession_ReleaseWindowEntry(struct fc_window_entry *entry);
+static void vegasSession_RunAlgorithmOnReceive(VegasSession *session, struct fc_window_entry *entry);
+
+static void vegasSession_SetTimer(VegasSession *session, ticks tick_delay);
+static void vegasSession_SlowReexpress(VegasSession *session);
+
+// =======================================================================
+
+
+static struct fc_window_entry *
+vegasSession_GetWindowEntry(VegasSession *session, TransportMessage *tm, uint64_t segnum)
+{
+ int offset;
+ struct fc_window_entry *entry;
+
+ offset = ((segnum - session->starting_segnum) + session->window_head) % FC_MAX_CWND;
+ entry = &session->window[offset];
+
+ assertTrue(entry->valid, "Requesting window entry for invalid entry %p", (void *) entry);
+ assertTrue(segnum == entry->segnum, "Expected seqnum not equal to window entry, expected %" PRIu64 ", got %" PRIu64, segnum, entry->segnum);
+
+ if (entry->transport_msg != NULL) {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__,
+ "session %p duplicate segment %" PRIu64 "", (void *) session, entry->segnum);
+ }
+
+ transportMessage_Destroy(&entry->transport_msg);
+ }
+
+ // store the content object
+ entry->transport_msg = tm;
+
+ return entry;
+}
+
+static int
+vegasSession_GetSegnumFromObject(CCNxTlvDictionary *contentObjectDictionary, uint64_t *segnum)
+{
+ CCNxName *name = ccnxContentObject_GetName(contentObjectDictionary);
+ assertNotNull(name, "Content Object has null name")
+ {
+ ccnxTlvDictionary_Display(contentObjectDictionary, 0);
+ }
+
+ bool success = trafficTools_GetObjectSegmentFromName(name, segnum);
+
+ if (success) {
+ return 0;
+ }
+ return -1;
+}
+
+static void
+vegasSession_ReduceCongestionWindow(VegasSession *session)
+{
+ if (session->current_cwnd <= session->slow_start_threshold) {
+ // 3/4 it
+ session->current_cwnd = session->current_cwnd / 2 + session->current_cwnd / 4;
+ } else {
+ // in linear mode
+ session->current_cwnd--;
+ }
+
+ if (session->current_cwnd < 2) {
+ session->current_cwnd = 2;
+ }
+
+ session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework);
+}
+
+static void
+vegasSession_RunAlgorithmOnReceive(VegasSession *session, struct fc_window_entry *entry)
+{
+ ticks now;
+ int64_t fc_rtt;
+
+ now = rtaFramework_GetTicks(session->parent_framework);
+
+ // perform statistics updates.
+
+ // If the codec did not include the raw message, we cannot increment the bytes counter
+ PARCBuffer *wireFormat = ccnxWireFormatMessage_GetWireFormatBuffer(transportMessage_GetDictionary(entry->transport_msg));
+
+ if (wireFormat) {
+ session->sample_bytes_recevied += parcBuffer_Remaining(wireFormat);
+ }
+
+
+ /* add +1 so never have 0 RTT */
+ fc_rtt = ((int64_t) now - (int64_t) entry->t_first_request) + 1;
+ if (fc_rtt <= 0) {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Error)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Error, __func__,
+ "session %p sock %3d : recv segment %" PRIu64 " with negative RTT, t = %" PRIu64 "",
+ (void *) session,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ entry->segnum,
+ entry->t);
+ }
+
+ return;
+ }
+
+ /* record the absolute minimum RTT ever seen */
+ if (fc_rtt < session->base_RTT) {
+ session->base_RTT = fc_rtt;
+ }
+
+ /* find the minimum RTT for the sample period */
+ session->min_RTT = min(session->min_RTT, fc_rtt);
+ session->cnt_RTT++;
+ session->sum_RTT += fc_rtt;
+
+ // calculate RTO as per RFC6298
+ if (entry->first_request) {
+ if (session->SRTT == 0) {
+ // this is the first one, so do 2.2
+ session->SRTT = fc_rtt;
+ session->RTTVAR = fc_rtt >> 1;
+ session->RTO = session->SRTT +
+ max(rtaFramework_UsecToTicks(1000000), 4 * session->RTTVAR);
+ } else {
+ // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
+ // using beta = 1/4, so we want 3/4 * RTTVAR
+ int64_t abs = ((int64_t) session->SRTT - (int64_t) fc_rtt);
+
+ if (abs < 0) {
+ abs = -1 * abs;
+ }
+
+ session->RTTVAR = ((session->RTTVAR >> 1) + (session->RTTVAR >> 2)) + (abs >> 2);
+
+ // SRTT <- (1 - alpha) * SRTT + alpha * R'
+ // using alpha = 1/8 and (1-alpha) = 1/2 + 1/4 + 1/8 = 7/8
+ session->SRTT = (session->SRTT >> 1) + (session->SRTT >> 2) + (session->SRTT >> 3) + (abs >> 3);
+
+ session->RTO = session->SRTT +
+ max(rtaFramework_UsecToTicks(1000000), 4 * session->RTTVAR);
+ }
+ }
+
+ // we received a packet :) yay.
+ // we get to extend the RTO expiry
+ session->next_rto = now + session->RTO;
+}
+
+/*
+ * called inside workq_mutex lock.
+ * After we deliver each segment, we increment session->starting_segnum. After we deliver the
+ * the terminal segement of a stream, session->starting_segnum will be 1 past the final block id.
+ */
+static void
+vegasSession_ForwardObjectsInOrder(VegasSession *session)
+{
+ while (session->window_head != session->window_tail) {
+ struct fc_window_entry *entry = &session->window[ session->window_head ];
+
+ // sanity checks
+ assertTrue(entry->valid, "Window entry %p for window_head index %u", (void *) entry, session->window_head);
+ assertTrue(entry->segnum == session->starting_segnum,
+ "Expected seqnum not equal to window entry, expected %" PRIu64 ", got %" PRIu64,
+ session->starting_segnum,
+ entry->segnum);
+
+ if (entry->transport_msg != NULL) {
+ PARCEventQueue *out = rtaComponent_GetOutputQueue(session->parent_connection, FC_VEGAS, RTA_UP);
+ RtaComponentStats *stats = rtaConnection_GetStats(session->parent_connection, FC_VEGAS);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "session %p fd %d forward segment %" PRIu64 " up stack",
+ (void *) session,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ entry->segnum);
+ }
+
+ if (rtaComponent_PutMessage(out, entry->transport_msg)) {
+ // if we successfully put the message up the stack, null
+ // the entry so the transport message will not be destroyed
+ // when this window entry is released.
+ entry->transport_msg = NULL;
+ rtaComponentStats_Increment(stats, STATS_UPCALL_OUT);
+ }
+
+ vegasSession_ReleaseWindowEntry(entry);
+ session->starting_segnum++;
+ session->window_head = (session->window_head + 1) % FC_MAX_CWND;
+ } else {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "session %p fd %d no message segment %" PRIu64 ", no more in order messages",
+ rtaConnection_GetConnectionId(session->parent_connection),
+ entry->segnum);
+ }
+
+ return;
+ }
+ }
+}
+
+static int
+fc_ssthresh(VegasSession *session)
+{
+ return min(session->slow_start_threshold, session->current_cwnd - 1);
+}
+
+/**
+ * Slow-start increase, double the cwnd
+ */
+static void
+fc_slow_start(VegasSession *session)
+{
+ session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework);
+ session->current_cwnd = session->current_cwnd << 1;
+}
+
+static
+int
+fc_in_cwnd_reduction(VegasSession *session)
+{
+ return 0;
+}
+
+/*
+ * Similar to the tcp_current_ssthresh. If cwnd > ssthresh, then
+ * increase ssthres to 1/2 to cwnd, except if we're in a cwnd reduction
+ * period.
+ */
+static inline uint32_t
+fc_current_ssthresh(VegasSession *session)
+{
+ if (fc_in_cwnd_reduction(session)) {
+ return session->slow_start_threshold;
+ } else {
+ return max(session->slow_start_threshold,
+ ((session->current_cwnd >> 1) +
+ (session->current_cwnd >> 2)));
+ }
+}
+
+static void
+vegasSession_CongestionAvoidanceDebug(VegasSession *session, ticks now)
+{
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ ticks diff = 0;
+
+ if (session->min_RTT != INT_MAX) {
+ diff = session->current_cwnd * (session->min_RTT - session->base_RTT) / session->base_RTT;
+ }
+
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "session %p do_cong %d currentRTT %5" PRIu64 " cntRTT %3d minRTT %5" PRId64 " baseRTT %5" PRId64 " cwnd %3d next %8" PRIu64 " SRTT %" PRIu64 " RTO %" PRIu64 " oldsegs %" PRIu64 " fast %" PRIu64 " diff %" PRIu64 " allocs %u",
+ (void *) session,
+ session->do_fc_this_rtt,
+ session->current_rtt,
+ session->cnt_RTT,
+ session->min_RTT == INT_MAX ? 0 : session->min_RTT,
+ session->base_RTT == INT_MAX ? 0 : session->base_RTT,
+ session->current_cwnd,
+ session->next_rtt_sample,
+ session->SRTT,
+ session->RTO,
+ session->cnt_old_segments,
+ session->cnt_fast_reexpress,
+ diff,
+ parcMemory_Outstanding());
+ }
+}
+
+static void
+vegasSession_LossBasedAvoidance(VegasSession *session)
+{
+ session->current_rtt = session->current_rtt * 2;
+ if (session->current_rtt > 4000) {
+ session->current_rtt = 4000;
+ }
+}
+
+/**
+ * This is the Vegas algorithm
+ */
+static void
+vegasSession_TimeBasedAvoidance(VegasSession *session)
+{
+ ticks rtt, diff;
+ uint64_t target_cwnd;
+
+ rtt = session->min_RTT;
+
+ /*
+ * calculate the target cwnd in segments
+ */
+ target_cwnd = session->current_cwnd * session->base_RTT / rtt;
+
+ diff = session->current_cwnd * (rtt - session->base_RTT) / session->base_RTT;
+
+ if ((diff > _gamma && session->current_cwnd <= session->slow_start_threshold)) {
+ /* If we're in slow start and going too fast, slow down */
+ session->current_cwnd = min(session->current_cwnd, (uint32_t) target_cwnd + 1);
+ session->slow_start_threshold = fc_ssthresh(session);
+ session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework);
+ } else if (session->current_cwnd <= session->slow_start_threshold) {
+ /* Slow start */
+ fc_slow_start(session);
+ } else {
+ /* Congestion avoidance. */
+
+ // if (diff > beta || session->cnt_old_segments ) {
+ if (diff > beta) {
+ /* The old window was too fast, so
+ * we slow down.
+ */
+
+ session->current_cwnd--;
+ session->slow_start_threshold = fc_ssthresh(session);
+ session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework);
+ } else if (diff < alpha) {
+ /* room to grow */
+ session->current_cwnd++;
+ session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework);
+ } else {
+ /* middle ground, no changes necessary */
+ }
+ }
+
+ if (session->current_cwnd < 2) {
+ session->current_cwnd = 2;
+ } else if (session->current_cwnd > FC_MAX_CWND) {
+ session->current_cwnd = FC_MAX_CWND;
+ }
+
+ session->slow_start_threshold = fc_current_ssthresh(session);
+}
+
+static void
+vegasSession_CongestionAvoidance(VegasSession *session)
+{
+ ticks now = rtaFramework_GetTicks(session->parent_framework);
+
+ vegasSession_CongestionAvoidanceDebug(session, now);
+
+ if (session->do_fc_this_rtt) {
+ if (session->cnt_RTT <= 2) {
+ vegasSession_LossBasedAvoidance(session);
+ } else {
+ vegasSession_TimeBasedAvoidance(session);
+ }
+
+ session->do_fc_this_rtt = 0;
+ } else {
+ session->do_fc_this_rtt = 1;
+ }
+
+ // Now finish up the statistics and setup for next RTT interval
+
+ session->next_rtt_sample = now + session->current_rtt;
+
+ // low-pass filter the base_RTT from the min_RTT
+ // base_RTT = 15/16 base_RTT + 1/16 min_RTT = (240 * base_RTT + 16 * min_RTT ) / 256
+
+ if (!USE_MIN_BASE_RTT && (session->cnt_RTT > 0)) {
+ session->base_RTT = (240 * session->base_RTT + 16 * session->min_RTT) >> 8;
+ if (session->base_RTT == 0) {
+ session->base_RTT = 1;
+ }
+ }
+
+ // Smooth the RTT for (3 * current + 1 * minimum) / 4
+
+ if (session->cnt_RTT > 0) {
+ session->current_rtt = (12 * session->current_rtt + 4 * session->min_RTT) >> 4;
+ }
+
+ session->current_rtt = max(session->current_rtt, FC_INIT_RTT_MSEC);
+
+ // reset stats
+ session->sample_bytes_recevied = 0;
+ session->min_RTT = INT_MAX;
+ session->cnt_RTT = 0;
+ session->cnt_old_segments = 0;
+ session->cnt_fast_reexpress = 0;
+ session->sum_RTT = 0;
+
+ vegasSession_CongestionAvoidanceDebug(session, now);
+}
+
+/**
+ * Slow (course grain) retransmission due to RTO expiry.
+ * Re-express the first segment of the window.
+ */
+static
+void
+vegasSession_SlowReexpress(VegasSession *session)
+{
+ struct fc_window_entry *entry = &session->window[ session->window_head ];
+
+ assertTrue(entry->valid, "entry %p segnum %" PRIu64 " invalid state, in window but not valid",
+ (void *) entry, entry->segnum);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__,
+ "Session %p conn %p RTO re-expression for segnum %" PRIu64 "",
+ (void *) session, (void *) session->parent_connection, entry->segnum);
+ }
+
+ entry->first_request = false;
+ vegasSession_ExpressInterestForEntry(session, entry);
+}
+
+/**
+ * Do fast retransmissions based on SRTT smoothed estimate.
+ * ack_entry is the entry for a content object we just received. Look earlier segments
+ * and if they were asked for more than SRTT ago, ask again.
+ */
+static void
+vegasSession_FastReexpress(VegasSession *session, struct fc_window_entry *ack_entry)
+{
+ ticks now = rtaFramework_GetTicks(session->parent_framework);
+ int64_t delta;
+ uint64_t segnum;
+ uint64_t top_segnum;
+
+ // This method is called after forward_in_order, so it's possible that
+ // ack_entry is no longer valid, meaning we've moved the window past it.
+ // In that case, we're done.
+ if (ack_entry->valid == false) {
+ return;
+ }
+
+ // we don't retransmit beyond the current cwnd. ack_entry might be outside
+ // the cwnd.
+
+ top_segnum = min(ack_entry->segnum, session->starting_segnum + session->current_cwnd);
+
+ for (segnum = session->starting_segnum; segnum < top_segnum; segnum++) {
+ int index = (session->window_head + (segnum - session->starting_segnum)) % FC_MAX_CWND;
+ delta = (int64_t) now - ((int64_t) session->window[index].t + (int64_t) session->SRTT);
+
+ // allow up to -1 slack, because the RunAlgorithm adds +1 to fc_rtt.
+ if (delta >= -1) {
+ // we have past the SRTT timeout
+
+ // if we last re-transmitted him since the last cwnd adjustment, adjust again
+ if ((int64_t) session->window[index].t - (int64_t) session->last_cwnd_adjust >= 0) {
+ vegasSession_ReduceCongestionWindow(session);
+ }
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__,
+ "session %p conn %p RTO re-expression for segnum %" PRIu64 "",
+ (void *) session, (void *) session->parent_connection, session->window[index].segnum);
+ }
+
+ session->window[index].first_request = false;
+ session->cnt_fast_reexpress++;
+ vegasSession_ExpressInterestForEntry(session, &session->window[index]);
+ }
+ }
+}
+
+/**
+ * Generates an Interest message for the window entry.
+ *
+ * No side effects, apart from putting on Interest on the down queue.
+ * If the down direction is blocked, this function will not put an interest in the down queue. It will
+ * look like a lost interest to the flow controller, which should cause the flow controller to slow down.
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static int
+vegasSession_ExpressInterestForEntry(VegasSession *session, struct fc_window_entry *entry)
+{
+ if (!rtaConnection_BlockedDown(session->parent_connection)) {
+ ticks now = rtaFramework_GetTicks(session->parent_framework);
+ PARCEventQueue *q_out;
+ TransportMessage *tm_out;
+ CCNxName *chunk_name;
+
+ entry->t = now;
+
+ chunk_name = ccnxName_Copy(session->basename);
+
+ CCNxNameSegment *segment = ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, entry->segnum);
+ ccnxName_Append(chunk_name, segment);
+ ccnxNameSegment_Release(&segment);
+
+ assertNotNull(session->interestInterface, "Got a NULL interestInterface. Should not happen.");
+
+ CCNxTlvDictionary *interestDictionary =
+ session->interestInterface->create(chunk_name,
+ session->lifetime,
+ NULL, // ppkid
+ NULL, // content object hash
+ CCNxInterestDefault_HopLimit);
+
+ if (session->keyIdRestriction != NULL) {
+ session->interestInterface->setKeyIdRestriction(interestDictionary, session->keyIdRestriction);
+ }
+
+ tm_out = transportMessage_CreateFromDictionary(interestDictionary);
+ transportMessage_SetInfo(tm_out, rtaConnection_Copy(session->parent_connection), rtaConnection_FreeFunc);
+
+ q_out = rtaComponent_GetOutputQueue(session->parent_connection, FC_VEGAS, RTA_DOWN);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ char *string = ccnxName_ToString(chunk_name);
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "session %p entry %p segname %p segnum %" PRIu64 " %s sent",
+ (void *) session,
+ (void *) entry,
+ (void *) chunk_name,
+ entry->segnum,
+ string);
+ parcMemory_Deallocate((void **) &string);
+ }
+
+ ccnxTlvDictionary_Release(&interestDictionary);
+ ccnxName_Release(&chunk_name);
+
+ if (rtaComponent_PutMessage(q_out, tm_out)) {
+ rtaComponentStats_Increment(rtaConnection_GetStats(session->parent_connection, FC_VEGAS),
+ STATS_DOWNCALL_OUT);
+ }
+ } else {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) {
+ CCNxName *segment_name = ccnxName_Copy(session->basename);
+ ccnxName_Append(segment_name, ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, entry->segnum));
+ char *string = ccnxName_ToString(segment_name);
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__,
+ "session %p entry %p segname %p segnum %" PRIu64 " %s SUPPRESSED BLOCKED DOWN QUEUE",
+ (void *) session,
+ (void *) entry,
+ (void *) segment_name,
+ entry->segnum,
+ string);
+ parcMemory_Deallocate((void **) &string);
+ ccnxName_Release(&segment_name);
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Express interests out to the max allowed by the cwnd. This function will operate
+ * even if the down queue is blocked. Those interests will be treated as lost, which will cause
+ * the flow controller to slow down.
+ */
+static void
+vegasSession_ExpressInterests(VegasSession *session)
+{
+ ticks now = rtaFramework_GetTicks(session->parent_framework);
+
+ // how many interests are currently outstanding?
+ int wsize = session->window_tail - session->window_head;
+ if (wsize < 0) {
+ wsize += FC_MAX_CWND;
+ }
+
+ // if we know the FBID, don't ask for anything beyond that
+ while (wsize < session->current_cwnd && (wsize + session->starting_segnum <= session->final_segnum)) {
+ // expreess them
+ struct fc_window_entry *entry = &session->window[session->window_tail];
+
+ assertFalse(entry->valid,
+ "Window entry %d marked as valid, but its outside the cwind!",
+ session->window_tail);
+
+ session->window_tail = (session->window_tail + 1) % FC_MAX_CWND;
+
+ memset(entry, 0, sizeof(struct fc_window_entry));
+
+ entry->valid = true;
+ entry->segnum = session->starting_segnum + wsize;
+ entry->first_request = true;
+ entry->t_first_request = now;
+
+ if (session->sample_in_progress == 0) {
+ // make this interest the sample for the RTT
+ session->sample_in_progress = true;
+ session->sample_segnum = entry->segnum;
+ session->sample_start = now;
+ session->sample_bytes_recevied = 0;
+ }
+
+ vegasSession_ExpressInterestForEntry(session, entry);
+
+ wsize++;
+ }
+}
+
+/*
+ * This is dispatched from the event loop, so its a loosely accurate time
+ */
+static void
+vegasSession_TimerCallback(int fd, PARCEventType what, void *user_data)
+{
+ VegasSession *session = (VegasSession *) user_data;
+ int64_t delta;
+ ticks now;
+
+ assertTrue(what & PARCEventType_Timeout, "%s got unknown signal %d", __func__, what);
+
+ now = rtaFramework_GetTicks(session->parent_framework);
+ delta = ((int64_t) now - (int64_t) session->next_rtt_sample);
+
+ if (delta >= 0) {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "Session %p processing timer, delta %" PRId64,
+ (void *) session, delta);
+ }
+
+ // This entry is ready for processing
+ vegasSession_CongestionAvoidance(session);
+
+ // set the next timer
+ vegasSession_SetTimer(session, session->current_rtt);
+ } else {
+ vegasSession_SetTimer(session, -1 * delta);
+ }
+
+ // check for retransmission
+ delta = ((int64_t) now - (int64_t) session->next_rto);
+ if (delta >= 0) {
+ // Do this once per RTO
+ vegasSession_SlowReexpress(session);
+
+ // we're now in a doubling regeme. Reset the
+ // moving average and double the RTO.
+ session->SRTT = 0;
+ session->RTTVAR = 0;
+ session->RTO = session->RTO * 2;
+ session->next_rto = now + session->RTO;
+ }
+}
+
+/**
+ * precondition: the entry is valid
+ */
+static void
+vegasSession_ReleaseWindowEntry(struct fc_window_entry *entry)
+{
+ assertTrue(entry->valid, "Called on invalid window entry");
+ if (!entry->valid) {
+ return;
+ }
+
+ if (entry->transport_msg != NULL) {
+ transportMessage_Destroy(&entry->transport_msg);
+ }
+ entry->valid = false;
+}
+
+static void
+vegasSession_SetTimer(VegasSession *session, ticks tick_delay)
+{
+ struct timeval timeout;
+ uint64_t usec = rtaFramework_TicksToUsec(tick_delay);
+ const unsigned usec_per_sec = 1000000;
+
+ timeout.tv_sec = usec / usec_per_sec;
+ timeout.tv_usec = (int) (usec - timeout.tv_sec * usec_per_sec);
+
+ // this replaces any prior events
+ parcEventTimer_Start(session->tick_event, &timeout);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "session %p tick_delay %" PRIu64 " timeout %.6f",
+ (void *) session,
+ tick_delay,
+ timeout.tv_sec + 1E-6 * timeout.tv_usec);
+ }
+}
+
+// =============================================
+// Private API
+
+/**
+ * Unsets the final segment number indicating we do not know the value
+ *
+ * Sets the final segment number to the maximum possible value, which effectively
+ * lets us run off to infinity.
+ *
+ * @param [in] session An allocated vegas session
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void
+_vegasSession_UnsetFinalSegnum(VegasSession *session)
+{
+ session->final_segnum = ULLONG_MAX;
+}
+
+VegasSession *
+vegasSession_Create(VegasConnectionState *fc, RtaConnection *conn, CCNxName *basename, segnum_t begin,
+ CCNxInterestInterface *interestInterface, uint32_t lifetime, PARCBuffer *keyIdRestriction)
+{
+ assertNotNull(conn, "Called with null connection");
+ assertNotNull(basename,
+ "conn %p connid %u called with null basename",
+ (void *) conn,
+ rtaConnection_GetConnectionId(conn));
+
+ if (conn == NULL || basename == NULL) {
+ return NULL;
+ }
+
+ VegasSession *session = parcMemory_AllocateAndClear(sizeof(VegasSession));
+ assertNotNull(session, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(VegasSession));
+ session->parent_connection = conn;
+ session->parent_framework = rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn));
+ session->interestInterface = interestInterface;
+ session->lifetime = lifetime;
+ session->basename = basename;
+ if (keyIdRestriction != NULL) {
+ session->keyIdRestriction = parcBuffer_Acquire(keyIdRestriction);
+ }
+ session->parent_fc = fc;
+
+ session->tick_event = parcEventTimer_Create(rtaFramework_GetEventScheduler(session->parent_framework), 0, vegasSession_TimerCallback, (void *) session);
+
+ session->starting_segnum = 0;
+ session->current_cwnd = FC_INIT_CWND;
+ session->min_RTT = INT_MAX;
+ session->base_RTT = INT_MAX;
+ session->do_fc_this_rtt = 0;
+ session->current_rtt = rtaFramework_UsecToTicks(FC_INIT_RTT_MSEC * 1000);
+ session->slow_start_threshold = FC_MAX_SSTHRESH;
+
+ session->SRTT = 0;
+ session->RTTVAR = 0;
+ session->RTO = rtaFramework_UsecToTicks(FC_INIT_RTO_MSEC * 1000);
+ session->next_rto = ULLONG_MAX;
+ session->cnt_old_segments = 0;
+ session->cnt_fast_reexpress = 0;
+
+ _vegasSession_UnsetFinalSegnum(session);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice, __func__,
+ "session %p initialized connid %u ",
+ (void *) session,
+ rtaConnection_GetConnectionId(conn));
+ }
+ return session;
+}
+
+static void
+vegasSession_Close(VegasSession *session)
+{
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice)) {
+ char *p = ccnxName_ToString(session->basename);
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice, __func__,
+ "session %p close starting segnum %" PRIu64 " final chunk ID %" PRIu64 " for name %s",
+ (void *) session, session->starting_segnum, session->final_segnum, p);
+ parcMemory_Deallocate((void **) &p);
+ }
+
+ ccnxName_Release(&session->basename);
+
+ while (session->window_head != session->window_tail) {
+ struct fc_window_entry *entry = &session->window[ session->window_head ];
+
+ // sanity checks
+ assertTrue(entry->valid, "connid %u session %p entry %d in window but not valid",
+ rtaConnection_GetConnectionId(session->parent_connection),
+ (void *) session,
+ session->window_head);
+
+ if (entry->valid) {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ char *p = ccnxName_ToString(session->basename);
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "session %p releasing window entry %d", (void *) session, session->window_head);
+ parcMemory_Deallocate((void **) &p);
+ }
+
+ vegasSession_ReleaseWindowEntry(entry);
+ }
+
+ session->window_head = (session->window_head + 1) % FC_MAX_CWND;
+ }
+}
+
+void
+vegasSession_Destroy(VegasSession **sessionPtr)
+{
+ VegasSession *session;
+
+ assertNotNull(sessionPtr, "Called with null double pointer");
+ session = *sessionPtr;
+
+ if (session->keyIdRestriction != NULL) {
+ parcBuffer_Release(&session->keyIdRestriction);
+ }
+
+ vegasSession_Close(session);
+
+ parcEventTimer_Destroy(&(session->tick_event));
+ parcMemory_Deallocate((void **) &session);
+ sessionPtr = NULL;
+}
+
+int
+vegasSession_Start(VegasSession *session)
+{
+ ticks now = rtaFramework_GetTicks(session->parent_framework);
+
+ // express the initial interests
+ vegasSession_ExpressInterests(session);
+
+ session->next_rtt_sample = now - 1;
+ session->next_rto = now + session->RTO;
+
+ // put it on the work queue for procesing
+
+ vegasSession_SetTimer(session, session->current_rtt);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__,
+ "Session %p start", (void *) session);
+ }
+
+ return 0;
+}
+
+int
+vegasSession_Pause(VegasSession *session)
+{
+ trapNotImplemented("vegasSession_Pause");
+}
+
+int
+vegasSession_Resume(VegasSession *session)
+{
+ trapNotImplemented("vegasSession_Resume");
+}
+
+int
+vegasSession_Seek(VegasSession *session, segnum_t absolutePosition)
+{
+ trapNotImplemented("vegasSession_See)");
+}
+
+/**
+ * Retrieves the final block ID from the content object
+ *
+ * Retreives the final block ID from the object, if it exists, and returns it in
+ * an output parameter. Returns true if found and returned, false otherwise.
+ *
+ * @param [in] obj The Content Object to get the FBID form
+ * @param [out] output Pointer to the seqnum ouptut
+ *
+ * @return true If the content object contained a FBID and the output set
+ * @return false If there is no FBID in the content object
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static bool
+vegasSession_GetFinalBlockIdFromContentObject(CCNxTlvDictionary *obj, uint64_t *output)
+{
+ bool result = false;
+ if (ccnxContentObject_HasFinalChunkNumber(obj)) {
+ *output = ccnxContentObject_GetFinalChunkNumber(obj);
+ result = true;
+ }
+ return result;
+}
+
+/**
+ * Sets the final block id in the session based on the signed info
+ *
+ * If the final block id exists in the signed info, set the session's FBID.
+ *
+ * Rules on FinalChunkNumber:
+ *
+ * 1) The “final chunk” of a stream is identified by a content object having a FinalChunkNumber
+ * set in its metadata that equals the chunk number in its name.
+ *
+ * 2) An application may set the FinalChunkNumber early to let a receiver know when the end is coming. These early advisories are not binding.
+ *
+ * 3) If the application has ever set the FinalChunkNumber it may not decrease it. If the actual end happens before a previous advisory,
+ * the application must publish no-payload content objects such that Rule #1 is satisfied
+ *
+ *
+ * @param [in,out] session The Vegas session
+ * @param [in] obj The signed content object to get the FBID from
+ * @param [in] nameChunkNumber is the chunk number in the name
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void
+vegasSession_SetFinalBlockId(VegasSession *session, CCNxTlvDictionary *contentObjectDictionary, uint64_t nameChunkNumber)
+{
+ // Get the FinalChunkNumber out of the metadata and update our notion of it
+ uint64_t finalChunkNumber;
+ if (vegasSession_GetFinalBlockIdFromContentObject(contentObjectDictionary, &finalChunkNumber)) {
+ session->final_segnum = finalChunkNumber;
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__,
+ "Session %p finalChunkNumber %" PRIu64, (void *) session, session->final_segnum);
+ }
+ } else {
+ // There is no final chunk number in the metadata. If the nameChunkNumber == session->final_seqnum, then
+ // our idea of the final_seqnum is wrong and we should unset it as the producer did not actually close
+ // the stream when they said they would
+
+ if (session->final_segnum == nameChunkNumber) {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning, __func__,
+ "Session %p finalChunkNumber %" PRIu64 " not set in final chunk, resetting",
+ (void *) session, session->final_segnum);
+ }
+
+ _vegasSession_UnsetFinalSegnum(session);
+ }
+ }
+}
+
+/**
+ * We received a duplicate segment from before the start of the current congestion window
+ *
+ *
+ * If we receive a segment from before the start of the current congestion window, then it
+ * must be a duplicate (we don't have skip forward implemented). Reduce the congestion window size.
+ * We only reduce the window once per RTT interval no matter how many early duplicates we get.
+ *
+ * @param [in,out] session The Vegas session to reduce the window of.
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+static void
+vegasSession_ReceivedBeforeWindowStart(VegasSession *session, uint64_t segnum)
+{
+ // once per cwnd, reduce the window on out-of-order
+ if (session->cnt_old_segments == 0) {
+ vegasSession_ReduceCongestionWindow(session);
+ }
+
+ session->cnt_old_segments++;
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "Session %p connid %3u : recv old segment %" PRIu64 ", starting is %" PRIu64 ", cnt %" PRIu64 "",
+ (void *) session,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ segnum,
+ session->starting_segnum,
+ session->cnt_old_segments);
+ }
+}
+
+static void
+vegasSession_SendMoreInterests(VegasSession *session, struct fc_window_entry *entry)
+{
+ // This will check if there's any earlier segments whose
+ // RTT has expired and will re-ask for them. This is the
+ // out-of-order fast retransmit.
+ vegasSession_FastReexpress(session, entry);
+
+ // have we finished?
+ if (session->starting_segnum < session->final_segnum) {
+ // express more interests if we have the window for it
+ vegasSession_ExpressInterests(session);
+ } else
+ if (session->starting_segnum > session->final_segnum) {
+ // if starting_segment > final_segnum it means that we have delivered the last
+ // segment up the stack.
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__,
+ "Session %p connid %u starting_segnum %" PRIu64 ", final_segnum %" PRIu64 ", FINAL SEGMENT DELIVERED, CLOSING",
+ (void *) session,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ session->starting_segnum,
+ session->final_segnum);
+ }
+
+ parcEventTimer_Stop(session->tick_event);
+ vegas_EndSession(session->parent_fc, session);
+ }
+ // else session->starting_segnum == session->final_segnum, we're not done yet.
+}
+
+static CCNxName *
+vegasSession_GetNameFromTransportMessage(TransportMessage *tm)
+{
+ CCNxName *name = NULL;
+ CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm);
+ switch (ccnxTlvDictionary_GetSchemaVersion(dictionary)) {
+ case CCNxTlvDictionary_SchemaVersion_V1:
+ name = ccnxTlvDictionary_GetName(dictionary, CCNxCodecSchemaV1TlvDictionary_MessageFastArray_NAME);
+ break;
+
+ default:
+ break;
+ }
+ return name;
+}
+
+
+
+int
+vegasSession_ReceiveContentObject(VegasSession *session, TransportMessage *tm)
+{
+ assertTrue(transportMessage_IsContentObject(tm),
+ "Transport message is not a content object");
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ CCNxName *name = vegasSession_GetNameFromTransportMessage(tm);
+ char *nameString = NULL;
+ if (name) {
+ nameString = ccnxName_ToString(name);
+ }
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "Session %p connid %3u receive tm %p: %s",
+ (void *) session,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ (void *) tm,
+ nameString);
+ if (nameString) {
+ parcMemory_Deallocate((void **) &nameString);
+ }
+ }
+
+ CCNxTlvDictionary *contentObjectDictionary = transportMessage_GetDictionary(tm);
+
+ // get segment number
+ uint64_t segnum;
+ int res = vegasSession_GetSegnumFromObject(contentObjectDictionary, &segnum);
+ if (res != 0) {
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning)) {
+ CCNxName *name = vegasSession_GetNameFromTransportMessage(tm);
+ char *nameString = NULL;
+ if (name) {
+ nameString = ccnxName_ToString(name);
+ }
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning, __func__,
+ "Session %p connid %3u receive tm %p has no segment number: %s",
+ (void *) session,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ (void *) tm,
+ nameString);
+ if (nameString) {
+ parcMemory_Deallocate((void **) &nameString);
+ }
+ }
+
+ // couldn't figure it out
+ transportMessage_Destroy(&tm);
+ return -1;
+ }
+
+ // drop out of order
+ if (segnum < session->starting_segnum) {
+ vegasSession_ReceivedBeforeWindowStart(session, segnum);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "Session %p connid %3u : tm %p received segnum %" PRIu64 " before current head %" PRIu64 "",
+ (void *) session,
+ __func__,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ (void *) tm,
+ segnum,
+ session->starting_segnum);
+ }
+
+ transportMessage_Destroy(&tm);
+ return -1;
+ }
+
+ // Update our idea of the final chunk number. This must be done
+ // before running the algorithm because session->final_segnum is used
+ // to decide if we're done.
+ vegasSession_SetFinalBlockId(session, contentObjectDictionary, segnum);
+
+
+ // now run the algorithm on the received object
+
+ struct fc_window_entry *entry = vegasSession_GetWindowEntry(session, tm, segnum);
+
+ if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) {
+ CCNxName *name = vegasSession_GetNameFromTransportMessage(tm);
+ char *nameString = NULL;
+ if (name) {
+ nameString = ccnxName_ToString(name);
+ }
+ rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__,
+ "Session %p connid %3u receive tm %p segment %" PRIu64 " receive: %s",
+ (void *) session,
+ rtaConnection_GetConnectionId(session->parent_connection),
+ (void *) tm,
+ segnum,
+ nameString);
+ if (nameString) {
+ parcMemory_Deallocate((void **) &nameString);
+ }
+ }
+
+ vegasSession_RunAlgorithmOnReceive(session, entry);
+
+ // forward in-order objects to the user fc
+ if (!rtaConnection_BlockedUp(session->parent_connection)) {
+ vegasSession_ForwardObjectsInOrder(session);
+ }
+
+ vegasSession_SendMoreInterests(session, entry);
+
+ return 0;
+}
+
+unsigned
+vegasSession_GetConnectionId(VegasSession *session)
+{
+ assertNotNull(session, "Parameter session must be non-null");
+ return rtaConnection_GetConnectionId(session->parent_connection);
+}
+
+void
+vegasSession_StateChanged(VegasSession *session)
+{
+ if (rtaConnection_BlockedUp(session->parent_connection)) {
+ // if we're blocked in the up direction, don't do anything. We make this
+ // check every time we're about ti send stuff up the stack in vegasSession_ReceiveContentObject().
+ } else {
+ // unblocked, forward packets
+ vegasSession_ForwardObjectsInOrder(session);
+ }
+
+ if (rtaConnection_BlockedDown(session->parent_connection)) {
+ // stop generating interests
+ } else {
+ // restart interests
+ }
+}
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_private.h b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_private.h
new file mode 100644
index 00000000..b2088567
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_private.h
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file vegas_private.h
+ * @brief <#Brief Description#>
+ *
+ * <#Detailed Description#>
+ *
+ */
+#ifndef Libccnx_vegas_private_h
+#define Libccnx_vegas_private_h
+
+#include <ccnx/common/ccnx_Name.h>
+#include <ccnx/common/internal/ccnx_ContentObjectInterface.h>
+
+typedef uint64_t segnum_t;
+
+struct vegas_session;
+typedef struct vegas_session VegasSession;
+
+struct vegas_connection_state;
+typedef struct vegas_connection_state VegasConnectionState;
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [in] fc An allocated Vegas flow controller
+ * @param [in] conn The RTA connection owning the flow
+ * @param [in] basename The name without a chunk number
+ * @param [in] begin The chunk number to begin requesting at
+ * @param [in] interestInterface The {@link CCNxInterestInterface} to use to generate new Interests
+ * @param [in] lifetime The default lifetime, in milli-seconds, to use for generated Interests
+ * @param [in] keyIdRestriction The KeyIdRestriction, if any, from the originating Interest
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+VegasSession *vegasSession_Create(VegasConnectionState *fc, RtaConnection *conn, CCNxName *basename,
+ segnum_t begin, CCNxInterestInterface *interestInterface, uint32_t lifetime,
+ PARCBuffer *keyIdRestriction);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+void vegasSession_Destroy(VegasSession **sessionPtr);
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int vegasSession_Start(VegasSession *session);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int vegasSession_Pause(VegasSession *session);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int vegasSession_Resume(VegasSession *session);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int vegasSession_Seek(VegasSession *session, segnum_t absolutePosition);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int vegasSession_ReceiveContentObject(VegasSession *session, TransportMessage *tm);
+
+
+/**
+ * Tell a session that there was a state change in its connection
+ *
+ * The caller should ensure that the session's connection is the right one by
+ * using {@link vegasSession_GetConnectionId}.
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+void vegasSession_StateChanged(VegasSession *session);
+
+/**
+ * Returns the connection id used by the session
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+unsigned vegasSession_GetConnectionId(VegasSession *session);
+
+
+/**
+ * <#One Line Description#>
+ *
+ * Called by a session when it is done
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+void vegas_EndSession(VegasConnectionState *fc, VegasSession *session);
+#endif // Libccnx_vegas_private_h