summaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/processor/messageProcessor.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/processor/messageProcessor.c')
-rw-r--r--hicn-light/src/hicn/processor/messageProcessor.c161
1 files changed, 161 insertions, 0 deletions
diff --git a/hicn-light/src/hicn/processor/messageProcessor.c b/hicn-light/src/hicn/processor/messageProcessor.c
index 4db1a0eb2..7e0ece257 100644
--- a/hicn-light/src/hicn/processor/messageProcessor.c
+++ b/hicn-light/src/hicn/processor/messageProcessor.c
@@ -19,6 +19,12 @@
#include <parc/algol/parc_ArrayList.h>
#include <parc/algol/parc_Memory.h>
+#ifdef WITH_POLICY
+#include <parc/algol/parc_EventTimer.h>
+#ifdef WITH_MAPME
+#include <hicn/core/connection.h>
+#endif /* WITH_MAPME */
+#endif /* WITH_POLICY */
#include <hicn/processor/messageProcessor.h>
#include <hicn/processor/fib.h>
@@ -28,7 +34,9 @@
#include <hicn/content_store/contentStoreLRU.h>
#include <hicn/strategies/loadBalancer.h>
+#ifndef WITH_POLICY
#include <hicn/strategies/loadBalancerWithPD.h>
+#endif /* ! WITH_POLICY */
#include <hicn/strategies/rnd.h>
#include <hicn/strategies/rndSegment.h>
#include <hicn/strategies/strategyImpl.h>
@@ -43,6 +51,10 @@
#include <hicn/utils/address.h>
+#ifdef WITH_POLICY
+#define STATS_INTERVAL 1000 /* ms */
+#endif /* WITH_POLICY */
+
/*
* Copyright (c) 2017-2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -94,6 +106,10 @@ struct message_processor {
bool serve_from_cache;
_ProcessorStats stats;
+
+#ifdef WITH_POLICY
+ void * timer;
+#endif /* WITH_POLICY */
};
static void messageProcessor_Drop(MessageProcessor *processor,
@@ -113,6 +129,23 @@ static void messageProcessor_ForwardToInterfaceId(MessageProcessor *processor,
// ============================================================
// Public API
+#ifdef WITH_POLICY
+static void
+messageProcessor_Tick(int fd, PARCEventType type, void *user_data)
+{
+ MessageProcessor *processor = (MessageProcessor*)user_data;
+ uint64_t now = (uint64_t)forwarder_GetTicks(processor->forwarder);
+
+ /* Loop over FIB entries to compute statistics from counters */
+ FibEntryList *fibList = forwarder_GetFibEntries(processor->forwarder);
+
+ for (size_t i = 0; i < fibEntryList_Length(fibList); i++) {
+ FibEntry *entry = (FibEntry *)fibEntryList_Get(fibList, i);
+ fibEntry_UpdateStats(entry, now);
+ }
+}
+#endif /* WITH_POLICY */
+
MessageProcessor *messageProcessor_Create(Forwarder *forwarder) {
size_t objectStoreSize =
configuration_GetObjectStoreSize(forwarder_GetConfiguration(forwarder));
@@ -148,6 +181,20 @@ MessageProcessor *messageProcessor_Create(Forwarder *forwarder) {
processor->store_in_cache = true;
processor->serve_from_cache = true;
+#ifdef WITH_POLICY
+ /* Create statistics timer */
+ Dispatcher *dispatcher = forwarder_GetDispatcher(forwarder);
+ if (!dispatcher)
+ goto ERR;
+ processor->timer = dispatcher_CreateTimer(dispatcher, /* repeat */ true,
+ messageProcessor_Tick, processor);
+ if (!processor->timer)
+ goto ERR;
+ struct timeval timeout = {STATS_INTERVAL / 1000, (STATS_INTERVAL % 1000) * 1000};
+ dispatcher_StartTimer(dispatcher, processor->timer, &timeout);
+ERR:
+#endif /* WITH_POLICY */
+
return processor;
}
@@ -201,6 +248,15 @@ void messageProcessor_Destroy(MessageProcessor **processorPtr) {
contentStoreInterface_Release(&processor->contentStore);
pit_Release(&processor->pit);
+#ifdef WITH_POLICY
+ Dispatcher *dispatcher = forwarder_GetDispatcher(processor->forwarder);
+ if (!dispatcher)
+ goto ERR;
+ dispatcher_StopTimer(dispatcher, processor->timer);
+ dispatcher_DestroyTimerEvent(dispatcher, (PARCEventTimer**)&processor->timer);
+ERR:
+#endif /* WITH_POLICY */
+
parcMemory_Deallocate((void **)&processor);
*processorPtr = NULL;
}
@@ -255,22 +311,33 @@ bool messageProcessor_AddOrUpdateRoute(MessageProcessor *processor,
Name *prefix = name_CreateFromAddress(control->addressType, control->address,
control->len);
FibEntry *entry = fib_Contains(processor->fib, prefix);
+#ifndef WITH_POLICY
bool newEntry = false;
+#endif /* ! WITH_POLICY */
if (entry != NULL) {
fibEntry_AddNexthop(entry, ifidx);
} else {
+#ifdef WITH_POLICY
+ entry = fibEntry_Create(prefix, fwdStrategy, processor->forwarder);
+#else
newEntry = true;
entry = fibEntry_Create(prefix, fwdStrategy);
+#endif /* WITH_POLICY */
fibEntry_AddNexthop(entry, ifidx);
fib_Add(processor->fib, entry);
}
name_Release(&prefix);
+
+#ifndef WITH_POLICY
+ /* For policy implementation, we need access to the ConnectionTable in all
+ * Forwarding Strategies, so it is setup during FIB Entry creation */
if (newEntry && (fwdStrategy == SET_STRATEGY_LOADBALANCER_WITH_DELAY)) {
strategyLoadBalancerWithPD_SetConnectionTable(
fibEntry_GetFwdStrategy(entry),
forwarder_GetConnectionTable(processor->forwarder));
}
+#endif /* ! WITH_POLICY */
return true;
}
@@ -286,6 +353,65 @@ bool messageProcessor_RemoveRoute(MessageProcessor *processor,
return true;
}
+#ifdef WITH_POLICY
+
+bool messageProcessor_AddOrUpdatePolicy(MessageProcessor *processor,
+ add_policy_command *control) {
+ Configuration *config = forwarder_GetConfiguration(processor->forwarder);
+
+ const char *prefixStr = utils_PrefixLenToString(
+ control->addressType, &control->address, &control->len);
+
+ Name *prefix = name_CreateFromAddress(control->addressType, control->address,
+ control->len);
+ FibEntry *entry = fib_Contains(processor->fib, prefix);
+ if (!entry) {
+ strategy_type fwdStrategy =
+ configuration_GetForwardingStrategy(config, prefixStr);
+ if (fwdStrategy == LAST_STRATEGY_VALUE) {
+ fwdStrategy = SET_STRATEGY_LOADBALANCER;
+ }
+ entry = fibEntry_Create(prefix, fwdStrategy, processor->forwarder);
+ fib_Add(processor->fib, entry);
+ }
+ fibEntry_SetPolicy(entry, control->policy);
+
+ name_Release(&prefix);
+
+ return true;
+}
+
+bool messageProcessor_RemovePolicy(MessageProcessor *processor,
+ remove_policy_command *control) {
+ Name *prefix = name_CreateFromAddress(control->addressType, control->address,
+ control->len);
+ FibEntry *entry = fib_Contains(processor->fib, prefix);
+ name_Release(&prefix);
+
+ if (!entry)
+ return false;
+
+ fibEntry_SetPolicy(entry, POLICY_NONE);
+
+ return true;
+}
+
+
+#ifdef WITH_MAPME
+void messageProcessor_onConnectionEvent(const MessageProcessor *processor,
+ const Connection *conn_added, connection_event_t event)
+{
+ FibEntryList *fiblist = forwarder_GetFibEntries(processor->forwarder);
+ for (size_t i = 0; i < fibEntryList_Length(fiblist); i++) {
+ FibEntry *fibEntry = (FibEntry *)fibEntryList_Get(fiblist, i);
+ fibEntry_ReconsiderPolicy(fibEntry);
+ }
+
+}
+#endif /* WITH_MAPME */
+
+#endif /* WITH_POLICY */
+
void messageProcessor_RemoveConnectionIdFromRoutes(MessageProcessor *processor,
unsigned connectionId) {
fib_RemoveConnectionId(processor->fib, connectionId);
@@ -295,12 +421,14 @@ void processor_SetStrategy(MessageProcessor *processor, Name *prefix,
strategy_type strategy) {
FibEntry *entry = fib_Contains(processor->fib, prefix);
if (entry != NULL) {
+#ifndef WITH_POLICY
fibEntry_SetStrategy(entry, strategy);
if (strategy == SET_STRATEGY_LOADBALANCER_WITH_DELAY) {
strategyLoadBalancerWithPD_SetConnectionTable(
fibEntry_GetFwdStrategy(entry),
forwarder_GetConnectionTable(processor->forwarder));
}
+#endif /* ! WITH_POLICY */
}
}
@@ -352,8 +480,13 @@ static void messageProcessor_Drop(MessageProcessor *processor,
* @return true if interest aggregagted (no more forwarding needed), false if
* need to keep processing it.
*/
+#ifdef WITH_POLICY
+static PITVerdict messageProcessor_AggregateInterestInPit(MessageProcessor *processor,
+ Message *interestMessage) {
+#else
static bool messageProcessor_AggregateInterestInPit(MessageProcessor *processor,
Message *interestMessage) {
+#endif /* WITH_POLICY */
PITVerdict verdict = pit_ReceiveInterest(processor->pit, interestMessage);
if (verdict == PITVerdict_Aggregate) {
@@ -444,8 +577,13 @@ static bool _satisfyFromContentStore(MessageProcessor *processor,
*
* @return true if we found a route and tried to forward it, false if no route
*/
+#ifdef WITH_POLICY
+static bool messageProcessor_ForwardViaFib(MessageProcessor *processor,
+ Message *interestMessage, PITVerdict verdict) {
+#else
static bool messageProcessor_ForwardViaFib(MessageProcessor *processor,
Message *interestMessage) {
+#endif /* WITH_POLICY */
FibEntry *fibEntry = fib_Match(processor->fib, interestMessage);
if (fibEntry == NULL) {
return false;
@@ -459,7 +597,12 @@ static bool messageProcessor_ForwardViaFib(MessageProcessor *processor,
pitEntry_AddFibEntry(pitEntry, fibEntry);
NumberSet *nexthops = (NumberSet *)fibEntry_GetNexthopsFromForwardingStrategy(
+#ifdef WITH_POLICY
+ fibEntry, interestMessage, verdict);
+#else
fibEntry, interestMessage);
+#endif /* WITH_POLICY */
+
// this requires some additional checks. It may happen that some of the output
// faces selected by the forwarding strategy are not usable. So far all the
// forwarding strategy return only valid faces (or an empty list)
@@ -503,10 +646,23 @@ static void messageProcessor_ReceiveInterest(MessageProcessor *processor,
processor->stats.countInterestsReceived++;
// (1) Try to aggregate in PIT
+#ifdef WITH_POLICY
+ PITVerdict verdict = messageProcessor_AggregateInterestInPit(processor, interestMessage);
+ switch(verdict) {
+ case PITVerdict_Aggregate:
+ //done
+ return;
+
+ case PITVerdict_Forward:
+ case PITVerdict_Retransmit:
+ break;
+ }
+#else
if (messageProcessor_AggregateInterestInPit(processor, interestMessage)) {
// done
return;
}
+#endif /* WITH_POLICY */
// At this point, we just created a PIT entry. If we don't forward the
// interest, we need to remove the PIT entry.
@@ -520,7 +676,11 @@ static void messageProcessor_ReceiveInterest(MessageProcessor *processor,
}
// (3) Try to forward it
+#ifdef WITH_POLICY
+ if (messageProcessor_ForwardViaFib(processor, interestMessage, verdict)) {
+#else
if (messageProcessor_ForwardViaFib(processor, interestMessage)) {
+#endif /* WITH_POLICY */
// done
return;
}
@@ -595,6 +755,7 @@ static void messageProcessor_ReceiveContentObject(MessageProcessor *processor,
}
// (3) Reverse path forward via PIT entries
messageProcessor_ForwardToNexthops(processor, message, ingressSetUnion);
+
}
numberSet_Release(&ingressSetUnion);