diff options
Diffstat (limited to 'hicn-light/src/hicn/processor/messageProcessor.c')
-rw-r--r-- | hicn-light/src/hicn/processor/messageProcessor.c | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/hicn-light/src/hicn/processor/messageProcessor.c b/hicn-light/src/hicn/processor/messageProcessor.c index 4db1a0eb2..7e0ece257 100644 --- a/hicn-light/src/hicn/processor/messageProcessor.c +++ b/hicn-light/src/hicn/processor/messageProcessor.c @@ -19,6 +19,12 @@ #include <parc/algol/parc_ArrayList.h> #include <parc/algol/parc_Memory.h> +#ifdef WITH_POLICY +#include <parc/algol/parc_EventTimer.h> +#ifdef WITH_MAPME +#include <hicn/core/connection.h> +#endif /* WITH_MAPME */ +#endif /* WITH_POLICY */ #include <hicn/processor/messageProcessor.h> #include <hicn/processor/fib.h> @@ -28,7 +34,9 @@ #include <hicn/content_store/contentStoreLRU.h> #include <hicn/strategies/loadBalancer.h> +#ifndef WITH_POLICY #include <hicn/strategies/loadBalancerWithPD.h> +#endif /* ! WITH_POLICY */ #include <hicn/strategies/rnd.h> #include <hicn/strategies/rndSegment.h> #include <hicn/strategies/strategyImpl.h> @@ -43,6 +51,10 @@ #include <hicn/utils/address.h> +#ifdef WITH_POLICY +#define STATS_INTERVAL 1000 /* ms */ +#endif /* WITH_POLICY */ + /* * Copyright (c) 2017-2019 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); @@ -94,6 +106,10 @@ struct message_processor { bool serve_from_cache; _ProcessorStats stats; + +#ifdef WITH_POLICY + void * timer; +#endif /* WITH_POLICY */ }; static void messageProcessor_Drop(MessageProcessor *processor, @@ -113,6 +129,23 @@ static void messageProcessor_ForwardToInterfaceId(MessageProcessor *processor, // ============================================================ // Public API +#ifdef WITH_POLICY +static void +messageProcessor_Tick(int fd, PARCEventType type, void *user_data) +{ + MessageProcessor *processor = (MessageProcessor*)user_data; + uint64_t now = (uint64_t)forwarder_GetTicks(processor->forwarder); + + /* Loop over FIB entries to compute statistics from counters */ + FibEntryList *fibList = forwarder_GetFibEntries(processor->forwarder); + + for (size_t i = 0; i < fibEntryList_Length(fibList); i++) { + FibEntry *entry = (FibEntry *)fibEntryList_Get(fibList, i); + fibEntry_UpdateStats(entry, now); + } +} +#endif /* WITH_POLICY */ + MessageProcessor *messageProcessor_Create(Forwarder *forwarder) { size_t objectStoreSize = configuration_GetObjectStoreSize(forwarder_GetConfiguration(forwarder)); @@ -148,6 +181,20 @@ MessageProcessor *messageProcessor_Create(Forwarder *forwarder) { processor->store_in_cache = true; processor->serve_from_cache = true; +#ifdef WITH_POLICY + /* Create statistics timer */ + Dispatcher *dispatcher = forwarder_GetDispatcher(forwarder); + if (!dispatcher) + goto ERR; + processor->timer = dispatcher_CreateTimer(dispatcher, /* repeat */ true, + messageProcessor_Tick, processor); + if (!processor->timer) + goto ERR; + struct timeval timeout = {STATS_INTERVAL / 1000, (STATS_INTERVAL % 1000) * 1000}; + dispatcher_StartTimer(dispatcher, processor->timer, &timeout); +ERR: +#endif /* WITH_POLICY */ + return processor; } @@ -201,6 +248,15 @@ void messageProcessor_Destroy(MessageProcessor **processorPtr) { contentStoreInterface_Release(&processor->contentStore); pit_Release(&processor->pit); +#ifdef WITH_POLICY + Dispatcher *dispatcher = forwarder_GetDispatcher(processor->forwarder); + if (!dispatcher) + goto ERR; + dispatcher_StopTimer(dispatcher, processor->timer); + dispatcher_DestroyTimerEvent(dispatcher, (PARCEventTimer**)&processor->timer); +ERR: +#endif /* WITH_POLICY */ + parcMemory_Deallocate((void **)&processor); *processorPtr = NULL; } @@ -255,22 +311,33 @@ bool messageProcessor_AddOrUpdateRoute(MessageProcessor *processor, Name *prefix = name_CreateFromAddress(control->addressType, control->address, control->len); FibEntry *entry = fib_Contains(processor->fib, prefix); +#ifndef WITH_POLICY bool newEntry = false; +#endif /* ! WITH_POLICY */ if (entry != NULL) { fibEntry_AddNexthop(entry, ifidx); } else { +#ifdef WITH_POLICY + entry = fibEntry_Create(prefix, fwdStrategy, processor->forwarder); +#else newEntry = true; entry = fibEntry_Create(prefix, fwdStrategy); +#endif /* WITH_POLICY */ fibEntry_AddNexthop(entry, ifidx); fib_Add(processor->fib, entry); } name_Release(&prefix); + +#ifndef WITH_POLICY + /* For policy implementation, we need access to the ConnectionTable in all + * Forwarding Strategies, so it is setup during FIB Entry creation */ if (newEntry && (fwdStrategy == SET_STRATEGY_LOADBALANCER_WITH_DELAY)) { strategyLoadBalancerWithPD_SetConnectionTable( fibEntry_GetFwdStrategy(entry), forwarder_GetConnectionTable(processor->forwarder)); } +#endif /* ! WITH_POLICY */ return true; } @@ -286,6 +353,65 @@ bool messageProcessor_RemoveRoute(MessageProcessor *processor, return true; } +#ifdef WITH_POLICY + +bool messageProcessor_AddOrUpdatePolicy(MessageProcessor *processor, + add_policy_command *control) { + Configuration *config = forwarder_GetConfiguration(processor->forwarder); + + const char *prefixStr = utils_PrefixLenToString( + control->addressType, &control->address, &control->len); + + Name *prefix = name_CreateFromAddress(control->addressType, control->address, + control->len); + FibEntry *entry = fib_Contains(processor->fib, prefix); + if (!entry) { + strategy_type fwdStrategy = + configuration_GetForwardingStrategy(config, prefixStr); + if (fwdStrategy == LAST_STRATEGY_VALUE) { + fwdStrategy = SET_STRATEGY_LOADBALANCER; + } + entry = fibEntry_Create(prefix, fwdStrategy, processor->forwarder); + fib_Add(processor->fib, entry); + } + fibEntry_SetPolicy(entry, control->policy); + + name_Release(&prefix); + + return true; +} + +bool messageProcessor_RemovePolicy(MessageProcessor *processor, + remove_policy_command *control) { + Name *prefix = name_CreateFromAddress(control->addressType, control->address, + control->len); + FibEntry *entry = fib_Contains(processor->fib, prefix); + name_Release(&prefix); + + if (!entry) + return false; + + fibEntry_SetPolicy(entry, POLICY_NONE); + + return true; +} + + +#ifdef WITH_MAPME +void messageProcessor_onConnectionEvent(const MessageProcessor *processor, + const Connection *conn_added, connection_event_t event) +{ + FibEntryList *fiblist = forwarder_GetFibEntries(processor->forwarder); + for (size_t i = 0; i < fibEntryList_Length(fiblist); i++) { + FibEntry *fibEntry = (FibEntry *)fibEntryList_Get(fiblist, i); + fibEntry_ReconsiderPolicy(fibEntry); + } + +} +#endif /* WITH_MAPME */ + +#endif /* WITH_POLICY */ + void messageProcessor_RemoveConnectionIdFromRoutes(MessageProcessor *processor, unsigned connectionId) { fib_RemoveConnectionId(processor->fib, connectionId); @@ -295,12 +421,14 @@ void processor_SetStrategy(MessageProcessor *processor, Name *prefix, strategy_type strategy) { FibEntry *entry = fib_Contains(processor->fib, prefix); if (entry != NULL) { +#ifndef WITH_POLICY fibEntry_SetStrategy(entry, strategy); if (strategy == SET_STRATEGY_LOADBALANCER_WITH_DELAY) { strategyLoadBalancerWithPD_SetConnectionTable( fibEntry_GetFwdStrategy(entry), forwarder_GetConnectionTable(processor->forwarder)); } +#endif /* ! WITH_POLICY */ } } @@ -352,8 +480,13 @@ static void messageProcessor_Drop(MessageProcessor *processor, * @return true if interest aggregagted (no more forwarding needed), false if * need to keep processing it. */ +#ifdef WITH_POLICY +static PITVerdict messageProcessor_AggregateInterestInPit(MessageProcessor *processor, + Message *interestMessage) { +#else static bool messageProcessor_AggregateInterestInPit(MessageProcessor *processor, Message *interestMessage) { +#endif /* WITH_POLICY */ PITVerdict verdict = pit_ReceiveInterest(processor->pit, interestMessage); if (verdict == PITVerdict_Aggregate) { @@ -444,8 +577,13 @@ static bool _satisfyFromContentStore(MessageProcessor *processor, * * @return true if we found a route and tried to forward it, false if no route */ +#ifdef WITH_POLICY +static bool messageProcessor_ForwardViaFib(MessageProcessor *processor, + Message *interestMessage, PITVerdict verdict) { +#else static bool messageProcessor_ForwardViaFib(MessageProcessor *processor, Message *interestMessage) { +#endif /* WITH_POLICY */ FibEntry *fibEntry = fib_Match(processor->fib, interestMessage); if (fibEntry == NULL) { return false; @@ -459,7 +597,12 @@ static bool messageProcessor_ForwardViaFib(MessageProcessor *processor, pitEntry_AddFibEntry(pitEntry, fibEntry); NumberSet *nexthops = (NumberSet *)fibEntry_GetNexthopsFromForwardingStrategy( +#ifdef WITH_POLICY + fibEntry, interestMessage, verdict); +#else fibEntry, interestMessage); +#endif /* WITH_POLICY */ + // this requires some additional checks. It may happen that some of the output // faces selected by the forwarding strategy are not usable. So far all the // forwarding strategy return only valid faces (or an empty list) @@ -503,10 +646,23 @@ static void messageProcessor_ReceiveInterest(MessageProcessor *processor, processor->stats.countInterestsReceived++; // (1) Try to aggregate in PIT +#ifdef WITH_POLICY + PITVerdict verdict = messageProcessor_AggregateInterestInPit(processor, interestMessage); + switch(verdict) { + case PITVerdict_Aggregate: + //done + return; + + case PITVerdict_Forward: + case PITVerdict_Retransmit: + break; + } +#else if (messageProcessor_AggregateInterestInPit(processor, interestMessage)) { // done return; } +#endif /* WITH_POLICY */ // At this point, we just created a PIT entry. If we don't forward the // interest, we need to remove the PIT entry. @@ -520,7 +676,11 @@ static void messageProcessor_ReceiveInterest(MessageProcessor *processor, } // (3) Try to forward it +#ifdef WITH_POLICY + if (messageProcessor_ForwardViaFib(processor, interestMessage, verdict)) { +#else if (messageProcessor_ForwardViaFib(processor, interestMessage)) { +#endif /* WITH_POLICY */ // done return; } @@ -595,6 +755,7 @@ static void messageProcessor_ReceiveContentObject(MessageProcessor *processor, } // (3) Reverse path forward via PIT entries messageProcessor_ForwardToNexthops(processor, message, ingressSetUnion); + } numberSet_Release(&ingressSetUnion); |