diff options
Diffstat (limited to 'metis/ccnx/forwarder/metis/processor/metis_MessageProcessor.c')
-rw-r--r-- | metis/ccnx/forwarder/metis/processor/metis_MessageProcessor.c | 860 |
1 files changed, 860 insertions, 0 deletions
diff --git a/metis/ccnx/forwarder/metis/processor/metis_MessageProcessor.c b/metis/ccnx/forwarder/metis/processor/metis_MessageProcessor.c new file mode 100644 index 00000000..26e67761 --- /dev/null +++ b/metis/ccnx/forwarder/metis/processor/metis_MessageProcessor.c @@ -0,0 +1,860 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + */ + +#include <config.h> +#include <stdio.h> +#include <string.h> + +#include <ccnx/forwarder/metis/processor/metis_MessageProcessor.h> +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_ArrayList.h> + +#include <ccnx/forwarder/metis/processor/metis_StandardPIT.h> +#include <ccnx/forwarder/metis/processor/metis_FIB.h> + +#include <ccnx/forwarder/metis/content_store/metis_ContentStoreInterface.h> +#include <ccnx/forwarder/metis/content_store/metis_LRUContentStore.h> + +#include <ccnx/forwarder/metis/strategies/metis_StrategyImpl.h> +#include <ccnx/forwarder/metis/strategies/strategy_Rnd.h> +#include <ccnx/forwarder/metis/strategies/strategy_LoadBalancer.h> +#include <ccnx/forwarder/metis/strategies/strategy_RndSegment.h> +#include <ccnx/forwarder/metis/strategies/strategy_LoadBalancerWithPD.h> + + +#include <LongBow/runtime.h> + +/** + * @typedef MetisProcessorStats + * @abstract MessageProcessor event counters + * + * @constant countReceived All received messages, the good, the bad, the ugly + * @constant countInterestsReceived Count of received interests + * @constant countObjectsReceived Count of received content objects + * + * @constant countInterestsAggregated Number of Interests suppressed via PIT table aggregation + * @constant countInterestForwarded Number of Interests forwarded, for each outbound interface + * @constant countObjectsForwarded Number of Content Objects forwarded, for each outbound interface + * @constant countInterestsSatisfiedFromStore Number of Interests satisfied from the Content Store + * + * @constant countDropped Number of messages dropped, for any reason + * @constant countInterestsDropped Number of Interests dropped, for any reason + * @constant countDroppedNoRoute Number of Interests dropped because no FIB entry + * @constant countDroppedNoReversePath Number of Content Objects dropped because no PIT entry + * @constant countDroppedNoHopLimit Number of Interests without a HopLimit + * @constant countDroppedZeroHopLimitFromRemote Number of Interest from a remote node with a 0 hoplimit + * + * @constant countDroppedZeroHopLimitToRemote Number of Interest not forwarded to a FIB entry because hoplimit is 0 and its remote + * @constant countSendFailures Number of send failures (problems using MetisIoOperations) + * + * @discussion <#Discussion#> + */ +typedef struct metis_processor_stats { + uint32_t countReceived; + uint32_t countInterestsReceived; + uint32_t countObjectsReceived; + + uint32_t countInterestsAggregated; + + uint32_t countDropped; + uint32_t countInterestsDropped; + uint32_t countDroppedNoRoute; + uint32_t countDroppedNoReversePath; + + uint32_t countDroppedConnectionNotFound; + uint32_t countObjectsDropped; + + uint32_t countSendFailures; + uint32_t countInterestForwarded; + uint32_t countObjectsForwarded; + uint32_t countInterestsSatisfiedFromStore; + + uint32_t countDroppedNoHopLimit; + uint32_t countDroppedZeroHopLimitFromRemote; + uint32_t countDroppedZeroHopLimitToRemote; +} _MetisProcessorStats; + +struct metis_message_processor { + MetisForwarder *metis; + MetisLogger *logger; + MetisTap *tap; + + MetisPIT *pit; + MetisContentStoreInterface *contentStore; + MetisFIB *fib; + + bool store_in_cache; + bool serve_from_cache; + + _MetisProcessorStats stats; +}; + +static void metisMessageProcessor_Drop(MetisMessageProcessor *processor, MetisMessage *message); +static void metisMessageProcessor_ReceiveInterest(MetisMessageProcessor *processor, MetisMessage *interestMessage); +static void metisMessageProcessor_ReceiveContentObject(MetisMessageProcessor *processor, MetisMessage *objectMessage); +static unsigned metisMessageProcessor_ForwardToNexthops(MetisMessageProcessor *processor, MetisMessage *message, const MetisNumberSet *nexthops); + +static void metisMessageProcessor_ForwardToInterfaceId(MetisMessageProcessor *processor, MetisMessage *message, unsigned interfaceId); + +// ============================================================ +// Public API + +MetisMessageProcessor * +metisMessageProcessor_Create(MetisForwarder *metis) +{ + size_t objectStoreSize = metisConfiguration_GetObjectStoreSize(metisForwarder_GetConfiguration(metis)); + + MetisMessageProcessor *processor = parcMemory_AllocateAndClear(sizeof(MetisMessageProcessor)); + assertNotNull(processor, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(MetisMessageProcessor)); + memset(processor, 0, sizeof(MetisMessageProcessor)); + + processor->metis = metis; + processor->logger = metisLogger_Acquire(metisForwarder_GetLogger(metis)); + processor->pit = metisStandardPIT_Create(metis); + + processor->fib = metisFIB_Create(processor->logger); + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "MessageProcessor %p created", + (void *) processor); + } + + MetisContentStoreConfig contentStoreConfig = { + .objectCapacity = objectStoreSize, + }; + + // Currently, this will instantiate an LRUContentStore. Perhaps someday it'll switch stores + // based on the MetisContentStoreConfig passed to it. + processor->contentStore = metisLRUContentStore_Create(&contentStoreConfig, processor->logger); + + //the two flags for the cache are set to true by default. If the cache + //is active it always work as expected unless the use modifies this + //values using metis_control + processor->store_in_cache = true; + processor->serve_from_cache = true; + + return processor; +} + +void +metisMessageProcessor_SetContentObjectStoreSize(MetisMessageProcessor *processor, size_t maximumContentStoreSize) +{ + assertNotNull(processor, "Parameter processor must be non-null"); + metisContentStoreInterface_Release(&processor->contentStore); + + MetisContentStoreConfig contentStoreConfig = { + .objectCapacity = maximumContentStoreSize + }; + + processor->contentStore = metisLRUContentStore_Create(&contentStoreConfig, processor->logger); +} + +void +metisMessageProcessor_ClearCache(MetisMessageProcessor *processor) +{ + assertNotNull(processor, "Parameter processor must be non-null"); + size_t objectStoreSize = metisConfiguration_GetObjectStoreSize(metisForwarder_GetConfiguration(processor->metis)); + + metisContentStoreInterface_Release(&processor->contentStore); + + MetisContentStoreConfig contentStoreConfig = { + .objectCapacity = objectStoreSize, + }; + + processor->contentStore = metisLRUContentStore_Create(&contentStoreConfig, processor->logger); +} + +MetisContentStoreInterface * +metisMessageProcessor_GetContentObjectStore(const MetisMessageProcessor *processor) +{ + assertNotNull(processor, "Parameter processor must be non-null"); + return processor->contentStore; +} + +void +metisMessageProcessor_Destroy(MetisMessageProcessor **processorPtr) +{ + assertNotNull(processorPtr, "Parameter must be non-null double pointer"); + assertNotNull(*processorPtr, "Parameter dereference to non-null pointer"); + + MetisMessageProcessor *processor = *processorPtr; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "MessageProcessor %p destroyed", + (void *) processor); + } + + metisLogger_Release(&processor->logger); + metisFIB_Destroy(&processor->fib); + metisContentStoreInterface_Release(&processor->contentStore); + metisPIT_Release(&processor->pit); + + parcMemory_Deallocate((void **) &processor); + *processorPtr = NULL; +} + +void +metisMessageProcessor_Receive(MetisMessageProcessor *processor, MetisMessage *message) +{ + assertNotNull(processor, "Parameter processor must be non-null"); + assertNotNull(message, "Parameter message must be non-null"); + + processor->stats.countReceived++; + + if (processor->tap != NULL && processor->tap->isTapOnReceive(processor->tap)) { + processor->tap->tapOnReceive(processor->tap, message); + } + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + char *nameString = "NONAME"; + if (metisMessage_HasName(message)) { + CCNxName *name = metisTlvName_ToCCNxName(metisMessage_GetName(message)); + nameString = ccnxName_ToString(name); + + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p ingress %3u length %5u received name %s", + (void *) message, + metisMessage_GetIngressConnectionId(message), + metisMessage_Length(message), + nameString); + + parcMemory_Deallocate((void **) &nameString); + ccnxName_Release(&name); + } + } + + switch (metisMessage_GetType(message)) { + case MetisMessagePacketType_Interest: + metisMessageProcessor_ReceiveInterest(processor, message); + break; + + case MetisMessagePacketType_ContentObject: + metisMessageProcessor_ReceiveContentObject(processor, message); + break; + + default: + metisMessageProcessor_Drop(processor, message); + break; + } + + // if someone wanted to save it, they made a copy + metisMessage_Release(&message); +} + +void +metisMessageProcessor_AddTap(MetisMessageProcessor *processor, MetisTap *tap) +{ + assertNotNull(processor, "Parameter processor must be non-null"); + assertNotNull(tap, "Parameter tap must be non-null"); + + processor->tap = tap; +} + +void +metisMessageProcessor_RemoveTap(MetisMessageProcessor *processor, const MetisTap *tap) +{ + assertNotNull(processor, "Parameter processor must be non-null"); + assertNotNull(tap, "Parameter tap must be non-null"); + + if (processor->tap == tap) { + processor->tap = NULL; + } +} + +static void +_metisMessageProcess_CheckForwardingStrategies(MetisMessageProcessor *processor) +{ + MetisFibEntryList *fib_entries = metisMessageProcessor_GetFibEntries(processor); + size_t size = metisFibEntryList_Length(fib_entries); + for (unsigned i = 0; i < size; i++) { + MetisFibEntry *entry = (MetisFibEntry *) metisFibEntryList_Get(fib_entries, i); + const char *strategy = metisFibEntry_GetFwdStrategyType(entry); + if (strcmp(strategy, FWD_STRATEGY_LOADBALANCER_WITH_DELAY) == 0) { + strategyLoadBalancerWithPD_SetConnectionTable(metisFibEntry_GetFwdStrategy(entry), + metisForwarder_GetConnectionTable(processor->metis)); + } + } + metisFibEntryList_Destroy(&fib_entries); +} + +bool +metisMessageProcessor_AddOrUpdateRoute(MetisMessageProcessor *processor, CPIRouteEntry *route) +{ + MetisConfiguration *config = metisForwarder_GetConfiguration(processor->metis); + const char *fwdStrategy = metisConfiguration_GetForwarginStrategy(config, cpiRouteEntry_GetPrefix(route)); + bool res = metisFIB_AddOrUpdate(processor->fib, route, fwdStrategy); + _metisMessageProcess_CheckForwardingStrategies(processor); + return res; +} + +bool +metisMessageProcessor_RemoveRoute(MetisMessageProcessor *processor, CPIRouteEntry *route) +{ + return metisFIB_Remove(processor->fib, route); +} + +void +metisMessageProcessor_RemoveConnectionIdFromRoutes(MetisMessageProcessor *processor, unsigned connectionId) +{ + metisFIB_RemoveConnectionIdFromRoutes(processor->fib, connectionId); +} + +void +metisProcessor_SetStrategy(MetisMessageProcessor *processor, CCNxName *prefix, const char *strategy) +{ + MetisFibEntryList *fib_entries = metisMessageProcessor_GetFibEntries(processor); + MetisTlvName *strategyPrefix = metisTlvName_CreateFromCCNxName(prefix); + size_t size = metisFibEntryList_Length(fib_entries); + for (unsigned i = 0; i < size; i++) { + MetisFibEntry *entry = (MetisFibEntry *) metisFibEntryList_Get(fib_entries, i); + MetisTlvName *entryPrefix = metisFibEntry_GetPrefix(entry); + if (metisTlvName_Equals(entryPrefix, strategyPrefix)) { + metisFibEntry_SetStrategy(entry, strategy); + } + } + metisTlvName_Release(&strategyPrefix); + metisFibEntryList_Destroy(&fib_entries); + _metisMessageProcess_CheckForwardingStrategies(processor); +} + +MetisFibEntryList * +metisMessageProcessor_GetFibEntries(MetisMessageProcessor *processor) +{ + assertNotNull(processor, "Parameter processor must be non-null"); + return metisFIB_GetEntries(processor->fib); +} + +// ============================================================ +// Internal API + +/** + * @function metisMessageProcessor_Drop + * @abstract Whenever we "drop" a message, notify the OnDrop tap and increment countes + * @discussion + * This is a bookkeeping function. It notifies the tap, if its an onDrop tap, and + * it increments the appropriate counters. + * + * The default action for a message is to destroy it in <code>metisMessageProcessor_Receive()</code>, + * so this function does not need to do that. + * + * @param <#param1#> + */ +static void +metisMessageProcessor_Drop(MetisMessageProcessor *processor, MetisMessage *message) +{ + if (processor->tap != NULL && processor->tap->isTapOnDrop && processor->tap->isTapOnDrop(processor->tap)) { + processor->tap->tapOnDrop(processor->tap, message); + } + + processor->stats.countDropped++; + + switch (metisMessage_GetType(message)) { + case MetisMessagePacketType_Interest: + processor->stats.countInterestsDropped++; + break; + + case MetisMessagePacketType_ContentObject: + processor->stats.countObjectsDropped++; + break; + + default: + break; + } + + // dont destroy message here, its done at end of receive +} + +/** + * @function metisMessageProcessor_AggregateInterestInPit + * @abstract Try to aggregate the interest in the PIT + * @discussion + * Tries to aggregate the interest with another interest. + * + * @param <#param1#> + * @return true if interest aggregagted (no more forwarding needed), false if need to keep processing it. + */ +static bool +metisMessageProcessor_AggregateInterestInPit(MetisMessageProcessor *processor, MetisMessage *interestMessage) +{ + MetisPITVerdict verdict = metisPIT_ReceiveInterest(processor->pit, interestMessage); + + if (verdict == MetisPITVerdict_Aggregate) { + // PIT has it, we're done + processor->stats.countInterestsAggregated++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p aggregated in PIT (aggregated count %u)", + (void *) interestMessage, + processor->stats.countInterestsAggregated); + } + + return true; + } + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p not aggregated in PIT (aggregated count %u)", + (void *) interestMessage, + processor->stats.countInterestsAggregated); + } + + return false; +} + +static bool +_satisfyFromContentStore(MetisMessageProcessor *processor, MetisMessage *interestMessage) +{ + bool result = false; + + if (!processor->serve_from_cache) { + return result; + } + + // See if there's a match in the store. + MetisMessage *objectMessage = metisContentStoreInterface_MatchInterest(processor->contentStore, interestMessage); + + if (objectMessage) { + // If the Interest specified a KeyId restriction and we had a match, check to see if the ContentObject's KeyId + // has been verified. If not, we don't respond with it. + if (metisMessage_HasKeyId(interestMessage) && !metisMessage_IsKeyIdVerified(objectMessage)) { + // We don't match if they specified a KeyId restriction and we haven't yet verified it. + objectMessage = NULL; + } + } + + if (objectMessage != NULL) { + bool hasExpired = false; + bool hasExceededRCT = false; + + uint64_t currentTimeTicks = metisForwarder_GetTicks(processor->metis); + + // Check for ExpiryTime exceeded. + if (metisMessage_HasExpiryTime(objectMessage) + && (currentTimeTicks > metisMessage_GetExpiryTimeTicks(objectMessage))) { + hasExpired = true; + } + + // Check for RCT exceeded. + if (metisMessage_HasRecommendedCacheTime(objectMessage) + && (currentTimeTicks > metisMessage_GetRecommendedCacheTimeTicks(objectMessage))) { + hasExceededRCT = true; + } + + if (!hasExpired) { // && !hasExceededRCT ? It's up to us. + // Remove it from the PIT. nexthops is allocated, so need to destroy + MetisNumberSet *nexthops = metisPIT_SatisfyInterest(processor->pit, objectMessage); + assertNotNull(nexthops, "Illegal state: got a null nexthops for an interest we just inserted."); + + // send message in reply, then done + processor->stats.countInterestsSatisfiedFromStore++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p satisfied from content store (satisfied count %u)", + (void *) interestMessage, + processor->stats.countInterestsSatisfiedFromStore); + } + + metisMessage_ResetPathLabel(objectMessage); + + metisMessageProcessor_ForwardToNexthops(processor, objectMessage, nexthops); + metisNumberSet_Release(&nexthops); + + result = true; + } + + // Remove the retrieved ContentObject from the ContentStore if it has expired, or exceeded its RCT. + if (hasExpired || hasExceededRCT) { + metisContentStoreInterface_RemoveContent(processor->contentStore, objectMessage); + } + } + + return result; +} + +/** + * @function metisMessageProcessor_ForwardViaFib + * @abstract Try to forward the interest via the FIB + * @discussion + * This calls <code>metisMessageProcessor_ForwardToNexthops()</code>, so if we find any nexthops, + * the interest will be sent on its way. Depending on the MetisIoOperations of each nexthop, + * it may be a deferred write and bump up the <code>interestMessage</code> refernce count, or it + * may copy the data out. + * + * A TRUE return means we did our best to forward it via the routes. If those routes are actually + * down or have errors, we still return TRUE. A FALSE return means there were no routes to try. + * + * @param <#param1#> + * @return true if we found a route and tried to forward it, false if no route + */ +static bool +metisMessageProcessor_ForwardViaFib(MetisMessageProcessor *processor, MetisMessage *interestMessage) +{ + MetisFibEntry *fibEntry = metisFIB_Match(processor->fib, interestMessage); + if (fibEntry == NULL) { + return false; + } + + MetisPitEntry *pitEntry = metisPIT_GetPitEntry(processor->pit, interestMessage); + if (pitEntry == NULL) { + return false; + } + + metisPitEntry_AddFibEntry(pitEntry, fibEntry); + + MetisNumberSet *nexthops = (MetisNumberSet *) metisFibEntry_GetNexthopsFromForwardingStrategy(fibEntry, interestMessage); + //this requires some additional checks. It may happen that some of the output faces selected by the forwarding strategy are not + //usable. So far all the forwarding strategy return only valid faces (or an empty list) + for (unsigned i = 0; i < metisNumberSet_Length(nexthops); i++) { + metisPitEntry_AddEgressId(pitEntry, metisNumberSet_GetItem(nexthops, i)); + } + + //The function GetPitEntry encreases the ref counter in the pit entry + //we need to decrease it + metisPitEntry_Release(&pitEntry); + + if (metisMessageProcessor_ForwardToNexthops(processor, interestMessage, nexthops) > 0) { + metisNumberSet_Release(&nexthops); + return true; + } else { + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p returned an emtpy next hop set", (void *) interestMessage); + } + } + + return false; +} + +static bool +metisMessageProcessor_IsIngressConnectionLocal(MetisMessageProcessor *processor, MetisMessage *interestMessage) +{ + MetisConnectionTable *connTable = metisForwarder_GetConnectionTable(processor->metis); + unsigned ingressConnId = metisMessage_GetIngressConnectionId(interestMessage); + const MetisConnection *ingressConn = metisConnectionTable_FindById(connTable, ingressConnId); + + bool isLocal = false; + if (ingressConn) { + isLocal = metisConnection_IsLocal(ingressConn); + } + return isLocal; +} + +/** + * On ingress, a remote connection must have hop limit > 0. All interests must have a hop limit. + * + * This function will log the error, if any, but it does not drop the message. + * + * If Interest is from a local application, the hop limit is not decremented and may be 0. + * + * If Interest is from a remote connection, the hop limit must be greater than 0 and will be decremented. + * + * @param [<#in out in,out#>] <#name#> <#description#> + * + * @retval true The interest passes the hop limit check + * @retval false The interest fails the hop limit check, should be dropped + * + * Example: + * @code + * <#example#> + * @endcode + */ +static bool +metisMessageProcessor_CheckAndDecrementHopLimitOnIngress(MetisMessageProcessor *processor, MetisMessage *interestMessage) +{ + bool success = true; + if (!metisMessage_HasHopLimit(interestMessage)) { + processor->stats.countDroppedNoHopLimit++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p did not have a hop limit (count %u)", + (void *) interestMessage, + processor->stats.countDroppedNoHopLimit); + } + + success = false; + } else { + // Is the ingress connection remote? If so check for non-zero and decrement + if (!metisMessageProcessor_IsIngressConnectionLocal(processor, interestMessage)) { + uint8_t hoplimit = metisMessage_GetHopLimit(interestMessage); + if (hoplimit == 0) { + processor->stats.countDroppedZeroHopLimitFromRemote++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p from remote host has 0 hop limit (count %u)", + (void *) interestMessage, + processor->stats.countDroppedZeroHopLimitFromRemote); + } + + success = false; + } else { + hoplimit--; + metisMessage_SetHopLimit(interestMessage, hoplimit); + } + } + } + return success; +} + +/** + * @function metisMessageProcessor_ReceiveInterest + * @abstract Receive an interest from the network + * @discussion + * (0) It must have a HopLimit and pass the hoplimit checks + * (1) if interest in the PIT, aggregate in PIT + * (2) if interest in the ContentStore, reply + * (3) if in the FIB, forward + * (4) drop + * + * @param <#param1#> + * @return <#return#> + */ +static void +metisMessageProcessor_ReceiveInterest(MetisMessageProcessor *processor, MetisMessage *interestMessage) +{ + processor->stats.countInterestsReceived++; + + if (!metisMessageProcessor_CheckAndDecrementHopLimitOnIngress(processor, interestMessage)) { + metisMessageProcessor_Drop(processor, interestMessage); + return; + } + + // (1) Try to aggregate in PIT + if (metisMessageProcessor_AggregateInterestInPit(processor, interestMessage)) { + // done + return; + } + + // At this point, we just created a PIT entry. If we don't forward the interest, we need + // to remove the PIT entry. + + // (2) Try to satisfy from content store + if (_satisfyFromContentStore(processor, interestMessage)) { + // done + // If we found a content object in the CS, metisMessageProcess_SatisfyFromContentStore already + // cleared the PIT state + return; + } + + // (3) Try to forward it + if (metisMessageProcessor_ForwardViaFib(processor, interestMessage)) { + // done + return; + } + + // Remove the PIT entry? + processor->stats.countDroppedNoRoute++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p did not match FIB, no route (count %u)", + (void *) interestMessage, + processor->stats.countDroppedNoRoute); + } + + metisMessageProcessor_Drop(processor, interestMessage); +} + +/** + * @function metisMessageProcessor_ReceiveContentObject + * @abstract Process an in-bound content object + * @discussion + * (1) If it does not match anything in the PIT, drop it + * (2) Add to Content Store + * (3) Reverse path forward via PIT entries + * + * @param <#param1#> + */ +static void +metisMessageProcessor_ReceiveContentObject(MetisMessageProcessor *processor, MetisMessage *message) +{ + processor->stats.countObjectsReceived++; + + MetisNumberSet *ingressSetUnion = metisPIT_SatisfyInterest(processor->pit, message); + + if (metisNumberSet_Length(ingressSetUnion) == 0) { + // (1) If it does not match anything in the PIT, drop it + processor->stats.countDroppedNoReversePath++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "Message %p did not match PIT, no reverse path (count %u)", + (void *) message, + processor->stats.countDroppedNoReversePath); + } + + metisMessageProcessor_Drop(processor, message); + } else { + // (2) Add to Content Store. Store may remove expired content, if necessary, depending on store policy. + if (processor->store_in_cache) { + uint64_t currentTimeTicks = metisForwarder_GetTicks(processor->metis); + metisContentStoreInterface_PutContent(processor->contentStore, message, currentTimeTicks); + } + // (3) Reverse path forward via PIT entries + metisMessageProcessor_ForwardToNexthops(processor, message, ingressSetUnion); + } + + metisNumberSet_Release(&ingressSetUnion); +} + +/** + * @function metisMessageProcessor_ForwardToNexthops + * @abstract Try to forward to each nexthop listed in the MetisNumberSet + * @discussion + * Will not forward to the ingress connection. + * + * @param <#param1#> + * @return The number of nexthops tried + */ +static unsigned +metisMessageProcessor_ForwardToNexthops(MetisMessageProcessor *processor, MetisMessage *message, const MetisNumberSet *nexthops) +{ + unsigned forwardedCopies = 0; + + size_t length = metisNumberSet_Length(nexthops); + + unsigned ingressId = metisMessage_GetIngressConnectionId(message); + for (size_t i = 0; i < length; i++) { + unsigned egressId = metisNumberSet_GetItem(nexthops, i); + if (egressId != ingressId) { + forwardedCopies++; + metisMessageProcessor_ForwardToInterfaceId(processor, message, egressId); + } + } + return forwardedCopies; +} + +/** + * caller has checked that the hop limit is ok. Try to send out the connection. + */ +static void +metisMessageProcessor_SendWithGoodHopLimit(MetisMessageProcessor *processor, MetisMessage *message, unsigned interfaceId, const MetisConnection *conn) +{ + bool success = metisConnection_Send(conn, message); + if (success) { + switch (metisMessage_GetType(message)) { + case MetisMessagePacketType_Interest: + processor->stats.countInterestForwarded++; + break; + + case MetisMessagePacketType_ContentObject: + processor->stats.countObjectsForwarded++; + break; + + default: + break; + } + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "forward message %p to interface %u (int %u, obj %u)", + (void *) message, + interfaceId, + processor->stats.countInterestForwarded, + processor->stats.countObjectsForwarded); + } + } else { + processor->stats.countSendFailures++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "forward message %p to interface %u send failure (count %u)", + (void *) message, + interfaceId, + processor->stats.countSendFailures); + } + metisMessageProcessor_Drop(processor, message); + } +} + +/* + * If the hoplimit is equal to 0, then we may only forward it to local applications. Otherwise, + * we may forward it off the system. + * + */ +static void +metisMessageProcessor_ForwardToInterfaceId(MetisMessageProcessor *processor, MetisMessage *message, unsigned interfaceId) +{ + MetisConnectionTable *connectionTable = metisForwarder_GetConnectionTable(processor->metis); + const MetisConnection *conn = metisConnectionTable_FindById(connectionTable, interfaceId); + + + if (conn != NULL) { + /* + * We can send the message if: + * a) If the message does not carry a hop limit (e.g. content object) + * b) It has a hoplimit and it is positive + * c) Or if the egress connection is local (i.e. it has a hoplimit and it's 0, but this is ok for a local app) + */ + if ((!metisMessage_HasHopLimit(message)) || (metisMessage_GetHopLimit(message) > 0) || metisConnection_IsLocal(conn)) { + metisMessageProcessor_SendWithGoodHopLimit(processor, message, interfaceId, conn); + } else { + // To reach here, the message has to have a hop limit, it has to be 0 and and going to a remote target + processor->stats.countDroppedZeroHopLimitToRemote++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "forward message %p to interface %u hop limit 0 and not local (count %u)", + (void *) message, + interfaceId, + processor->stats.countDroppedZeroHopLimitToRemote); + } + } + } else { + processor->stats.countDroppedConnectionNotFound++; + + if (metisLogger_IsLoggable(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug)) { + metisLogger_Log(processor->logger, MetisLoggerFacility_Processor, PARCLogLevel_Debug, __func__, + "forward message %p to interface %u not found (count %u)", + (void *) message, + interfaceId, + processor->stats.countDroppedConnectionNotFound); + } + + metisMessageProcessor_Drop(processor, message); + } +} + +void +metisMessageProcessor_SetCacheStoreFlag(MetisMessageProcessor *processor, bool val) +{ + processor->store_in_cache = val; +} + +bool +metisMessageProcessor_GetCacheStoreFlag(MetisMessageProcessor *processor) +{ + return processor->store_in_cache; +} + +void +metisMessageProcessor_SetCacheServeFlag(MetisMessageProcessor *processor, bool val) +{ + processor->serve_from_cache = val; +} + +bool +metisMessageProcessor_GetCacheServeFlag(MetisMessageProcessor *processor) +{ + return processor->serve_from_cache; +} |