diff options
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas')
5 files changed, 3642 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c new file mode 100644 index 00000000..70bcec63 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/component_Vegas.c @@ -0,0 +1,673 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Source code layout: +// - component_Vegas.c: the component wrapper and session multiplexing +// - vegas_Session.c: code for a specific basename session +// - vegas_Segment.c: code for specific segment operations + +/** + * Component behavior + * =================== + * This component provides flow-controlled in-order delivery of segmented + * content objects using a sequential segment number in the last component + * of the object name. + * + * The state machine described here is within a single RtaConnection. Separate + * connections are independent. + * + * Down Stack Behavior + * ------------------------ + * When an interest comes down the stack, it will initiate a flow-controlled + * session. If the last component of the interest name is a segment number, + * that is the starting segment number. Otherwise, we assume the interest + * name is the base name for a segmented object, including the version number. + * + * Other types of messages coming down the stack (e.g. control or content objects) + * are passed down the stack unaltered. + * + * If an interest comes down that represents a subset of an existing flow (i.e. + * it has a segment number beyond the current starting segment of the flow contol + * window), the window is advanced to that segment number and any un-delivered + * content objects are dropped. + * + * If an interest comes down that represents a superset of an existing flow + * (i.e. it has a starting segment number less than the current window), the + * current flow control sessions is re-wound to the lower sequence number + * and continues from there. + * + * Up Stack Behavior + * ------------------------ + * Non-content objects (e.g. control and interests) are passed up the stack unmodified. + * + * A content object that matches a flow control session is managed by the session. + * They are only passed up the stack in-order, and will be dropped if they are outside + * the window. + * + * A content object that does not match a flow control session is dropped. That's because + * the only interests we send down the stack are our own for flow controlled sessions, so + * no content object should go up the stack unless its part of a flow controlled session. + * + * Control Messages + * ------------------------ + * The API may cancel flow control sessions in several ways: + * + * 1) Close the Connection. This will cancel all in progress sessions and drop + * any un-delivered objects. + * + * 2) Send a Control message down the stack with the base name to cancel. The + * name is considered the base name of the flow and does not depend on the + * starting segment number. + * + * { "CPI_CANCEL_FLOW" : { "FLOW_NAME" : <base name w/o segment number> } } + * + * Implementation Notes + * ========================= + * For each RtaConnection, there's a {@code struct fc_connection_state}. This + * contains a list of in-progress sessions indexed by the hash of the base name + * (name up to but not including final segment). Right now, it's a linked list + * but should be implemented as a hash table. + * + * Each session is represented by a {@code struct fc_session}. + * + * Each entry in the flow control window is a {@code fc_window_entry}. + * + * session->window_head and session->window_tail define the limits of the + * congestion window. Everything in the interval [head, tail) is expressed + * as an interest. The size of that interval may be larger than the + * congestion window cwnd if we're decreaed the window. We never decrease + * tail, only the cwnd. + * + * + * Flow Control Algorithm + * ========================= + * Based on TCP Vegas. Please read the Vegas paper. We use similar + * variable names to the paper. Code looks quite a bit like the linux + * tcp_vegas.c too. + * + * Here's the differences. In CCN, an Interest is like an ACK token, it + * gives the network permission to send. The node issuing Interests needs + * to pace them to not exceed the network capacity. This is done by + * observing the delay of Content Objects. If the delay grows too quickly, + * then we back off linearly. If the delay is not much above what we expected + * based on the minimum observed delay, we increase linearly. + * + * During slow start, the interest window (still called "cwnd") doubles + * every other RTT until we exceed the slow_start_threshold or the delay + * increases too much. + * + * The RTT is calculated every RTT based on the observed minimum RTT during + * the previous period. + * + * We use RFC6298 Retransmission Timeout (RTO) calculation methods per + * flow control session (object basename). + * + * Just to be clear, there are two timers working. The RTO timer is for + * retransmitting interests if the flow as stalled out. The Vegas RTT + * calculation is for congestion window calculations. + * + * We we receive an out-of-order content object, we'll check the earlier + * segments to see if they have passed the Vegas RTT. If so, we'll + * re-express the interests. + * + * Each time we re-express an Interest, we might decrese the congestion + * window. If the last time the interest was sent was more recent than + * the last time we decreased the congestion window, we'll decrease the + * congestion window. If the last expression of the interest was before + * the most recent window decrease, the window is left alone. This means + * we'll only decreae the window once per re-expression. + */ +#include <config.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <limits.h> +#include <sys/queue.h> +#include <stdbool.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> + +#include <parc/algol/parc_EventQueue.h> + +#include <ccnx/transport/common/transport_Message.h> +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> +#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h> +#include <ccnx/transport/test_tools/traffic_tools.h> + +#include <ccnx/api/control/controlPlaneInterface.h> +#include <ccnx/api/control/cpi_ControlFacade.h> + +#include "vegas_private.h" + +#include <parc/logging/parc_LogLevel.h> + +#ifndef DEBUG_OUTPUT +#define DEBUG_OUTPUT 0 +#endif + +// =========================================================== + +typedef struct fc_session_holder { + uint64_t basename_hash; + CCNxName *basename; + VegasSession *session; + + // used by fc_connection_state to hold these + // Should change to hashtable on the hash + TAILQ_ENTRY(fc_session_holder) list; +} FcSessionHolder; + +/** + * This is the per-connection state. It allows us to have multiple + * flow control session on one connection for different names + */ +struct vegas_connection_state { + RtaConnection *parent_connection; + RtaFramework *parent_framework; + + TAILQ_HEAD(, fc_session_holder) sessions_head; +}; + + +// =========================================================== + +static int component_Fc_Vegas_Init(RtaProtocolStack *stack); +static int component_Fc_Vegas_Opener(RtaConnection *conn); +static void component_Fc_Vegas_Upcall_Read(PARCEventQueue *, PARCEventType event, void *conn); +static void component_Fc_Vegas_Downcall_Read(PARCEventQueue *, PARCEventType event, void *conn); +static int component_Fc_Vegas_Closer(RtaConnection *conn); +static int component_Fc_Vegas_Release(RtaProtocolStack *stack); +static void component_Fc_Vegas_StateChange(RtaConnection *conn); + +// Function structs for component variations +RtaComponentOperations flow_vegas_ops = { + .init = component_Fc_Vegas_Init, + .open = component_Fc_Vegas_Opener, + .upcallRead = component_Fc_Vegas_Upcall_Read, + .upcallEvent = NULL, + .downcallRead = component_Fc_Vegas_Downcall_Read, + .downcallEvent = NULL, + .close = component_Fc_Vegas_Closer, + .release = component_Fc_Vegas_Release, + .stateChange = component_Fc_Vegas_StateChange +}; + + +// ====== +// Session related functions +static int vegas_HandleInterest(RtaConnection *conn, TransportMessage *tm); +static FcSessionHolder *vegas_LookupSession(VegasConnectionState *fc, TransportMessage *tm); +static FcSessionHolder *vegas_LookupSessionByName(VegasConnectionState *fc, CCNxName *name); + +static FcSessionHolder *vegas_CreateSessionHolder(VegasConnectionState *fc, RtaConnection *conn, + CCNxName *basename, uint64_t name_hash); + +static bool vegas_HandleControl(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue); + +// ================================================ + +static int +component_Fc_Vegas_Init(RtaProtocolStack *stack) +{ + // we don't do any stack-wide initialization + return 0; +} + +static int +component_Fc_Vegas_Opener(RtaConnection *conn) +{ + struct vegas_connection_state *fcConnState = parcMemory_AllocateAndClear(sizeof(struct vegas_connection_state)); + assertNotNull(fcConnState, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(struct vegas_connection_state)); + + fcConnState->parent_connection = rtaConnection_Copy(conn); + fcConnState->parent_framework = rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn)); + + TAILQ_INIT(&fcConnState->sessions_head); + + rtaConnection_SetPrivateData(conn, FC_VEGAS, fcConnState); + rtaComponentStats_Increment(rtaConnection_GetStats(conn, FC_VEGAS), STATS_OPENS); + + return 0; +} + +/* + * Read from below. + * These should only be content objects associated with our stream. + * + * Non-content objects are passed up the stack. + */ +static void +component_Fc_Vegas_Upcall_Read(PARCEventQueue *in, PARCEventType event, void *stack_ptr) +{ + TransportMessage *tm; + + while ((tm = rtaComponent_GetMessage(in)) != NULL) { + struct timeval delay = transportMessage_GetDelay(tm); + + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS); + + rtaComponentStats_Increment(stats, STATS_UPCALL_IN); + + if (transportMessage_IsControl(tm)) { + PARCEventQueue *out = rtaComponent_GetOutputQueue(conn, FC_VEGAS, RTA_UP); + + if (rtaComponent_PutMessage(out, tm)) { + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } else { + //TODO + } + } else if (transportMessage_IsContentObject(tm)) { + // this takes ownership of the transport message + VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS); + FcSessionHolder *holder = vegas_LookupSession(fc, tm); + + // it's quite possible that we get content objects for sessions that + // no longer exist. They are dropped. + if (holder != NULL) { + vegasSession_ReceiveContentObject(holder->session, tm); + } else { + transportMessage_Destroy(&tm); + } + } else { + PARCEventQueue *out = rtaComponent_GetOutputQueue(conn, FC_VEGAS, RTA_UP); + if (rtaComponent_PutMessage(out, tm)) { + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } else { + //TODO + } + } + + if (DEBUG_OUTPUT) { + printf("%s total upcall reads in %" PRIu64 " out %" PRIu64 " last delay %.6f\n", + __func__, + rtaComponentStats_Get(stats, STATS_UPCALL_IN), + rtaComponentStats_Get(stats, STATS_UPCALL_OUT), + delay.tv_sec + delay.tv_usec * 1E-6); + } + } +} + +static void +component_Fc_Vegas_Downcall_Read(PARCEventQueue *in, PARCEventType event, void *conn) +{ + RtaProtocolStack *stack = (RtaProtocolStack *) conn; + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(stack, FC_VEGAS, RTA_DOWN); + TransportMessage *tm; + +// printf("%s reading from queue %p\n", __func__, in); + + while ((tm = rtaComponent_GetMessage(in)) != NULL) { + RtaConnection *conn = rtaConnection_GetFromTransport(tm); + RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS); + rtaComponentStats_Increment(stats, STATS_DOWNCALL_IN); + + if (transportMessage_IsControl(tm)) { + CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(tm); + if (ccnxControlFacade_IsCPI(controlDictionary) && vegas_HandleControl(conn, controlDictionary, in)) { + transportMessage_Destroy(&tm); + } else { + // we did not consume the message, so forward it down + if (rtaComponent_PutMessage(out, tm)) { + rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT); + } + } + } else if (transportMessage_IsInterest(tm)) { + vegas_HandleInterest(conn, tm); + + // The flow controller consumes Interests going down the stack and will + // start issuing its own interests instead. + transportMessage_Destroy(&tm); + } else { + if (rtaComponent_PutMessage(out, tm)) { + rtaComponentStats_Increment(stats, STATS_DOWNCALL_OUT); + } + } + + if (DEBUG_OUTPUT) { + struct timeval delay = tm ? transportMessage_GetDelay(tm) : (struct timeval) { 0, 0 }; + printf("%s total downcall reads in %" PRIu64 " out %" PRIu64 " last delay %.6f\n", + __func__, + rtaComponentStats_Get(stats, STATS_DOWNCALL_IN), + rtaComponentStats_Get(stats, STATS_DOWNCALL_OUT), + delay.tv_sec + delay.tv_usec * 1E-6); + } + } +} + +static int +component_Fc_Vegas_Closer(RtaConnection *conn) +{ + VegasConnectionState *fcConnState; + + assertNotNull(conn, "Got null connection\n"); + if (conn == NULL) { + return -1; + } + + fcConnState = rtaConnection_GetPrivateData(conn, FC_VEGAS); + + assertNotNull(fcConnState, "could not retrieve private data for FC_VEGAS on connid %u\n", + rtaConnection_GetConnectionId(conn)); + if (fcConnState == NULL) { + return -1; + } + + rtaConnection_Destroy(&fcConnState->parent_connection); + + rtaComponentStats_Increment(rtaConnection_GetStats(conn, FC_VEGAS), STATS_CLOSES); + + // close down all the sessions + while (!TAILQ_EMPTY(&fcConnState->sessions_head)) { + FcSessionHolder *holder = TAILQ_FIRST(&fcConnState->sessions_head); + + vegasSession_Destroy(&holder->session); + + TAILQ_REMOVE(&fcConnState->sessions_head, holder, list); + parcMemory_Deallocate((void **) &holder); + } + + parcMemory_Deallocate((void **) &fcConnState); + + return 0; +} + +static int +component_Fc_Vegas_Release(RtaProtocolStack *stack) +{ + // no stack-wide memory + return 0; +} + +static void +component_Fc_Vegas_StateChange(RtaConnection *conn) +{ + assertNotNull(conn, "Got null connection\n"); + + VegasConnectionState *fcConnState = rtaConnection_GetPrivateData(conn, FC_VEGAS); + assertNotNull(fcConnState, "could not retrieve private data for FC_VEGAS on connid %u\n", + rtaConnection_GetConnectionId(conn)); + + // should replace this with a hash table + FcSessionHolder *holder; + TAILQ_FOREACH(holder, &fcConnState->sessions_head, list) + { + if (vegasSession_GetConnectionId(holder->session) == rtaConnection_GetConnectionId(conn)) { + vegasSession_StateChanged(holder->session); + } + } +} + +// ======================================================================= + +/** + * If the last component is a segment number, it is ignored + */ +static FcSessionHolder * +vegas_LookupSessionByName(VegasConnectionState *fc, CCNxName *name) +{ + uint64_t hash; + FcSessionHolder *holder; + int trim_segnum = 0; + + assertNotNull(name, "Name is null\n"); + if (name == NULL) { + return NULL; + } + + size_t segmentCount = ccnxName_GetSegmentCount(name); + assertTrue(segmentCount > 1, + "expected name with at least 2 components, but only got %zu, name = '%s'\n", + segmentCount, + ccnxName_ToString(name)); + + + if (segmentCount > 0) { + CCNxNameSegment *segment = ccnxName_GetSegment(name, segmentCount - 1); + if (ccnxNameSegment_GetType(segment) == CCNxNameLabelType_CHUNK) { + trim_segnum = 1; + } + } + + hash = ccnxName_LeftMostHashCode(name, segmentCount - trim_segnum); + + if (DEBUG_OUTPUT) { + printf("%s name %p hash %16" PRIX64 "\n", __func__, (void *) name, hash); + ccnxName_Display(name, 0); + } + + // should replace this with a hash table + TAILQ_FOREACH(holder, &fc->sessions_head, list) + { + if (holder->basename_hash == hash) { + return holder; + } + } + + return NULL; +} + +/* + * Precondition: only called for Content Objects. + * If the last component is a segment number, it is ignored + * + * Match the name of the content object to an active flow control session, + * or return NULL if not found. + */ +static FcSessionHolder * +vegas_LookupSession(VegasConnectionState *fc, TransportMessage *tm) +{ + assertTrue(transportMessage_IsContentObject(tm), + "Transport message is not a ContentObject\n"); + + CCNxTlvDictionary *contentObjectDictionary = transportMessage_GetDictionary(tm); + CCNxName *name = ccnxContentObject_GetName(contentObjectDictionary); + + return vegas_LookupSessionByName(fc, name); +} + +// ============================================= + +/* + * Precondition: it's an interest + */ +static int +vegas_HandleInterest(RtaConnection *conn, TransportMessage *tm) +{ + assertTrue(transportMessage_IsInterest(tm), "Transport message is not an interest"); + + VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS); + CCNxTlvDictionary *interestDictionary = transportMessage_GetDictionary(tm); + + // we do not modify or destroy this name + CCNxName *original_name = ccnxInterest_GetName(interestDictionary); + + CCNxName *basename = ccnxName_Copy(original_name); + + if (DEBUG_OUTPUT) { + printf("%s orig name %p basename %p\n", __func__, (void *) original_name, (void *) basename); + } + + // can we decode the last component as a segment number? + uint64_t segnum = 0; + bool segnum_found = trafficTools_GetObjectSegmentFromName(basename, &segnum); + if (segnum_found) { + // it is a segment number + ccnxName_Trim(basename, 1); + } + + FcSessionHolder *holder = vegas_LookupSessionByName(fc, basename); + + if (holder == NULL) { + // create a new session + // This takes ownership of the basename + uint64_t name_hash = ccnxName_HashCode(basename); + holder = vegas_CreateSessionHolder(fc, conn, basename, name_hash); + + CCNxInterestInterface *interestImpl = ccnxInterestInterface_GetInterface(interestDictionary); + + uint32_t lifetime = ccnxInterest_GetLifetime(interestDictionary); + + PARCBuffer *keyIdRestriction = ccnxInterest_GetKeyIdRestriction(interestDictionary); // might be NULL + + holder->session = vegasSession_Create(fc, conn, basename, segnum, + interestImpl, lifetime, keyIdRestriction); + + vegasSession_Start(holder->session); + + rtaConnection_SendStatus(conn, + FC_VEGAS, + RTA_UP, + notifyStatusCode_FLOW_CONTROL_STARTED, + original_name, + NULL); + } else { + assertTrue(segnum_found, "Duplicate interest w/o segnum for existing session"); + + if (segnum_found) { + vegasSession_Seek(holder->session, segnum); + } + + ccnxName_Release(&basename); + } + + return 0; +} + +static FcSessionHolder * +vegas_CreateSessionHolder(VegasConnectionState *fc, RtaConnection *conn, CCNxName *basename, uint64_t name_hash) +{ + FcSessionHolder *holder = parcMemory_AllocateAndClear(sizeof(FcSessionHolder)); + assertNotNull(holder, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(FcSessionHolder)); + holder->basename_hash = name_hash; + holder->basename = basename; + holder->session = NULL; + + TAILQ_INSERT_TAIL(&fc->sessions_head, holder, list); + + if (DEBUG_OUTPUT) { + printf("%s created holder %p hash %016" PRIX64 "\n", __func__, (void *) holder, holder->basename_hash); + } + return holder; +} + +/** + * This is called by a session when it is done + */ +void +vegas_EndSession(VegasConnectionState *fc, VegasSession *session) +{ + FcSessionHolder *holder; + + // should replace this with a hash table + TAILQ_FOREACH(holder, &fc->sessions_head, list) + { + if (holder->session == session) { + TAILQ_REMOVE(&fc->sessions_head, holder, list); + break; + } + } + + assertNotNull(holder, "invalid state, got null holder"); + + rtaConnection_SendStatus(fc->parent_connection, + FC_VEGAS, + RTA_UP, + notifyStatusCode_FLOW_CONTROL_FINISHED, + holder->basename, + NULL); + + vegasSession_Destroy(&holder->session); + parcMemory_Deallocate((void **) &holder); +} + +static void +vegas_SendControlPlaneResponse(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue) +{ + TransportMessage *tm = transportMessage_CreateFromDictionary(controlDictionary); + + transportMessage_SetInfo(tm, rtaConnection_Copy(conn), rtaConnection_FreeFunc); + + if (rtaComponent_PutMessage(outputQueue, tm)) { + RtaComponentStats *stats = rtaConnection_GetStats(conn, FC_VEGAS); + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } +} + +/** + * @function vegas_HandleControl + * @abstract Process CPI reqeusts + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return true if we consumed the message, false if it should go down the stack + */ +static bool +vegas_HandleControl(RtaConnection *conn, CCNxTlvDictionary *controlDictionary, PARCEventQueue *outputQueue) +{ + bool success = false; + + if (ccnxControlFacade_IsCPI(controlDictionary)) { + PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary); + if (cpi_getCPIOperation2(json) == CPI_CANCEL_FLOW) { + VegasConnectionState *fc = rtaConnection_GetPrivateData(conn, FC_VEGAS); + CCNxName *name = cpiCancelFlow_GetFlowName(json); + + PARCJSON *reply = NULL; + FcSessionHolder *holder = vegas_LookupSessionByName(fc, name); + if (holder != NULL) { + if (DEBUG_OUTPUT) { + char *string = ccnxName_ToString(name); + printf("%s Cancelling flow %s\n", __func__, string); + parcMemory_Deallocate((void **) &string); + } + + TAILQ_REMOVE(&fc->sessions_head, holder, list); + vegasSession_Destroy(&holder->session); + parcMemory_Deallocate((void **) &holder); + + reply = cpiAcks_CreateAck(json); + } else { + if (DEBUG_OUTPUT) { + char *string = ccnxName_ToString(name); + printf("%s got request to cancel unknown flow %s\n", __func__, string); + parcMemory_Deallocate((void **) &string); + } + + reply = cpiAcks_CreateNack(json); + } + CCNxTlvDictionary *response = ccnxControlFacade_CreateCPI(reply); + vegas_SendControlPlaneResponse(conn, response, outputQueue); + ccnxTlvDictionary_Release(&response); + + parcJSON_Release(&reply); + ccnxName_Release(&name); + + // we consume it + success = true; + } + } + return success; +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_component_Vegas.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_component_Vegas.c new file mode 100644 index 00000000..2a1cfe73 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_component_Vegas.c @@ -0,0 +1,696 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define DEBUG_OUTPUT 0 + +#include "../component_Vegas.c" +#include "../vegas_Session.c" + +#include <sys/un.h> +#include <strings.h> +#include <sys/queue.h> + +#include <LongBow/unit-test.h> +#include <LongBow/runtime.h> + +#include <parc/security/parc_Security.h> +#include <parc/security/parc_PublicKeySignerPkcs12Store.h> +#include <parc/algol/parc_SafeMemory.h> + +#include <ccnx/api/notify/notify_Status.h> + +#include <ccnx/transport/test_tools/traffic_tools.h> +#include <ccnx/common/ccnx_ContentObject.h> +#include <ccnx/common/ccnx_NameSegmentNumber.h> + +#include <ccnx/common/internal/ccnx_ValidationFacadeV1.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.c> +#include <ccnx/transport/transport_rta/core/rta_Connection.c> +#include <ccnx/transport/transport_rta/config/config_All.h> +#include <ccnx/transport/test_tools/traffic_tools.h> + +#include "../../test/testrig_MockFramework.c" + +#ifndef MAXPATH +#define MAXPATH 1024 +#endif + +// file descriptor for random numbers, part of Fixture +static int randomFd; + +typedef struct test_data { + MockFramework *mock; + char keystore_filename[MAXPATH]; + char keystore_password[MAXPATH]; +} TestData; + +static CCNxTransportConfig * +createParams(const char *keystore_name, const char *keystore_passwd) +{ + assertNotNull(keystore_name, "Got null keystore name\n"); + assertNotNull(keystore_passwd, "Got null keystore passwd\n"); + + CCNxStackConfig *stackConfig = apiConnector_ProtocolStackConfig( + testingUpper_ProtocolStackConfig( + vegasFlowController_ProtocolStackConfig( + testingLower_ProtocolStackConfig( + protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(), + apiConnector_GetName(), + testingUpper_GetName(), + vegasFlowController_GetName(), + testingLower_GetName(), + NULL))))); + + CCNxConnectionConfig *connConfig = apiConnector_ConnectionConfig( + testingUpper_ConnectionConfig( + vegasFlowController_ConnectionConfig( + tlvCodec_ConnectionConfig( + testingLower_ConnectionConfig(ccnxConnectionConfig_Create()))))); + + publicKeySignerPkcs12Store_ConnectionConfig(connConfig, keystore_name, keystore_passwd); + + CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig); + ccnxStackConfig_Release(&stackConfig); + return result; +} + +static TestData * +_commonSetup(const char *name) +{ + parcSecurity_Init(); + + TestData *data = parcMemory_AllocateAndClear(sizeof(TestData)); + assertNotNull(data, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(TestData)); + + sprintf(data->keystore_filename, "/tmp/keystore_%s_%d.p12", name, getpid()); + sprintf(data->keystore_password, "12345"); + + unlink(data->keystore_filename); + + CCNxTransportConfig *config = createParams(data->keystore_filename, data->keystore_password); + data->mock = mockFramework_Create(config); + ccnxTransportConfig_Destroy(&config); + return data; +} + +static void +_commonTeardown(TestData *data) +{ + mockFramework_Destroy(&data->mock); + unlink(data->keystore_filename); + parcMemory_Deallocate((void **) &data); + + parcSecurity_Fini(); +} + +// ====================================================== + +LONGBOW_TEST_RUNNER(Fc_Vegas) +{ + LONGBOW_RUN_TEST_FIXTURE(Component); +} + +LONGBOW_TEST_RUNNER_SETUP(Fc_Vegas) +{ + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + + randomFd = open("/dev/urandom", O_RDONLY); + + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_RUNNER_TEARDOWN(Fc_Vegas) +{ + close(randomFd); + return LONGBOW_STATUS_SUCCEEDED; +} + +// ============================================================== + +LONGBOW_TEST_FIXTURE(Local) +{ + LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None); + LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases); + + LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetSegnumFromObject); +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase))); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +static CCNxTlvDictionary * +createSignedContentObject(void) +{ + CCNxName *name = ccnxName_CreateFromCString("lci:/some/name"); + PARCBuffer *payload = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 11, (uint8_t *) "the payload")); + CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, payload); + parcBuffer_Release(&payload); + ccnxName_Release(&name); + + PARCBuffer *keyid = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 5, (uint8_t *) "keyid")); + ccnxValidationRsaSha256_Set(contentObject, keyid, NULL); + parcBuffer_Release(&keyid); + + PARCBuffer *sigbits = parcBuffer_WrapCString("the signature"); + PARCSignature *signature = parcSignature_Create(PARCSigningAlgorithm_RSA, PARCCryptoHashType_SHA256, parcBuffer_Flip(sigbits)); + ccnxContentObject_SetSignature(contentObject, keyid, signature, NULL); + + parcSignature_Release(&signature); + parcBuffer_Release(&sigbits); + + return contentObject; +} + +static CCNxTlvDictionary * +createSignedContentObjectWithFinalBlockId(uint64_t fbid) +{ + CCNxTlvDictionary *obj = createSignedContentObject(); + ccnxContentObject_SetFinalChunkNumber(obj, fbid); + + return obj; +} + +LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None) +{ + CCNxTlvDictionary *contentObjectDictionary = createSignedContentObject(); + bool success = vegasSession_GetFinalBlockIdFromContentObject(contentObjectDictionary, NULL); + assertFalse(success, "Should have failed getting FBID from content object"); + ccnxTlvDictionary_Release(&contentObjectDictionary); +} + +LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases) +{ + struct test_struct { + uint64_t value; + size_t encodedBytes; + uint8_t *encoded; + } test_vector[] = { + { .value = 0x0000000000000000ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x00 } }, + { .value = 0x0000000000000001ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x01 } }, + { .value = 0x00000000000000FFULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0xFF } }, + { .value = 0x0000000000000100ULL, .encodedBytes = 2, .encoded = (uint8_t[2]) { 0x01, 0x00} }, + { .value = 0x0100000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} }, + { .value = 0x8000000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} }, + { .value = 0xFFFFFFFFFFFFFFFFULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} }, + { .value = 0, .encodedBytes = 0, .encoded = NULL } + }; + + for (int i = 0; test_vector[i].encoded != NULL; i++) { + CCNxTlvDictionary *signed_with_fbid = createSignedContentObjectWithFinalBlockId(test_vector[i].value); + + uint64_t testvalue = -1; + bool success = vegasSession_GetFinalBlockIdFromContentObject(signed_with_fbid, &testvalue); + assertTrue(success, "Failed to get FBID from content object index %d value %016" PRIx64 "\n", + i, + test_vector[i].value) + { + ccnxTlvDictionary_Display(signed_with_fbid, 0); + } + + + assertTrue(testvalue == test_vector[i].value, + "Segment number does not match index %d value %016" PRIx64 ": got %" PRIx64 "\n", + i, + test_vector[i].value, + testvalue); + + ccnxTlvDictionary_Release(&signed_with_fbid); + } +} + +LONGBOW_TEST_CASE(Local, vegasSession_GetSegnumFromObject) +{ + struct test_struct { + bool valid; + uint64_t segnum; + char *uri; + } test_vectors[] = { + { .valid = false, .segnum = 0, .uri = "lci:/foo/bar" }, + { .valid = true, .segnum = 0, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=%00" }, + { .valid = true, .segnum = 0x1020, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=%10%20" }, + { .valid = true, .segnum = 0x6162, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=ab" }, + { .valid = true, .segnum = 0x616263, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abc" }, + { .valid = true, .segnum = 0x61626364, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcd" }, + { .valid = true, .segnum = 0x6162636465, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcde" }, + { .valid = true, .segnum = 0x616263646566, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcdef" }, + { .valid = true, .segnum = 0x61626364656667, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcdefg" }, + { .valid = true, .segnum = 0x6162636465666768, .uri = "lci:/foo/" CCNxNameLabel_Chunk "=abcdefgh" }, + { .valid = false, .segnum = 0, .uri = NULL } + }; + + for (int i = 0; test_vectors[i].uri != NULL; i++) { + CCNxName *name = ccnxName_CreateFromCString(test_vectors[i].uri); + CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, NULL); + + uint64_t testSeqnum = -1; + int failure = vegasSession_GetSegnumFromObject(contentObject, &testSeqnum); + + + + if (test_vectors[i].valid) { + assertFalse(failure, "Incorrect success index %d: got %d expected %d", + i, failure, test_vectors[i].valid); + + assertTrue(testSeqnum == test_vectors[i].segnum, "Incorrect segnum index %d, got %" PRIu64 " expected %" PRIu64, + i, testSeqnum, test_vectors[i].segnum); + } else { + assertTrue(failure, "Incorrect success index %d: got %d expected %d", + i, failure, test_vectors[i].valid); + } + + ccnxName_Release(&name); + ccnxTlvDictionary_Release(&contentObject); + } +} + +// ============================================================== + +LONGBOW_TEST_FIXTURE(Component) +{ + LONGBOW_RUN_TEST_CASE(Component, open_close); + + // these should all be pass through + LONGBOW_RUN_TEST_CASE(Component, content_object_down); + LONGBOW_RUN_TEST_CASE(Component, control_msg_down); + LONGBOW_RUN_TEST_CASE(Component, interest_up); + LONGBOW_RUN_TEST_CASE(Component, control_msg_up); + LONGBOW_RUN_TEST_CASE(Component, cancel_flow); +} + +LONGBOW_TEST_FIXTURE_SETUP(Component) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase))); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Component) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +// ============================================ + +LONGBOW_TEST_CASE(Component, open_close) +{ + // dont actually do anything. make sure no memory leaks in setup and teardown. +} + + +// ============================================ +// Passthrough messages + +LONGBOW_TEST_CASE(Component, content_object_down) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithSignedContentObject(data->mock->connection); + + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStep(data->mock->framework); + flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack); + rtaFramework_NonThreadedStep(data->mock->framework); + + TransportMessage *test_tm = rtaComponent_GetMessage(out); + + assertTrue(test_tm == truth_tm, + "Got wrong transport message pointer, got %p expected %p", + (void *) test_tm, + (void *) truth_tm); + + transportMessage_Destroy(&truth_tm); +} + +LONGBOW_TEST_CASE(Component, control_msg_down) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithControlMessage(data->mock->connection); + + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStep(data->mock->framework); + flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack); + rtaFramework_NonThreadedStep(data->mock->framework); + TransportMessage *test_tm = rtaComponent_GetMessage(out); + + assertTrue(test_tm == truth_tm, + "Got wrong transport message pointer, got %p expected %p", + (void *) test_tm, + (void *) truth_tm); + + transportMessage_Destroy(&truth_tm); +} + +LONGBOW_TEST_CASE(Component, interest_up) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection); + + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_DOWN); + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStep(data->mock->framework); + flow_vegas_ops.upcallRead(read, PARCEventType_Read, (void *) data->mock->stack); + rtaFramework_NonThreadedStep(data->mock->framework); + TransportMessage *test_tm = rtaComponent_GetMessage(out); + + assertTrue(test_tm == truth_tm, + "Got wrong transport message pointer, got %p expected %p", + (void *) test_tm, + (void *) truth_tm); + + transportMessage_Destroy(&truth_tm); +} + +LONGBOW_TEST_CASE(Component, control_msg_up) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithControlMessage(data->mock->connection); + + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_DOWN); + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStep(data->mock->framework); + flow_vegas_ops.upcallRead(read, PARCEventType_Read, (void *) data->mock->stack); + rtaFramework_NonThreadedStep(data->mock->framework); + TransportMessage *test_tm = rtaComponent_GetMessage(out); + + assertTrue(test_tm == truth_tm, + "Got wrong transport message pointer, got %p expected %p", + (void *) test_tm, + (void *) truth_tm); + + transportMessage_Destroy(&test_tm); +} + +// ============================================ +// These should start a flow control session + +/** + * Creates an interest w/o a segment number + * Sends it down the stack to the flow controller + * Flow controller should append segment number 0 to the interest and send that down the stack + */ +LONGBOW_TEST_CASE(Component, interest_down) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection); + + // If we can, add a payload to the Interest. Why not. + PARCBuffer *payload = NULL; + CCNxInterest *interest = transportMessage_GetDictionary(truth_tm); + CCNxInterestInterface *impl = ccnxInterestInterface_GetInterface(interest); + if (impl != NULL && impl != &CCNxInterestFacadeV1_Implementation) { + // V1 or greater should support Interest payloads. + payload = parcBuffer_WrapCString("This is a payload."); + impl->setPayload(interest, payload); + } + + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStep(data->mock->framework); + flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack); + rtaFramework_NonThreadedStep(data->mock->framework); + + // we should see a status message up the stack and interests + // going down the stack. + + + TransportMessage *test_tm = rtaComponent_GetMessage(in); + assertNotNull(test_tm, "got null transport message back up the queue, expecting status\n"); + + assertTrue(transportMessage_IsControl(test_tm), + "Transport message is not a control object") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0); + } + + CCNxTlvDictionary *test_dict = transportMessage_GetDictionary(test_tm); + + PARCJSON *json = ccnxControlFacade_GetJson(test_dict); + + NotifyStatus *status = notifyStatus_ParseJSON(json); + + assertNotNull(status, "Could not parse NotifyStatus JSON message"); + assertTrue(notifyStatus_GetFiledes(status) == data->mock->connection->api_fd, + "Expected file descriptor %d, actual %d\n", data->mock->connection->api_fd, notifyStatus_GetFiledes(status)); + + assertTrue(notifyStatus_IsFlowControlStarted(status), + "Expected notifyStatus_IsFlowControlStarted to be true, actual code %d", notifyStatus_GetStatusCode(status)); + + notifyStatus_Release(&status); + + transportMessage_Destroy(&test_tm); + + // Read segment 0 interest + trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(transportMessage_GetDictionary(truth_tm)), 0, payload); + + // Now read segment 1 + trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(transportMessage_GetDictionary(truth_tm)), 1, payload); + + if (payload != NULL) { + parcBuffer_Release(&payload); + } + + transportMessage_Destroy(&truth_tm); +} + + +LONGBOW_TEST_CASE(Component, interest_down_slow_retransmit) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection); + + VegasConnectionState *fc; + FcSessionHolder *holder; + + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStep(data->mock->framework); + flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack); + rtaFramework_NonThreadedStep(data->mock->framework); + + // -------------------------------------- + // Read segment 0 interest + CCNxTlvDictionary *interest = transportMessage_GetDictionary(truth_tm); + + trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 0, NULL); + + // Now read segment 1 + trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 1, NULL); + + // -------------------------------------- + // now bump the time and see what happens. + // these are normally set in the timer sallback + fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS); + holder = TAILQ_FIRST(&fc->sessions_head); + assertNotNull(holder, "got null session holder"); + + printf("*** bump time\n"); + + data->mock->framework->clock_ticks += 1001; + + // RTO timeout will be 1 second + vegasSession_TimerCallback(-1, PARCEventType_Timeout, holder->session); + trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 0, NULL); + + transportMessage_Destroy(&truth_tm); +} + + +LONGBOW_TEST_CASE(Component, interest_down_fast_retransmit) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection); + + CCNxName *basename, *segmentname; + VegasConnectionState *fc; + FcSessionHolder *holder; + + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + PARCEventQueue *read = rtaProtocolStack_GetPutQueue(data->mock->stack, FC_VEGAS, RTA_UP); + PARCEventQueue *out = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStep(data->mock->framework); + flow_vegas_ops.downcallRead(read, PARCEventType_Read, (void *) data->mock->stack); + rtaFramework_NonThreadedStep(data->mock->framework); + + // -------------------------------------- + // Read segment 0 interest + CCNxTlvDictionary *interest = transportMessage_GetDictionary(truth_tm); + + trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 0, NULL); + + // Now read segment 1 + trafficTools_ReadAndVerifySegment(out, ccnxInterest_GetName(interest), 1, NULL); + + // -------------------------------------- + // now bump the time and see what happens. + // these are normally set in the timer sallback + fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS); + holder = TAILQ_FIRST(&fc->sessions_head); + assertNotNull(holder, "got null session holder"); + + + data->mock->framework->clock_ticks += 20; + printf("*** bump time %" PRIu64 "\n", data->mock->framework->clock_ticks); + vegasSession_TimerCallback(-1, PARCEventType_Timeout, holder->session); + + // -------------------------------------- + // send an out-of-order content object, should see a fast retransmit + + basename = ccnxName_Copy(ccnxInterest_GetName(interest)); + segmentname = ccnxName_Copy(basename); + + CCNxNameSegment *segment = ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, 1); + ccnxName_Append(segmentname, segment); + ccnxNameSegment_Release(&segment); + + transportMessage_Destroy(&truth_tm); + + // this takes ownership of segment name + TransportMessage *reply = + trafficTools_CreateTransportMessageWithSignedContentObjectWithName(data->mock->connection, + segmentname, data->keystore_filename, data->keystore_password); + + rtaComponent_PutMessage(out, reply); + + data->mock->framework->clock_ticks += 40; + printf("*** bump time %" PRIu64 "\n", data->mock->framework->clock_ticks); + rtaFramework_NonThreadedStepCount(data->mock->framework, 5); + vegasSession_TimerCallback(-1, PARCEventType_Timeout, holder->session); + + trafficTools_ReadAndVerifySegment(out, basename, 0, NULL); + + ccnxName_Release(&segmentname); + ccnxName_Release(&basename); +} + +/** + * Send an interest down the stack to start a flow controller, then send + * a control message to cancel it. + */ +LONGBOW_TEST_CASE(Component, cancel_flow) +{ + TestData *data = longBowTestCase_GetClipBoardData(testCase); + + TransportMessage *truth_tm = trafficTools_CreateTransportMessageWithInterest(data->mock->connection); + + PARCEventQueue *in = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + + CCNxName *flowName = ccnxName_Acquire(ccnxInterest_GetName(transportMessage_GetDictionary(truth_tm))); + + // ================================ + // This will signal to the flow controller that it should start a flow + // We give up ownership of "truth_tm" at this point + rtaComponent_PutMessage(in, truth_tm); + rtaFramework_NonThreadedStepCount(data->mock->framework, 5); + + // ================================ + // we should see a status message up the stack + + TransportMessage *test_tm = rtaComponent_GetMessage(in); + assertNotNull(test_tm, "got null transport message back up the queue, expecting status\n"); + + assertTrue(transportMessage_IsControl(test_tm), "Transport message is not a Control") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(test_tm), 0); + } + + CCNxTlvDictionary *controlDictionary = transportMessage_GetDictionary(test_tm); + + PARCJSON *json = ccnxControlFacade_GetJson(controlDictionary); + + NotifyStatus *status = notifyStatus_ParseJSON(json); + assertTrue(notifyStatus_IsFlowControlStarted(status), + "Expected notifyStatus_IsFlowControlStarted to be true. Actual code %d\n", notifyStatus_GetStatusCode(status)); + notifyStatus_Release(&status); + + // ================================ + // After the notification, the flow is "started" and we can cancel it + + // Now that its started, send a cancel + PARCJSON *cancelFlow = cpiCancelFlow_Create(flowName); + CCNxTlvDictionary *cancelDictionary = ccnxControlFacade_CreateCPI(cancelFlow); + parcJSON_Release(&cancelFlow); + + TransportMessage *cancelTm = transportMessage_CreateFromDictionary(cancelDictionary); + transportMessage_SetInfo(cancelTm, rtaConnection_Copy(data->mock->connection), rtaConnection_FreeFunc); + rtaComponent_PutMessage(in, cancelTm); + rtaFramework_NonThreadedStepCount(data->mock->framework, 5); + + // now verify that its gone + VegasConnectionState *fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS); + FcSessionHolder *holder = TAILQ_FIRST(&fc->sessions_head); + assertNull(holder, "The session list is not empty!"); + + ccnxTlvDictionary_Release(&cancelDictionary); + transportMessage_Destroy(&test_tm); + ccnxName_Release(&flowName); +} + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(Fc_Vegas); + exit(longBowMain(argc, argv, testRunner, NULL)); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_vegas_Session.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_vegas_Session.c new file mode 100644 index 00000000..b48e9a8a --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/test/test_vegas_Session.c @@ -0,0 +1,672 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define DEBUG_OUTPUT 0 + +#include "../component_Vegas.c" +#include "../vegas_Session.c" + +#include <sys/un.h> +#include <strings.h> +#include <sys/queue.h> + +#include <LongBow/unit-test.h> +#include <LongBow/runtime.h> + +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_NonThreaded.h> + +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.c> +#include <ccnx/transport/transport_rta/core/rta_Connection.c> + +#include <parc/security/parc_Security.h> +#include <parc/security/parc_PublicKeySignerPkcs12Store.h> +#include <ccnx/transport/transport_rta/config/config_All.h> + +#include <ccnx/api/notify/notify_Status.h> + +#include <ccnx/transport/test_tools/traffic_tools.h> + +#include <ccnx/common/ccnx_ContentObject.h> +#include <ccnx/common/internal/ccnx_ValidationFacadeV1.h> + +#include "../../test/testrig_MockFramework.c" + +#ifndef MAXPATH +#define MAXPATH 1024 +#endif + +// file descriptor for random numbers, part of Fixture +static int randomFd; + +typedef struct test_data { + MockFramework *mock; + char keystore_filename[MAXPATH]; + char keystore_password[MAXPATH]; +} TestData; + +static CCNxTransportConfig * +createParams(const char *keystore_name, const char *keystore_passwd) +{ + assertNotNull(keystore_name, "Got null keystore name\n"); + assertNotNull(keystore_passwd, "Got null keystore passwd\n"); + + CCNxStackConfig *stackConfig = apiConnector_ProtocolStackConfig( + testingUpper_ProtocolStackConfig( + vegasFlowController_ProtocolStackConfig( + testingLower_ProtocolStackConfig( + protocolStack_ComponentsConfigArgs(ccnxStackConfig_Create(), + apiConnector_GetName(), + testingUpper_GetName(), + vegasFlowController_GetName(), + testingLower_GetName(), + NULL))))); + + CCNxConnectionConfig *connConfig = apiConnector_ConnectionConfig( + testingUpper_ConnectionConfig( + vegasFlowController_ConnectionConfig( + testingLower_ConnectionConfig(ccnxConnectionConfig_Create())))); + + + publicKeySignerPkcs12Store_ConnectionConfig(connConfig, keystore_name, keystore_passwd); + + CCNxTransportConfig *result = ccnxTransportConfig_Create(stackConfig, connConfig); + ccnxStackConfig_Release(&stackConfig); + return result; +} + +static TestData * +_commonSetup(const char *name) +{ + parcSecurity_Init(); + + TestData *data = parcMemory_Allocate(sizeof(TestData)); + + assertNotNull(data, "Got null memory from parcMemory_Allocate"); + + sprintf(data->keystore_filename, "/tmp/keystore_%s_%d.p12", name, getpid()); + sprintf(data->keystore_password, "12345"); + + unlink(data->keystore_filename); + + CCNxTransportConfig *config = createParams(data->keystore_filename, data->keystore_password); + data->mock = mockFramework_Create(config); + ccnxTransportConfig_Destroy(&config); + return data; +} + +static void +_commonTeardown(TestData *data) +{ + mockFramework_Destroy(&data->mock); + unlink(data->keystore_filename); + + parcMemory_Deallocate((void **) &data); + + parcSecurity_Fini(); +} + + + +// ====================================================== + +LONGBOW_TEST_RUNNER(VegasSession) +{ + LONGBOW_RUN_TEST_FIXTURE(Local); + LONGBOW_RUN_TEST_FIXTURE(IterateFinalChunkNumber); +} + +LONGBOW_TEST_RUNNER_SETUP(VegasSession) +{ + parcMemory_SetInterface(&PARCSafeMemoryAsPARCMemory); + + randomFd = open("/dev/urandom", O_RDONLY); + + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_RUNNER_TEARDOWN(VegasSession) +{ + close(randomFd); + return LONGBOW_STATUS_SUCCEEDED; +} + +// ============================================================== + +LONGBOW_TEST_FIXTURE(Local) +{ + LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None); + LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases); + LONGBOW_RUN_TEST_CASE(Local, vegasSession_GetSegnumFromObject); + + LONGBOW_RUN_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_LastBlockSetsFinalId); + LONGBOW_RUN_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_FirstAndLastBlocksSetsFinalId); +} + +LONGBOW_TEST_FIXTURE_SETUP(Local) +{ + longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase))); + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(Local) +{ + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +static CCNxTlvDictionary * +createSignedContentObject(void) +{ + CCNxName *name = ccnxName_CreateFromCString("ccnx:/some/name"); + PARCBuffer *payload = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 11, (uint8_t *) "the payload")); + CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, payload); + parcBuffer_Release(&payload); + ccnxName_Release(&name); + + PARCBuffer *keyid = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 5, (uint8_t *) "keyid")); + ccnxValidationRsaSha256_Set(contentObject, keyid, NULL); + parcBuffer_Release(&keyid); + + PARCBuffer *sigbits = parcBuffer_Flip(parcBuffer_PutArray(parcBuffer_Allocate(20), 13, (uint8_t *) "the signature")); + + switch (ccnxTlvDictionary_GetSchemaVersion(contentObject)) { + case CCNxTlvDictionary_SchemaVersion_V1: + ccnxValidationFacadeV1_SetPayload(contentObject, sigbits); + break; + default: + trapNotImplemented("Unsupprted schema version in createSignedContentObject()"); + break; + } + + parcBuffer_Release(&sigbits); + + return contentObject; +} + +static CCNxTlvDictionary * +createSignedContentObjectWithFinalBlockId(uint64_t fbid) +{ + CCNxTlvDictionary *obj = createSignedContentObject(); + ccnxContentObject_SetFinalChunkNumber(obj, fbid); + return obj; +} + +LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_None) +{ + CCNxTlvDictionary *contentObjectDictionary = createSignedContentObject(); + bool success = vegasSession_GetFinalBlockIdFromContentObject(contentObjectDictionary, NULL); + assertFalse(success, "Should have failed getting FBID from content object"); + ccnxTlvDictionary_Release(&contentObjectDictionary); +} + +LONGBOW_TEST_CASE(Local, vegasSession_GetFinalBlockIdFromContentObject_TestCases) +{ + struct test_struct { + uint64_t value; + size_t encodedBytes; + uint8_t *encoded; + } test_vector[] = { + { .value = 0x0000000000000000ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x00 } }, + { .value = 0x0000000000000001ULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0x01 } }, + { .value = 0x00000000000000FFULL, .encodedBytes = 1, .encoded = (uint8_t[1]) { 0xFF } }, + { .value = 0x0000000000000100ULL, .encodedBytes = 2, .encoded = (uint8_t[2]) { 0x01, 0x00} }, + { .value = 0x0100000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} }, + { .value = 0x8000000000000100ULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00} }, + { .value = 0xFFFFFFFFFFFFFFFFULL, .encodedBytes = 8, .encoded = (uint8_t[8]) { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} }, + { .value = 0, .encodedBytes = 0, .encoded = NULL } + }; + + for (int i = 0; test_vector[i].encoded != NULL; i++) { + CCNxTlvDictionary *signed_with_fbid = createSignedContentObjectWithFinalBlockId(test_vector[i].value); + + uint64_t testvalue = -1; + bool success = vegasSession_GetFinalBlockIdFromContentObject(signed_with_fbid, &testvalue); + assertTrue(success, "Failed to get FBID from content object index %d value %016" PRIx64 "\n", + i, + test_vector[i].value) + { + ccnxTlvDictionary_Display(signed_with_fbid, 0); + } + + + assertTrue(testvalue == test_vector[i].value, + "Segment number does not match index %d value %016" PRIx64 ": got %" PRIx64 "\n", + i, + test_vector[i].value, + testvalue); + + ccnxTlvDictionary_Release(&signed_with_fbid); + } +} + +LONGBOW_TEST_CASE(Local, vegasSession_GetSegnumFromObject) +{ + struct test_struct { + bool valid; + uint64_t segnum; + char *uri; + } test_vectors[] = { + { .valid = false, .segnum = 0, .uri = "ccnx:/foo/bar" }, + { .valid = true, .segnum = 0, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=%00" }, + { .valid = true, .segnum = 0x1020, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=%10%20" }, + { .valid = true, .segnum = 0x6162, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=ab" }, + { .valid = true, .segnum = 0x616263, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abc" }, + { .valid = true, .segnum = 0x61626364, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcd" }, + { .valid = true, .segnum = 0x6162636465, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcde" }, + { .valid = true, .segnum = 0x616263646566, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcdef" }, + { .valid = true, .segnum = 0x61626364656667, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcdefg" }, + { .valid = true, .segnum = 0x6162636465666768, .uri = "ccnx:/foo/" CCNxNameLabel_Chunk "=abcdefgh" }, + { .valid = false, .segnum = 0, .uri = NULL } + }; + + for (int i = 0; test_vectors[i].uri != NULL; i++) { + CCNxName *name = ccnxName_CreateFromCString(test_vectors[i].uri); + CCNxTlvDictionary *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, NULL); + + uint64_t testSeqnum = -1; + int failure = vegasSession_GetSegnumFromObject(contentObject, &testSeqnum); + + + + if (test_vectors[i].valid) { + assertFalse(failure, "Incorrect success index %d: got %d expected %d", + i, failure, test_vectors[i].valid); + + assertTrue(testSeqnum == test_vectors[i].segnum, "Incorrect segnum index %d, got %" PRIu64 " expected %" PRIu64, + i, testSeqnum, test_vectors[i].segnum); + } else { + assertTrue(failure, "Incorrect success index %d: got %d expected %d", + i, failure, test_vectors[i].valid); + } + + ccnxName_Release(&name); + ccnxTlvDictionary_Release(&contentObject); + } +} + + +// ================================================================= +// Tests related to the FinalBlockId and how the publisher sets it in +// a stream of content objects + +#define DO_NOT_SET ((uint64_t) -1) +#define SENTINEL ((uint64_t) -1) + +typedef struct test_vector { + uint64_t chunk; + uint64_t setFinalBlockId; + bool isLast; + bool interestReceived; + bool dataReceived; +} TestVector; + + +static void +_verifyFlowStartNotification(TestData *data, TransportMessage *notify) +{ + assertNotNull(notify, "got null transport message back up the queue, expecting status\n"); + + assertTrue(transportMessage_IsControl(notify), + "Transport message is not a control object") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(notify), 0); + } + + CCNxTlvDictionary *test_dict = transportMessage_GetDictionary(notify); + + PARCJSON *json = ccnxControlFacade_GetJson(test_dict); + + NotifyStatus *status = notifyStatus_ParseJSON(json); + + assertNotNull(status, "Could not parse NotifyStatus JSON message"); + assertTrue(notifyStatus_GetFiledes(status) == data->mock->connection->api_fd, + "Expected file descriptor %d, actual %d\n", data->mock->connection->api_fd, notifyStatus_GetFiledes(status)); + + assertTrue(notifyStatus_IsFlowControlStarted(status), + "Expected notifyStatus_IsFlowControlStarted to be true, actual code %d", notifyStatus_GetStatusCode(status)); + + notifyStatus_Release(&status); +} + + +static CCNxName * +_startFlow(TestData *data) +{ + TransportMessage *downInterest = trafficTools_CreateTransportMessageWithInterest(data->mock->connection); + CCNxName *sessionName = ccnxName_Acquire(ccnxInterest_GetName(transportMessage_GetDictionary(downInterest))); + PARCEventQueue *upperQueue = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + + rtaComponent_PutMessage(upperQueue, downInterest); + rtaFramework_NonThreadedStepCount(data->mock->framework, 10); + + // we should see a status message up the stack and interests + // going down the stack. + + TransportMessage *notify = rtaComponent_GetMessage(upperQueue); + _verifyFlowStartNotification(data, notify); + transportMessage_Destroy(¬ify); + + return sessionName; +} + +/* + * Caveat: this only works because we create a single session + */ +static VegasSession * +_grabSession(TestData *data, CCNxName *name) +{ + VegasConnectionState *fc = rtaConnection_GetPrivateData(data->mock->connection, FC_VEGAS); + + FcSessionHolder *holder = vegas_LookupSessionByName(fc, name); + + assertNotNull(holder, "Could not find the session holder in the flow controller"); + return holder->session; +} + +/* + * a tick is 1 milli-second, but it could be different depending on how + * the framework is started + */ +static void +_bumpTime(TestData *data, unsigned ticks, CCNxName *name) +{ + data->mock->framework->clock_ticks += ticks; + vegasSession_TimerCallback(-1, PARCEventType_Timeout, _grabSession(data, name)); +} + +static uint64_t +_getChunkNumberFromName(const CCNxName *name) +{ + size_t segmentCount = ccnxName_GetSegmentCount(name); + CCNxNameSegment *lastSegment = ccnxName_GetSegment(name, segmentCount - 1); + CCNxNameLabelType nameType = ccnxNameSegment_GetType(lastSegment); + assertTrue(nameType == CCNxNameLabelType_CHUNK, "Wrong segment type got %d expected %d", nameType, CCNxNameLabelType_CHUNK); + uint64_t chunkNumber = ccnxNameSegmentNumber_Value(lastSegment); + return chunkNumber; +} + +static TestVector * +_getVector(TestVector *vectors, uint64_t chunkNumber) +{ + // find the test vector for this chunk + for (int i = 0; vectors[i].chunk != SENTINEL; i++) { + if (vectors[i].chunk == chunkNumber) { + return &vectors[i]; + } + } + trapIllegalValue(chunkNumber, "Could not find chunk number in test vector"); +} + +static TransportMessage * +_createReponseContentObject(CCNxName *name, uint64_t finalBlockid) +{ + CCNxContentObject *obj = ccnxContentObject_CreateWithNameAndPayload(name, NULL); + assertNotNull(obj, "Got null content object."); + + if (finalBlockid != DO_NOT_SET) { + bool success = ccnxContentObject_SetFinalChunkNumber(obj, finalBlockid); + assertTrue(success, "Failed to set final chunk number"); + } + + CCNxMetaMessage *message = ccnxMetaMessage_CreateFromContentObject(obj); + TransportMessage *response = transportMessage_CreateFromDictionary(message); + + ccnxMetaMessage_Release(&message); + ccnxContentObject_Release(&obj); + + return response; +} + +/* + * Returns true if the unit test is finished + */ +static bool +_respondToDownInterest(TestData *data, TestVector *vectors) +{ + PARCEventQueue *lowerQueue = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_LOWER, RTA_UP); + + bool finished = false; + TransportMessage *msg = rtaComponent_GetMessage(lowerQueue); + if (msg) { + // it should be an Interest with a chunk number + assertTrue(transportMessage_IsInterest(msg), "Got unexpected message") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(msg), 3); + } + + CCNxTlvDictionary *interestDictionary = transportMessage_GetDictionary(msg); + CCNxName *name = ccnxInterest_GetName(interestDictionary); + uint64_t chunkNumber = _getChunkNumberFromName(name); + + TestVector *vector = _getVector(vectors, chunkNumber); + + vector->interestReceived = true; + + // create a content object and set the FinalBlockId if vector says to + TransportMessage *response = _createReponseContentObject(name, vector->setFinalBlockId); + RtaConnection *connection = transportMessage_GetInfo(msg); + RtaConnection *connectionRef = rtaConnection_Copy(connection); + transportMessage_SetInfo(response, connectionRef, rtaConnection_FreeFunc); + + rtaComponent_PutMessage(lowerQueue, response); + + finished = vector->isLast; + + transportMessage_Destroy(&msg); + } + return finished; +} + +/* + * Returns true if received the last message + */ +static bool +_consumeUpperContentObject(TestData *data, TestVector *vectors) +{ + PARCEventQueue *upperQueue = rtaProtocolStack_GetPutQueue(data->mock->stack, TESTING_UPPER, RTA_DOWN); + + bool finished = false; + TransportMessage *msg = rtaComponent_GetMessage(upperQueue); + if (msg) { + // it should be a content object + assertTrue(transportMessage_IsContentObject(msg), "Got unexpected message") + { + ccnxTlvDictionary_Display(transportMessage_GetDictionary(msg), 3); + } + + CCNxTlvDictionary *objectDictionary = transportMessage_GetDictionary(msg); + CCNxName *name = ccnxContentObject_GetName(objectDictionary); + uint64_t chunkNumber = _getChunkNumberFromName(name); + + TestVector *vector = _getVector(vectors, chunkNumber); + + // we should not have seen it before + assertFalse(vector->dataReceived, "Duplicate Content Object chunk %" PRIu64, chunkNumber) + { + ccnxName_Display(name, 3); + } + + vector->dataReceived = true; + + finished = vector->isLast; + + transportMessage_Destroy(&msg); + } + + return finished; +} + +static void +_runTestVector(TestData *data, TestVector vectors[]) +{ + CCNxName *sessionName = _startFlow(data); + + bool finished = false; + + while (!finished) { + rtaFramework_NonThreadedStep(data->mock->framework); + finished = _respondToDownInterest(data, vectors); + + rtaFramework_NonThreadedStep(data->mock->framework); + finished &= _consumeUpperContentObject(data, vectors); + + if (!finished) { + _bumpTime(data, 5, sessionName); + } + } + + ccnxName_Release(&sessionName); +} + +/* + * First chunk sets final block ID, last chunk does not. Should keep reading until + * the real last chunk set to itself. + */ +LONGBOW_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_FirstBlockSetsLastDoesNotFinalId) +{ + TestVector vectors[] = { + { .chunk = 0, .setFinalBlockId = 5, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 1, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 2, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 3, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 4, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 5, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 6, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 7, .setFinalBlockId = 7, .isLast = true, .interestReceived = false, .dataReceived = false }, + { .chunk = SENTINEL, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + }; + + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _runTestVector(data, vectors); +} + +/* + * FinalBlockId unset until last chunk, which sets to itself + */ +LONGBOW_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_LastBlockSetsFinalId) +{ + TestVector vectors[] = { + { .chunk = 0, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 1, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 2, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 3, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 4, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 5, .setFinalBlockId = 5, .isLast = true, .interestReceived = false, .dataReceived = false }, + { .chunk = SENTINEL, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + }; + + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _runTestVector(data, vectors); +} + +/* + * First chunk sets FinalBlockId and last chunks, and last chunk sets it to itself + */ +LONGBOW_TEST_CASE(Local, vegasSession_ReceiveContentObject_InOrder_FirstAndLastBlocksSetsFinalId) +{ + TestVector vectors[] = { + { .chunk = 0, .setFinalBlockId = 7, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 1, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 2, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 3, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 4, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 5, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 6, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + { .chunk = 7, .setFinalBlockId = 7, .isLast = true, .interestReceived = false, .dataReceived = false }, + { .chunk = SENTINEL, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }, + }; + + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _runTestVector(data, vectors); +} + +// ============================================ + +LONGBOW_TEST_FIXTURE(IterateFinalChunkNumber) +{ + LONGBOW_RUN_TEST_CASE(IterateFinalChunkNumber, vegasSession_ReceiveContentObject_InOrder_FirstSetsSecondIncreasesLastSetsFinalId); +} + +LONGBOW_TEST_FIXTURE_SETUP(IterateFinalChunkNumber) +{ + return LONGBOW_STATUS_SUCCEEDED; +} + +LONGBOW_TEST_FIXTURE_TEARDOWN(IterateFinalChunkNumber) +{ + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + if (outstandingAllocations != 0) { + printf("%s leaks memory by %d allocations\n", longBowTestCase_GetName(testCase), outstandingAllocations); + return LONGBOW_STATUS_MEMORYLEAK; + } + return LONGBOW_STATUS_SUCCEEDED; +} + +/* + * First chunk sets FinalBlockId, later chunk increases it by N, then final chunk set to self. + * + * In this test, we programmatically create the TestVector array so we can run different iterations of N. + */ +LONGBOW_TEST_CASE(IterateFinalChunkNumber, vegasSession_ReceiveContentObject_InOrder_FirstSetsSecondIncreasesLastSetsFinalId) +{ + const unsigned minsize = 5; + const unsigned maxsize = 20; + + for (unsigned size = minsize; size < maxsize; size++) { + longBowTestCase_SetClipBoardData(testCase, _commonSetup(longBowTestCase_GetName(testCase))); + + TestVector vectors[size]; + + // set initial state + for (int i = 0; i < size; i++) { + vectors[i] = (TestVector) { .chunk = i, .setFinalBlockId = DO_NOT_SET, .isLast = false, .interestReceived = false, .dataReceived = false }; + } + + // first vectors sets it to minsize + vectors[0].setFinalBlockId = minsize; + + // minsize sets it to the end + vectors[minsize - 1].setFinalBlockId = size; + + // last one sets it to itself + vectors[size - 1].setFinalBlockId = size; + vectors[size - 1].isLast = true; + + TestData *data = longBowTestCase_GetClipBoardData(testCase); + _runTestVector(data, vectors); + + _commonTeardown(longBowTestCase_GetClipBoardData(testCase)); + + uint32_t outstandingAllocations = parcSafeMemory_ReportAllocation(STDERR_FILENO); + assertTrue(outstandingAllocations == 0, "Memory leak for size %u", size); + } +} + + + +// ============================================ + +int +main(int argc, char *argv[]) +{ + LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(VegasSession); + exit(longBowMain(argc, argv, testRunner, NULL)); +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_Session.c b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_Session.c new file mode 100644 index 00000000..a77adc34 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_Session.c @@ -0,0 +1,1379 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * See additional comments in component_Vegas.c + * + * Flow Control Algorithm + * ========================= + * Based on TCP Vegas. Please read the Vegas paper. We use similar + * variable names to the paper. Code looks quite a bit like the linux + * tcp_vegas.c too. + * + * Here's the differences. In CCN, an Interest is like an ACK token, it + * gives the network permission to send. The node issuing Interests needs + * to pace them to not exceed the network capacity. This is done by + * observing the delay of Content Objects. If the delay grows too quickly, + * then we back off linearly. If the delay is not much above what we expected + * based on the minimum observed delay, we increase linearly. + * + * During slow start, the interest window (still called "cwnd") doubles + * every other RTT until we exceed the slow_start_threshold or the delay + * increases too much. + * + * The RTT is calculated every RTT based on the observed minimum RTT during + * the previous period. + * + * We use RFC6298 Retransmission Timeout (RTO) calculation methods per + * flow control session (object basename). + * + * Just to be clear, there are two timers working. The RTO timer is for + * retransmitting interests if the flow as stalled out. The Vegas RTT + * calculation is for congestion window calculations. + * + * We we receive an out-of-order content object, we'll check the earlier + * segments to see if they have passed the Vegas RTT. If so, we'll + * re-express the interests. + * + * Each time we re-express an Interest, we might decrese the congestion + * window. If the last time the interest was sent was more recent than + * the last time we decreased the congestion window, we'll decrease the + * congestion window. If the last expression of the interest was before + * the most recent window decrease, the window is left alone. This means + * we'll only decreae the window once per re-expression. + */ + +#include <config.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <limits.h> +#include <sys/queue.h> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#include <LongBow/runtime.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_SafeMemory.h> + +#include <parc/algol/parc_EventTimer.h> + +#include <ccnx/common/ccnx_NameSegmentNumber.h> +#include <ccnx/common/ccnx_WireFormatMessage.h> + +#include <ccnx/transport/common/transport_Message.h> +#include <ccnx/transport/transport_rta/core/rta_Framework.h> +#include <ccnx/transport/transport_rta/core/rta_Framework_Services.h> +#include <ccnx/transport/transport_rta/core/rta_ProtocolStack.h> +#include <ccnx/transport/transport_rta/core/rta_Connection.h> +#include <ccnx/transport/transport_rta/core/rta_Component.h> +#include <ccnx/transport/transport_rta/components/component_Flowcontrol.h> +#include "vegas_private.h" + +#include <ccnx/transport/test_tools/traffic_tools.h> + +#include <ccnx/common/internal/ccnx_InterestDefault.h> + +#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_TlvDictionary.h> + + +#define USE_MIN_BASE_RTT 0 + + +// initial congestion window of 2 interests +#define FC_INIT_CWND 2 + +// maximum cwnd (at 8KB/object, makes this 128 MB) +#define FC_MAX_CWND 16384 + +#define FC_MAX_SSTHRESH FC_MAX_CWND + +// initial RTT in msec (100 msec) +#define FC_INIT_RTT_MSEC 100 + +// initial RTO in msec +#define FC_INIT_RTO_MSEC 1000 + +#define FC_MSS 8704 +#define min(a, b) ((a < b) ? a : b) +#define max(a, b) ((a > b) ? a : b) + +// =========================================================== +struct vegas_connection_state; + +struct fc_window_entry { + bool valid; + ticks t; + ticks t_first_request; + segnum_t segnum; + + // set to true on the first interest request for + // the segment, false on subsequent requests + // Needed for Karn's algorithm on RTT sampling for RTO + bool first_request; + + // Content Object read + TransportMessage *transport_msg; +}; + +struct vegas_session { + RtaConnection *parent_connection; + RtaFramework *parent_framework; + VegasConnectionState *parent_fc; + + // next sampling time + ticks next_rtt_sample; + + // minimum observed RTT + int64_t base_RTT; // absolute minimum observed + int64_t min_RTT; // minimum RTT in current sample + int cnt_RTT; // number of RTTs seen in current sample + int64_t sum_RTT; // sum of RTTs + int slow_start_threshold; + + // the currently observed RTT + ticks current_rtt; + + // we do one detailed sample per RTT + bool sample_in_progress; + ticks sample_start; + uint64_t sample_segnum; + uint64_t sample_bytes_recevied; + + // Only adjust the cwnd every 2 RTTs. This + // indicates if we should adjust the RTT at the + // end of this sampling period + int do_fc_this_rtt; + + // circular buffer for segments + // tail - head (mod FC_MAX_CWND) is how may outstanding interests + // are in-flight. If the cwnd has been reduced, it could be larger + // than current_cwnd. + uint64_t starting_segnum; // segnum of the head + int window_head; // window index to read from + int window_tail; // window index to insert at + + uint32_t current_cwnd; + ticks last_cwnd_adjust; + + uint64_t final_segnum; // if we know the final block ID + + struct fc_window_entry window[FC_MAX_CWND]; + + PARCEventTimer *tick_event; + + // we will generate Interests with the same version as was received to start the session. + // Will also use the same lifetime settings as the original Interest. + + CCNxInterestInterface *interestInterface; + uint32_t lifetime; + PARCBuffer *keyIdRestriction; + CCNxName *basename; + uint64_t name_hash; + + uint64_t cnt_old_segments; + uint64_t cnt_fast_reexpress; + + // These are for RTO calculation + ticks SRTT; + ticks RTTVAR; + ticks RTO; + ticks next_rto; // when the next timer expires + + PARCLogLevel logLevel; +}; + +// Control parameters, measured in segments (tcp) or objects (ccn) +static int alpha = 2; +static int beta = 32; +static int _gamma = 1; + +// =========================================================== + + + +static void vegasSession_ExpressInterests(VegasSession *session); +static int vegasSession_ExpressInterestForEntry(VegasSession *session, struct fc_window_entry *entry); + +static void vegasSession_FastReexpress(VegasSession *session, struct fc_window_entry *ack_entry); +static void vegasSession_ForwardObjectsInOrder(VegasSession *session); + +static int vegasSession_GetSegnumFromObject(CCNxTlvDictionary *contentObjectDictionary, uint64_t *segnum); +static struct fc_window_entry * +vegasSession_GetWindowEntry(VegasSession *session, TransportMessage *tm, uint64_t segnum); + +static void vegasSession_ReleaseWindowEntry(struct fc_window_entry *entry); +static void vegasSession_RunAlgorithmOnReceive(VegasSession *session, struct fc_window_entry *entry); + +static void vegasSession_SetTimer(VegasSession *session, ticks tick_delay); +static void vegasSession_SlowReexpress(VegasSession *session); + +// ======================================================================= + + +static struct fc_window_entry * +vegasSession_GetWindowEntry(VegasSession *session, TransportMessage *tm, uint64_t segnum) +{ + int offset; + struct fc_window_entry *entry; + + offset = ((segnum - session->starting_segnum) + session->window_head) % FC_MAX_CWND; + entry = &session->window[offset]; + + assertTrue(entry->valid, "Requesting window entry for invalid entry %p", (void *) entry); + assertTrue(segnum == entry->segnum, "Expected seqnum not equal to window entry, expected %" PRIu64 ", got %" PRIu64, segnum, entry->segnum); + + if (entry->transport_msg != NULL) { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__, + "session %p duplicate segment %" PRIu64 "", (void *) session, entry->segnum); + } + + transportMessage_Destroy(&entry->transport_msg); + } + + // store the content object + entry->transport_msg = tm; + + return entry; +} + +static int +vegasSession_GetSegnumFromObject(CCNxTlvDictionary *contentObjectDictionary, uint64_t *segnum) +{ + CCNxName *name = ccnxContentObject_GetName(contentObjectDictionary); + assertNotNull(name, "Content Object has null name") + { + ccnxTlvDictionary_Display(contentObjectDictionary, 0); + } + + bool success = trafficTools_GetObjectSegmentFromName(name, segnum); + + if (success) { + return 0; + } + return -1; +} + +static void +vegasSession_ReduceCongestionWindow(VegasSession *session) +{ + if (session->current_cwnd <= session->slow_start_threshold) { + // 3/4 it + session->current_cwnd = session->current_cwnd / 2 + session->current_cwnd / 4; + } else { + // in linear mode + session->current_cwnd--; + } + + if (session->current_cwnd < 2) { + session->current_cwnd = 2; + } + + session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework); +} + +static void +vegasSession_RunAlgorithmOnReceive(VegasSession *session, struct fc_window_entry *entry) +{ + ticks now; + int64_t fc_rtt; + + now = rtaFramework_GetTicks(session->parent_framework); + + // perform statistics updates. + + // If the codec did not include the raw message, we cannot increment the bytes counter + PARCBuffer *wireFormat = ccnxWireFormatMessage_GetWireFormatBuffer(transportMessage_GetDictionary(entry->transport_msg)); + + if (wireFormat) { + session->sample_bytes_recevied += parcBuffer_Remaining(wireFormat); + } + + + /* add +1 so never have 0 RTT */ + fc_rtt = ((int64_t) now - (int64_t) entry->t_first_request) + 1; + if (fc_rtt <= 0) { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Error)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Error, __func__, + "session %p sock %3d : recv segment %" PRIu64 " with negative RTT, t = %" PRIu64 "", + (void *) session, + rtaConnection_GetConnectionId(session->parent_connection), + entry->segnum, + entry->t); + } + + return; + } + + /* record the absolute minimum RTT ever seen */ + if (fc_rtt < session->base_RTT) { + session->base_RTT = fc_rtt; + } + + /* find the minimum RTT for the sample period */ + session->min_RTT = min(session->min_RTT, fc_rtt); + session->cnt_RTT++; + session->sum_RTT += fc_rtt; + + // calculate RTO as per RFC6298 + if (entry->first_request) { + if (session->SRTT == 0) { + // this is the first one, so do 2.2 + session->SRTT = fc_rtt; + session->RTTVAR = fc_rtt >> 1; + session->RTO = session->SRTT + + max(rtaFramework_UsecToTicks(1000000), 4 * session->RTTVAR); + } else { + // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'| + // using beta = 1/4, so we want 3/4 * RTTVAR + int64_t abs = ((int64_t) session->SRTT - (int64_t) fc_rtt); + + if (abs < 0) { + abs = -1 * abs; + } + + session->RTTVAR = ((session->RTTVAR >> 1) + (session->RTTVAR >> 2)) + (abs >> 2); + + // SRTT <- (1 - alpha) * SRTT + alpha * R' + // using alpha = 1/8 and (1-alpha) = 1/2 + 1/4 + 1/8 = 7/8 + session->SRTT = (session->SRTT >> 1) + (session->SRTT >> 2) + (session->SRTT >> 3) + (abs >> 3); + + session->RTO = session->SRTT + + max(rtaFramework_UsecToTicks(1000000), 4 * session->RTTVAR); + } + } + + // we received a packet :) yay. + // we get to extend the RTO expiry + session->next_rto = now + session->RTO; +} + +/* + * called inside workq_mutex lock. + * After we deliver each segment, we increment session->starting_segnum. After we deliver the + * the terminal segement of a stream, session->starting_segnum will be 1 past the final block id. + */ +static void +vegasSession_ForwardObjectsInOrder(VegasSession *session) +{ + while (session->window_head != session->window_tail) { + struct fc_window_entry *entry = &session->window[ session->window_head ]; + + // sanity checks + assertTrue(entry->valid, "Window entry %p for window_head index %u", (void *) entry, session->window_head); + assertTrue(entry->segnum == session->starting_segnum, + "Expected seqnum not equal to window entry, expected %" PRIu64 ", got %" PRIu64, + session->starting_segnum, + entry->segnum); + + if (entry->transport_msg != NULL) { + PARCEventQueue *out = rtaComponent_GetOutputQueue(session->parent_connection, FC_VEGAS, RTA_UP); + RtaComponentStats *stats = rtaConnection_GetStats(session->parent_connection, FC_VEGAS); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "session %p fd %d forward segment %" PRIu64 " up stack", + (void *) session, + rtaConnection_GetConnectionId(session->parent_connection), + entry->segnum); + } + + if (rtaComponent_PutMessage(out, entry->transport_msg)) { + // if we successfully put the message up the stack, null + // the entry so the transport message will not be destroyed + // when this window entry is released. + entry->transport_msg = NULL; + rtaComponentStats_Increment(stats, STATS_UPCALL_OUT); + } + + vegasSession_ReleaseWindowEntry(entry); + session->starting_segnum++; + session->window_head = (session->window_head + 1) % FC_MAX_CWND; + } else { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "session %p fd %d no message segment %" PRIu64 ", no more in order messages", + rtaConnection_GetConnectionId(session->parent_connection), + entry->segnum); + } + + return; + } + } +} + +static int +fc_ssthresh(VegasSession *session) +{ + return min(session->slow_start_threshold, session->current_cwnd - 1); +} + +/** + * Slow-start increase, double the cwnd + */ +static void +fc_slow_start(VegasSession *session) +{ + session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework); + session->current_cwnd = session->current_cwnd << 1; +} + +static +int +fc_in_cwnd_reduction(VegasSession *session) +{ + return 0; +} + +/* + * Similar to the tcp_current_ssthresh. If cwnd > ssthresh, then + * increase ssthres to 1/2 to cwnd, except if we're in a cwnd reduction + * period. + */ +static inline uint32_t +fc_current_ssthresh(VegasSession *session) +{ + if (fc_in_cwnd_reduction(session)) { + return session->slow_start_threshold; + } else { + return max(session->slow_start_threshold, + ((session->current_cwnd >> 1) + + (session->current_cwnd >> 2))); + } +} + +static void +vegasSession_CongestionAvoidanceDebug(VegasSession *session, ticks now) +{ + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + ticks diff = 0; + + if (session->min_RTT != INT_MAX) { + diff = session->current_cwnd * (session->min_RTT - session->base_RTT) / session->base_RTT; + } + + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "session %p do_cong %d currentRTT %5" PRIu64 " cntRTT %3d minRTT %5" PRId64 " baseRTT %5" PRId64 " cwnd %3d next %8" PRIu64 " SRTT %" PRIu64 " RTO %" PRIu64 " oldsegs %" PRIu64 " fast %" PRIu64 " diff %" PRIu64 " allocs %u", + (void *) session, + session->do_fc_this_rtt, + session->current_rtt, + session->cnt_RTT, + session->min_RTT == INT_MAX ? 0 : session->min_RTT, + session->base_RTT == INT_MAX ? 0 : session->base_RTT, + session->current_cwnd, + session->next_rtt_sample, + session->SRTT, + session->RTO, + session->cnt_old_segments, + session->cnt_fast_reexpress, + diff, + parcMemory_Outstanding()); + } +} + +static void +vegasSession_LossBasedAvoidance(VegasSession *session) +{ + session->current_rtt = session->current_rtt * 2; + if (session->current_rtt > 4000) { + session->current_rtt = 4000; + } +} + +/** + * This is the Vegas algorithm + */ +static void +vegasSession_TimeBasedAvoidance(VegasSession *session) +{ + ticks rtt, diff; + uint64_t target_cwnd; + + rtt = session->min_RTT; + + /* + * calculate the target cwnd in segments + */ + target_cwnd = session->current_cwnd * session->base_RTT / rtt; + + diff = session->current_cwnd * (rtt - session->base_RTT) / session->base_RTT; + + if ((diff > _gamma && session->current_cwnd <= session->slow_start_threshold)) { + /* If we're in slow start and going too fast, slow down */ + session->current_cwnd = min(session->current_cwnd, (uint32_t) target_cwnd + 1); + session->slow_start_threshold = fc_ssthresh(session); + session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework); + } else if (session->current_cwnd <= session->slow_start_threshold) { + /* Slow start */ + fc_slow_start(session); + } else { + /* Congestion avoidance. */ + + // if (diff > beta || session->cnt_old_segments ) { + if (diff > beta) { + /* The old window was too fast, so + * we slow down. + */ + + session->current_cwnd--; + session->slow_start_threshold = fc_ssthresh(session); + session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework); + } else if (diff < alpha) { + /* room to grow */ + session->current_cwnd++; + session->last_cwnd_adjust = rtaFramework_GetTicks(session->parent_framework); + } else { + /* middle ground, no changes necessary */ + } + } + + if (session->current_cwnd < 2) { + session->current_cwnd = 2; + } else if (session->current_cwnd > FC_MAX_CWND) { + session->current_cwnd = FC_MAX_CWND; + } + + session->slow_start_threshold = fc_current_ssthresh(session); +} + +static void +vegasSession_CongestionAvoidance(VegasSession *session) +{ + ticks now = rtaFramework_GetTicks(session->parent_framework); + + vegasSession_CongestionAvoidanceDebug(session, now); + + if (session->do_fc_this_rtt) { + if (session->cnt_RTT <= 2) { + vegasSession_LossBasedAvoidance(session); + } else { + vegasSession_TimeBasedAvoidance(session); + } + + session->do_fc_this_rtt = 0; + } else { + session->do_fc_this_rtt = 1; + } + + // Now finish up the statistics and setup for next RTT interval + + session->next_rtt_sample = now + session->current_rtt; + + // low-pass filter the base_RTT from the min_RTT + // base_RTT = 15/16 base_RTT + 1/16 min_RTT = (240 * base_RTT + 16 * min_RTT ) / 256 + + if (!USE_MIN_BASE_RTT && (session->cnt_RTT > 0)) { + session->base_RTT = (240 * session->base_RTT + 16 * session->min_RTT) >> 8; + if (session->base_RTT == 0) { + session->base_RTT = 1; + } + } + + // Smooth the RTT for (3 * current + 1 * minimum) / 4 + + if (session->cnt_RTT > 0) { + session->current_rtt = (12 * session->current_rtt + 4 * session->min_RTT) >> 4; + } + + session->current_rtt = max(session->current_rtt, FC_INIT_RTT_MSEC); + + // reset stats + session->sample_bytes_recevied = 0; + session->min_RTT = INT_MAX; + session->cnt_RTT = 0; + session->cnt_old_segments = 0; + session->cnt_fast_reexpress = 0; + session->sum_RTT = 0; + + vegasSession_CongestionAvoidanceDebug(session, now); +} + +/** + * Slow (course grain) retransmission due to RTO expiry. + * Re-express the first segment of the window. + */ +static +void +vegasSession_SlowReexpress(VegasSession *session) +{ + struct fc_window_entry *entry = &session->window[ session->window_head ]; + + assertTrue(entry->valid, "entry %p segnum %" PRIu64 " invalid state, in window but not valid", + (void *) entry, entry->segnum); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__, + "Session %p conn %p RTO re-expression for segnum %" PRIu64 "", + (void *) session, (void *) session->parent_connection, entry->segnum); + } + + entry->first_request = false; + vegasSession_ExpressInterestForEntry(session, entry); +} + +/** + * Do fast retransmissions based on SRTT smoothed estimate. + * ack_entry is the entry for a content object we just received. Look earlier segments + * and if they were asked for more than SRTT ago, ask again. + */ +static void +vegasSession_FastReexpress(VegasSession *session, struct fc_window_entry *ack_entry) +{ + ticks now = rtaFramework_GetTicks(session->parent_framework); + int64_t delta; + uint64_t segnum; + uint64_t top_segnum; + + // This method is called after forward_in_order, so it's possible that + // ack_entry is no longer valid, meaning we've moved the window past it. + // In that case, we're done. + if (ack_entry->valid == false) { + return; + } + + // we don't retransmit beyond the current cwnd. ack_entry might be outside + // the cwnd. + + top_segnum = min(ack_entry->segnum, session->starting_segnum + session->current_cwnd); + + for (segnum = session->starting_segnum; segnum < top_segnum; segnum++) { + int index = (session->window_head + (segnum - session->starting_segnum)) % FC_MAX_CWND; + delta = (int64_t) now - ((int64_t) session->window[index].t + (int64_t) session->SRTT); + + // allow up to -1 slack, because the RunAlgorithm adds +1 to fc_rtt. + if (delta >= -1) { + // we have past the SRTT timeout + + // if we last re-transmitted him since the last cwnd adjustment, adjust again + if ((int64_t) session->window[index].t - (int64_t) session->last_cwnd_adjust >= 0) { + vegasSession_ReduceCongestionWindow(session); + } + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__, + "session %p conn %p RTO re-expression for segnum %" PRIu64 "", + (void *) session, (void *) session->parent_connection, session->window[index].segnum); + } + + session->window[index].first_request = false; + session->cnt_fast_reexpress++; + vegasSession_ExpressInterestForEntry(session, &session->window[index]); + } + } +} + +/** + * Generates an Interest message for the window entry. + * + * No side effects, apart from putting on Interest on the down queue. + * If the down direction is blocked, this function will not put an interest in the down queue. It will + * look like a lost interest to the flow controller, which should cause the flow controller to slow down. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +static int +vegasSession_ExpressInterestForEntry(VegasSession *session, struct fc_window_entry *entry) +{ + if (!rtaConnection_BlockedDown(session->parent_connection)) { + ticks now = rtaFramework_GetTicks(session->parent_framework); + PARCEventQueue *q_out; + TransportMessage *tm_out; + CCNxName *chunk_name; + + entry->t = now; + + chunk_name = ccnxName_Copy(session->basename); + + CCNxNameSegment *segment = ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, entry->segnum); + ccnxName_Append(chunk_name, segment); + ccnxNameSegment_Release(&segment); + + assertNotNull(session->interestInterface, "Got a NULL interestInterface. Should not happen."); + + CCNxTlvDictionary *interestDictionary = + session->interestInterface->create(chunk_name, + session->lifetime, + NULL, // ppkid + NULL, // content object hash + CCNxInterestDefault_HopLimit); + + if (session->keyIdRestriction != NULL) { + session->interestInterface->setKeyIdRestriction(interestDictionary, session->keyIdRestriction); + } + + tm_out = transportMessage_CreateFromDictionary(interestDictionary); + transportMessage_SetInfo(tm_out, rtaConnection_Copy(session->parent_connection), rtaConnection_FreeFunc); + + q_out = rtaComponent_GetOutputQueue(session->parent_connection, FC_VEGAS, RTA_DOWN); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + char *string = ccnxName_ToString(chunk_name); + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "session %p entry %p segname %p segnum %" PRIu64 " %s sent", + (void *) session, + (void *) entry, + (void *) chunk_name, + entry->segnum, + string); + parcMemory_Deallocate((void **) &string); + } + + ccnxTlvDictionary_Release(&interestDictionary); + ccnxName_Release(&chunk_name); + + if (rtaComponent_PutMessage(q_out, tm_out)) { + rtaComponentStats_Increment(rtaConnection_GetStats(session->parent_connection, FC_VEGAS), + STATS_DOWNCALL_OUT); + } + } else { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) { + CCNxName *segment_name = ccnxName_Copy(session->basename); + ccnxName_Append(segment_name, ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, entry->segnum)); + char *string = ccnxName_ToString(segment_name); + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__, + "session %p entry %p segname %p segnum %" PRIu64 " %s SUPPRESSED BLOCKED DOWN QUEUE", + (void *) session, + (void *) entry, + (void *) segment_name, + entry->segnum, + string); + parcMemory_Deallocate((void **) &string); + ccnxName_Release(&segment_name); + } + } + + return 0; +} + +/* + * Express interests out to the max allowed by the cwnd. This function will operate + * even if the down queue is blocked. Those interests will be treated as lost, which will cause + * the flow controller to slow down. + */ +static void +vegasSession_ExpressInterests(VegasSession *session) +{ + ticks now = rtaFramework_GetTicks(session->parent_framework); + + // how many interests are currently outstanding? + int wsize = session->window_tail - session->window_head; + if (wsize < 0) { + wsize += FC_MAX_CWND; + } + + // if we know the FBID, don't ask for anything beyond that + while (wsize < session->current_cwnd && (wsize + session->starting_segnum <= session->final_segnum)) { + // expreess them + struct fc_window_entry *entry = &session->window[session->window_tail]; + + assertFalse(entry->valid, + "Window entry %d marked as valid, but its outside the cwind!", + session->window_tail); + + session->window_tail = (session->window_tail + 1) % FC_MAX_CWND; + + memset(entry, 0, sizeof(struct fc_window_entry)); + + entry->valid = true; + entry->segnum = session->starting_segnum + wsize; + entry->first_request = true; + entry->t_first_request = now; + + if (session->sample_in_progress == 0) { + // make this interest the sample for the RTT + session->sample_in_progress = true; + session->sample_segnum = entry->segnum; + session->sample_start = now; + session->sample_bytes_recevied = 0; + } + + vegasSession_ExpressInterestForEntry(session, entry); + + wsize++; + } +} + +/* + * This is dispatched from the event loop, so its a loosely accurate time + */ +static void +vegasSession_TimerCallback(int fd, PARCEventType what, void *user_data) +{ + VegasSession *session = (VegasSession *) user_data; + int64_t delta; + ticks now; + + assertTrue(what & PARCEventType_Timeout, "%s got unknown signal %d", __func__, what); + + now = rtaFramework_GetTicks(session->parent_framework); + delta = ((int64_t) now - (int64_t) session->next_rtt_sample); + + if (delta >= 0) { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "Session %p processing timer, delta %" PRId64, + (void *) session, delta); + } + + // This entry is ready for processing + vegasSession_CongestionAvoidance(session); + + // set the next timer + vegasSession_SetTimer(session, session->current_rtt); + } else { + vegasSession_SetTimer(session, -1 * delta); + } + + // check for retransmission + delta = ((int64_t) now - (int64_t) session->next_rto); + if (delta >= 0) { + // Do this once per RTO + vegasSession_SlowReexpress(session); + + // we're now in a doubling regeme. Reset the + // moving average and double the RTO. + session->SRTT = 0; + session->RTTVAR = 0; + session->RTO = session->RTO * 2; + session->next_rto = now + session->RTO; + } +} + +/** + * precondition: the entry is valid + */ +static void +vegasSession_ReleaseWindowEntry(struct fc_window_entry *entry) +{ + assertTrue(entry->valid, "Called on invalid window entry"); + if (!entry->valid) { + return; + } + + if (entry->transport_msg != NULL) { + transportMessage_Destroy(&entry->transport_msg); + } + entry->valid = false; +} + +static void +vegasSession_SetTimer(VegasSession *session, ticks tick_delay) +{ + struct timeval timeout; + uint64_t usec = rtaFramework_TicksToUsec(tick_delay); + const unsigned usec_per_sec = 1000000; + + timeout.tv_sec = usec / usec_per_sec; + timeout.tv_usec = (int) (usec - timeout.tv_sec * usec_per_sec); + + // this replaces any prior events + parcEventTimer_Start(session->tick_event, &timeout); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "session %p tick_delay %" PRIu64 " timeout %.6f", + (void *) session, + tick_delay, + timeout.tv_sec + 1E-6 * timeout.tv_usec); + } +} + +// ============================================= +// Private API + +/** + * Unsets the final segment number indicating we do not know the value + * + * Sets the final segment number to the maximum possible value, which effectively + * lets us run off to infinity. + * + * @param [in] session An allocated vegas session + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +_vegasSession_UnsetFinalSegnum(VegasSession *session) +{ + session->final_segnum = ULLONG_MAX; +} + +VegasSession * +vegasSession_Create(VegasConnectionState *fc, RtaConnection *conn, CCNxName *basename, segnum_t begin, + CCNxInterestInterface *interestInterface, uint32_t lifetime, PARCBuffer *keyIdRestriction) +{ + assertNotNull(conn, "Called with null connection"); + assertNotNull(basename, + "conn %p connid %u called with null basename", + (void *) conn, + rtaConnection_GetConnectionId(conn)); + + if (conn == NULL || basename == NULL) { + return NULL; + } + + VegasSession *session = parcMemory_AllocateAndClear(sizeof(VegasSession)); + assertNotNull(session, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(VegasSession)); + session->parent_connection = conn; + session->parent_framework = rtaProtocolStack_GetFramework(rtaConnection_GetStack(conn)); + session->interestInterface = interestInterface; + session->lifetime = lifetime; + session->basename = basename; + if (keyIdRestriction != NULL) { + session->keyIdRestriction = parcBuffer_Acquire(keyIdRestriction); + } + session->parent_fc = fc; + + session->tick_event = parcEventTimer_Create(rtaFramework_GetEventScheduler(session->parent_framework), 0, vegasSession_TimerCallback, (void *) session); + + session->starting_segnum = 0; + session->current_cwnd = FC_INIT_CWND; + session->min_RTT = INT_MAX; + session->base_RTT = INT_MAX; + session->do_fc_this_rtt = 0; + session->current_rtt = rtaFramework_UsecToTicks(FC_INIT_RTT_MSEC * 1000); + session->slow_start_threshold = FC_MAX_SSTHRESH; + + session->SRTT = 0; + session->RTTVAR = 0; + session->RTO = rtaFramework_UsecToTicks(FC_INIT_RTO_MSEC * 1000); + session->next_rto = ULLONG_MAX; + session->cnt_old_segments = 0; + session->cnt_fast_reexpress = 0; + + _vegasSession_UnsetFinalSegnum(session); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice, __func__, + "session %p initialized connid %u ", + (void *) session, + rtaConnection_GetConnectionId(conn)); + } + return session; +} + +static void +vegasSession_Close(VegasSession *session) +{ + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice)) { + char *p = ccnxName_ToString(session->basename); + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Notice, __func__, + "session %p close starting segnum %" PRIu64 " final chunk ID %" PRIu64 " for name %s", + (void *) session, session->starting_segnum, session->final_segnum, p); + parcMemory_Deallocate((void **) &p); + } + + ccnxName_Release(&session->basename); + + while (session->window_head != session->window_tail) { + struct fc_window_entry *entry = &session->window[ session->window_head ]; + + // sanity checks + assertTrue(entry->valid, "connid %u session %p entry %d in window but not valid", + rtaConnection_GetConnectionId(session->parent_connection), + (void *) session, + session->window_head); + + if (entry->valid) { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + char *p = ccnxName_ToString(session->basename); + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "session %p releasing window entry %d", (void *) session, session->window_head); + parcMemory_Deallocate((void **) &p); + } + + vegasSession_ReleaseWindowEntry(entry); + } + + session->window_head = (session->window_head + 1) % FC_MAX_CWND; + } +} + +void +vegasSession_Destroy(VegasSession **sessionPtr) +{ + VegasSession *session; + + assertNotNull(sessionPtr, "Called with null double pointer"); + session = *sessionPtr; + + if (session->keyIdRestriction != NULL) { + parcBuffer_Release(&session->keyIdRestriction); + } + + vegasSession_Close(session); + + parcEventTimer_Destroy(&(session->tick_event)); + parcMemory_Deallocate((void **) &session); + sessionPtr = NULL; +} + +int +vegasSession_Start(VegasSession *session) +{ + ticks now = rtaFramework_GetTicks(session->parent_framework); + + // express the initial interests + vegasSession_ExpressInterests(session); + + session->next_rtt_sample = now - 1; + session->next_rto = now + session->RTO; + + // put it on the work queue for procesing + + vegasSession_SetTimer(session, session->current_rtt); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__, + "Session %p start", (void *) session); + } + + return 0; +} + +int +vegasSession_Pause(VegasSession *session) +{ + trapNotImplemented("vegasSession_Pause"); +} + +int +vegasSession_Resume(VegasSession *session) +{ + trapNotImplemented("vegasSession_Resume"); +} + +int +vegasSession_Seek(VegasSession *session, segnum_t absolutePosition) +{ + trapNotImplemented("vegasSession_See)"); +} + +/** + * Retrieves the final block ID from the content object + * + * Retreives the final block ID from the object, if it exists, and returns it in + * an output parameter. Returns true if found and returned, false otherwise. + * + * @param [in] obj The Content Object to get the FBID form + * @param [out] output Pointer to the seqnum ouptut + * + * @return true If the content object contained a FBID and the output set + * @return false If there is no FBID in the content object + * + * Example: + * @code + * <#example#> + * @endcode + */ +static bool +vegasSession_GetFinalBlockIdFromContentObject(CCNxTlvDictionary *obj, uint64_t *output) +{ + bool result = false; + if (ccnxContentObject_HasFinalChunkNumber(obj)) { + *output = ccnxContentObject_GetFinalChunkNumber(obj); + result = true; + } + return result; +} + +/** + * Sets the final block id in the session based on the signed info + * + * If the final block id exists in the signed info, set the session's FBID. + * + * Rules on FinalChunkNumber: + * + * 1) The “final chunk” of a stream is identified by a content object having a FinalChunkNumber + * set in its metadata that equals the chunk number in its name. + * + * 2) An application may set the FinalChunkNumber early to let a receiver know when the end is coming. These early advisories are not binding. + * + * 3) If the application has ever set the FinalChunkNumber it may not decrease it. If the actual end happens before a previous advisory, + * the application must publish no-payload content objects such that Rule #1 is satisfied + * + * + * @param [in,out] session The Vegas session + * @param [in] obj The signed content object to get the FBID from + * @param [in] nameChunkNumber is the chunk number in the name + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +vegasSession_SetFinalBlockId(VegasSession *session, CCNxTlvDictionary *contentObjectDictionary, uint64_t nameChunkNumber) +{ + // Get the FinalChunkNumber out of the metadata and update our notion of it + uint64_t finalChunkNumber; + if (vegasSession_GetFinalBlockIdFromContentObject(contentObjectDictionary, &finalChunkNumber)) { + session->final_segnum = finalChunkNumber; + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__, + "Session %p finalChunkNumber %" PRIu64, (void *) session, session->final_segnum); + } + } else { + // There is no final chunk number in the metadata. If the nameChunkNumber == session->final_seqnum, then + // our idea of the final_seqnum is wrong and we should unset it as the producer did not actually close + // the stream when they said they would + + if (session->final_segnum == nameChunkNumber) { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning, __func__, + "Session %p finalChunkNumber %" PRIu64 " not set in final chunk, resetting", + (void *) session, session->final_segnum); + } + + _vegasSession_UnsetFinalSegnum(session); + } + } +} + +/** + * We received a duplicate segment from before the start of the current congestion window + * + * + * If we receive a segment from before the start of the current congestion window, then it + * must be a duplicate (we don't have skip forward implemented). Reduce the congestion window size. + * We only reduce the window once per RTT interval no matter how many early duplicates we get. + * + * @param [in,out] session The Vegas session to reduce the window of. + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +vegasSession_ReceivedBeforeWindowStart(VegasSession *session, uint64_t segnum) +{ + // once per cwnd, reduce the window on out-of-order + if (session->cnt_old_segments == 0) { + vegasSession_ReduceCongestionWindow(session); + } + + session->cnt_old_segments++; + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "Session %p connid %3u : recv old segment %" PRIu64 ", starting is %" PRIu64 ", cnt %" PRIu64 "", + (void *) session, + rtaConnection_GetConnectionId(session->parent_connection), + segnum, + session->starting_segnum, + session->cnt_old_segments); + } +} + +static void +vegasSession_SendMoreInterests(VegasSession *session, struct fc_window_entry *entry) +{ + // This will check if there's any earlier segments whose + // RTT has expired and will re-ask for them. This is the + // out-of-order fast retransmit. + vegasSession_FastReexpress(session, entry); + + // have we finished? + if (session->starting_segnum < session->final_segnum) { + // express more interests if we have the window for it + vegasSession_ExpressInterests(session); + } else + if (session->starting_segnum > session->final_segnum) { + // if starting_segment > final_segnum it means that we have delivered the last + // segment up the stack. + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Info, __func__, + "Session %p connid %u starting_segnum %" PRIu64 ", final_segnum %" PRIu64 ", FINAL SEGMENT DELIVERED, CLOSING", + (void *) session, + rtaConnection_GetConnectionId(session->parent_connection), + session->starting_segnum, + session->final_segnum); + } + + parcEventTimer_Stop(session->tick_event); + vegas_EndSession(session->parent_fc, session); + } + // else session->starting_segnum == session->final_segnum, we're not done yet. +} + +static CCNxName * +vegasSession_GetNameFromTransportMessage(TransportMessage *tm) +{ + CCNxName *name = NULL; + CCNxTlvDictionary *dictionary = transportMessage_GetDictionary(tm); + switch (ccnxTlvDictionary_GetSchemaVersion(dictionary)) { + case CCNxTlvDictionary_SchemaVersion_V1: + name = ccnxTlvDictionary_GetName(dictionary, CCNxCodecSchemaV1TlvDictionary_MessageFastArray_NAME); + break; + + default: + break; + } + return name; +} + + + +int +vegasSession_ReceiveContentObject(VegasSession *session, TransportMessage *tm) +{ + assertTrue(transportMessage_IsContentObject(tm), + "Transport message is not a content object"); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + CCNxName *name = vegasSession_GetNameFromTransportMessage(tm); + char *nameString = NULL; + if (name) { + nameString = ccnxName_ToString(name); + } + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "Session %p connid %3u receive tm %p: %s", + (void *) session, + rtaConnection_GetConnectionId(session->parent_connection), + (void *) tm, + nameString); + if (nameString) { + parcMemory_Deallocate((void **) &nameString); + } + } + + CCNxTlvDictionary *contentObjectDictionary = transportMessage_GetDictionary(tm); + + // get segment number + uint64_t segnum; + int res = vegasSession_GetSegnumFromObject(contentObjectDictionary, &segnum); + if (res != 0) { + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning)) { + CCNxName *name = vegasSession_GetNameFromTransportMessage(tm); + char *nameString = NULL; + if (name) { + nameString = ccnxName_ToString(name); + } + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Warning, __func__, + "Session %p connid %3u receive tm %p has no segment number: %s", + (void *) session, + rtaConnection_GetConnectionId(session->parent_connection), + (void *) tm, + nameString); + if (nameString) { + parcMemory_Deallocate((void **) &nameString); + } + } + + // couldn't figure it out + transportMessage_Destroy(&tm); + return -1; + } + + // drop out of order + if (segnum < session->starting_segnum) { + vegasSession_ReceivedBeforeWindowStart(session, segnum); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "Session %p connid %3u : tm %p received segnum %" PRIu64 " before current head %" PRIu64 "", + (void *) session, + __func__, + rtaConnection_GetConnectionId(session->parent_connection), + (void *) tm, + segnum, + session->starting_segnum); + } + + transportMessage_Destroy(&tm); + return -1; + } + + // Update our idea of the final chunk number. This must be done + // before running the algorithm because session->final_segnum is used + // to decide if we're done. + vegasSession_SetFinalBlockId(session, contentObjectDictionary, segnum); + + + // now run the algorithm on the received object + + struct fc_window_entry *entry = vegasSession_GetWindowEntry(session, tm, segnum); + + if (rtaLogger_IsLoggable(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug)) { + CCNxName *name = vegasSession_GetNameFromTransportMessage(tm); + char *nameString = NULL; + if (name) { + nameString = ccnxName_ToString(name); + } + rtaLogger_Log(rtaFramework_GetLogger(session->parent_framework), RtaLoggerFacility_Flowcontrol, PARCLogLevel_Debug, __func__, + "Session %p connid %3u receive tm %p segment %" PRIu64 " receive: %s", + (void *) session, + rtaConnection_GetConnectionId(session->parent_connection), + (void *) tm, + segnum, + nameString); + if (nameString) { + parcMemory_Deallocate((void **) &nameString); + } + } + + vegasSession_RunAlgorithmOnReceive(session, entry); + + // forward in-order objects to the user fc + if (!rtaConnection_BlockedUp(session->parent_connection)) { + vegasSession_ForwardObjectsInOrder(session); + } + + vegasSession_SendMoreInterests(session, entry); + + return 0; +} + +unsigned +vegasSession_GetConnectionId(VegasSession *session) +{ + assertNotNull(session, "Parameter session must be non-null"); + return rtaConnection_GetConnectionId(session->parent_connection); +} + +void +vegasSession_StateChanged(VegasSession *session) +{ + if (rtaConnection_BlockedUp(session->parent_connection)) { + // if we're blocked in the up direction, don't do anything. We make this + // check every time we're about ti send stuff up the stack in vegasSession_ReceiveContentObject(). + } else { + // unblocked, forward packets + vegasSession_ForwardObjectsInOrder(session); + } + + if (rtaConnection_BlockedDown(session->parent_connection)) { + // stop generating interests + } else { + // restart interests + } +} diff --git a/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_private.h b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_private.h new file mode 100644 index 00000000..b2088567 --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/transport_rta/components/Flowcontrol_Vegas/vegas_private.h @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file vegas_private.h + * @brief <#Brief Description#> + * + * <#Detailed Description#> + * + */ +#ifndef Libccnx_vegas_private_h +#define Libccnx_vegas_private_h + +#include <ccnx/common/ccnx_Name.h> +#include <ccnx/common/internal/ccnx_ContentObjectInterface.h> + +typedef uint64_t segnum_t; + +struct vegas_session; +typedef struct vegas_session VegasSession; + +struct vegas_connection_state; +typedef struct vegas_connection_state VegasConnectionState; + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [in] fc An allocated Vegas flow controller + * @param [in] conn The RTA connection owning the flow + * @param [in] basename The name without a chunk number + * @param [in] begin The chunk number to begin requesting at + * @param [in] interestInterface The {@link CCNxInterestInterface} to use to generate new Interests + * @param [in] lifetime The default lifetime, in milli-seconds, to use for generated Interests + * @param [in] keyIdRestriction The KeyIdRestriction, if any, from the originating Interest + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +VegasSession *vegasSession_Create(VegasConnectionState *fc, RtaConnection *conn, CCNxName *basename, + segnum_t begin, CCNxInterestInterface *interestInterface, uint32_t lifetime, + PARCBuffer *keyIdRestriction); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void vegasSession_Destroy(VegasSession **sessionPtr); +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int vegasSession_Start(VegasSession *session); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int vegasSession_Pause(VegasSession *session); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int vegasSession_Resume(VegasSession *session); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int vegasSession_Seek(VegasSession *session, segnum_t absolutePosition); + +/** + * <#One Line Description#> + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +int vegasSession_ReceiveContentObject(VegasSession *session, TransportMessage *tm); + + +/** + * Tell a session that there was a state change in its connection + * + * The caller should ensure that the session's connection is the right one by + * using {@link vegasSession_GetConnectionId}. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +void vegasSession_StateChanged(VegasSession *session); + +/** + * Returns the connection id used by the session + * + * <#Paragraphs Of Explanation#> + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +unsigned vegasSession_GetConnectionId(VegasSession *session); + + +/** + * <#One Line Description#> + * + * Called by a session when it is done + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + * + * @see <#references#> + */ +void vegas_EndSession(VegasConnectionState *fc, VegasSession *session); +#endif // Libccnx_vegas_private_h |