aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/core/forwarder.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/core/forwarder.c')
-rw-r--r--hicn-light/src/hicn/core/forwarder.c1837
1 files changed, 1456 insertions, 381 deletions
diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c
index 94e8cc885..8c276bfef 100644
--- a/hicn-light/src/hicn/core/forwarder.c
+++ b/hicn-light/src/hicn/core/forwarder.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2023 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -23,6 +23,20 @@
* the event scheduler
*/
+/* Bypass FIB and send packet one by one */
+//#define BYPASS_FIB 1
+
+/* Send packets one by one : only effective if FIB is not bypassed */
+//#define USE_SEND_PACKET 1
+
+/* Batch sending: only if the previous option is undefined */
+#define USE_QUEUE true
+
+/* Shall we send mapme updates to advertise all local prefixes on newly created
+ * faces
+ */
+//#define ADVERTISE_PREFIXES_ON_NEW_FACES
+
#ifndef _WIN32
#include <arpa/inet.h>
#include <sys/socket.h>
@@ -30,8 +44,7 @@
#endif
#include <errno.h>
#include <fcntl.h>
-#include <signal.h>
-#include <hicn/hicn-light/config.h>
+//#include <hicn/hicn-light/config.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
@@ -41,90 +54,71 @@
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
-#include <parc/algol/parc_ArrayList.h>
-#include <parc/algol/parc_Memory.h>
-#include <parc/algol/parc_Object.h>
-#include <parc/logging/parc_LogReporterTextStdout.h>
-
-#include <hicn/core/connectionManager.h>
-#include <hicn/core/connectionTable.h>
-#include <hicn/core/dispatcher.h>
-#include <hicn/core/forwarder.h>
-#include <hicn/core/messagePacketType.h>
+#include "connection_table.h"
+#include "fib.h"
+#include "forwarder.h"
+#include "listener_table.h"
#ifdef WITH_MAPME
-#include <hicn/core/mapme.h>
+#include "mapme.h"
#endif /* WITH_MAPME */
-#include <hicn/config/configuration.h>
-#include <hicn/config/configurationFile.h>
-#include <hicn/config/configurationListeners.h>
-#include <hicn/processor/messageProcessor.h>
+#include "msgbuf.h"
+#include "msgbuf_pool.h"
+#include "packet_cache.h"
+#include "../config/configuration.h"
+// #include "../config/configuration_file.h"
+#include "../config/commands.h"
+#include "../io/base.h" // MAX_MSG
+
+#ifdef WITH_POLICY_STATS
+#include <hicn/core/policy_stats.h>
+#endif /* WITH_POLICY_STATS */
#include <hicn/core/wldr.h>
+#include <hicn/interest_manifest.h>
+#include <hicn/util/log.h>
-#include <parc/assert/parc_Assert.h>
-
-// the router's clock frequency (we now use the monotonic clock)
-#define HZ 1000
-
-// these will all be a little off because its all integer division
-#define MSEC_PER_TICK (1000 / HZ)
-#define USEC_PER_TICK (1000000 / HZ)
-#define NSEC_PER_TICK ((1000000000ULL) / HZ)
-#define MSEC_TO_TICKS(msec) \
- ((msec < FC_MSEC_PER_TICK) ? 1 : msec / FC_MSEC_PER_TICK)
-#define NSEC_TO_TICKS(nsec) ((nsec < NSEC_PER_TICK) ? 1 : nsec / NSEC_PER_TICK)
+struct forwarder_s {
+ // uint16_t server_port;
-struct forwarder {
- Dispatcher *dispatcher;
-
- uint16_t server_port;
+ // used by seed48 and nrand48
+ unsigned short seed[3];
- PARCEventSignal *signal_int;
- PARCEventSignal *signal_term;
-#ifndef _WIN32
- PARCEventSignal *signal_usr1;
-#endif
- PARCEventTimer *keepalive_event;
+ connection_table_t *connection_table;
+ listener_table_t *listener_table;
+ configuration_t *config;
- // will skew the virtual clock forward. In normal operaiton, it is 0.
- Ticks clockOffset;
+ pkt_cache_t *pkt_cache;
+ fib_t *fib;
+ msgbuf_pool_t *msgbuf_pool;
- unsigned nextConnectionid;
- Messenger *messenger;
- ConnectionManager *connectionManager;
- ConnectionTable *connectionTable;
- ListenerSet *listenerSet;
- Configuration *config;
+#ifdef WITH_MAPME
+ mapme_t *mapme;
+#endif /* WITH_MAPME */
- // we'll eventually want to setup a threadpool of these
- MessageProcessor *processor;
+ bool store_in_cs;
+ bool serve_from_cs;
- Logger *logger;
+ forwarder_stats_t stats;
+#ifdef WITH_POLICY_STATS
+ policy_stats_mgr_t policy_stats_mgr;
+#endif /* WITH_POLICY_STATS */
- PARCClock *clock;
+ /*
+ * The message forwarder has to decide whether to queue incoming packets for
+ * batching, or trigger the transmission on the connection
+ */
+ unsigned *pending_conn;
-#if !defined(__APPLE__)
- hicn_socket_helper_t
- *hicnSocketHelper; // state required to manage hicn connections
-#endif
- // used by seed48 and nrand48
- unsigned short seed[3];
+ subscription_table_t *subscriptions;
-#ifdef WITH_MAPME
- MapMe *mapme;
-#endif /* WITH_MAPME */
+ // Used to store the msgbufs that need to be released
+ off_t *acquired_msgbuf_ids;
};
-// signal traps through the event scheduler
-static void _signal_cb(int, PARCEventType, void *);
-
-// A no-op keepalive to prevent Libevent from exiting the dispatch loop
-static void _keepalive_cb(int, PARCEventType, void *);
-
/**
* Reseed our pseudo-random number generator.
*/
-static void forwarder_Seed(Forwarder *forwarder) {
+static void forwarder_seed(forwarder_t *forwarder) {
#ifndef _WIN32
int fd;
ssize_t res;
@@ -150,452 +144,1533 @@ static void forwarder_Seed(Forwarder *forwarder) {
#endif
}
-Logger *forwarder_GetLogger(const Forwarder *forwarder) {
- return forwarder->logger;
-}
+forwarder_t *forwarder_create(configuration_t *configuration) {
+ forwarder_t *forwarder = malloc(sizeof(forwarder_t));
+ if (!forwarder) goto ERR_MALLOC;
-// ============================================================================
-// Setup and destroy section
+ forwarder_seed(forwarder);
+ srand(forwarder->seed[0] ^ forwarder->seed[1] ^ forwarder->seed[2]);
-Forwarder *forwarder_Create(Logger *logger) {
- Forwarder *forwarder = parcMemory_AllocateAndClear(sizeof(Forwarder));
- parcAssertNotNull(forwarder, "parcMemory_AllocateAndClear(%zu) returned NULL",
- sizeof(Forwarder));
- memset(forwarder, 0, sizeof(Forwarder));
- forwarder_Seed(forwarder);
+ forwarder->config = configuration;
- forwarder->clock = parcClock_Monotonic();
- forwarder->clockOffset = 0;
+ forwarder->listener_table = listener_table_create();
+ if (!forwarder->listener_table) goto ERR_LISTENER_TABLE;
- if (logger) {
- forwarder->logger = logger_Acquire(logger);
- logger_SetClock(forwarder->logger, forwarder->clock);
- } else {
- PARCLogReporter *reporter = parcLogReporterTextStdout_Create();
- forwarder->logger = logger_Create(reporter, forwarder->clock);
- parcLogReporter_Release(&reporter);
- }
+ forwarder->connection_table = connection_table_create();
+ if (!forwarder->connection_table) goto ERR_CONNECTION_TABLE;
- forwarder->nextConnectionid = 1;
- forwarder->dispatcher = dispatcher_Create(forwarder->logger);
- forwarder->messenger = messenger_Create(forwarder->dispatcher);
- forwarder->connectionManager = connectionManager_Create(forwarder);
- forwarder->connectionTable = connectionTable_Create();
- forwarder->listenerSet = listenerSet_Create();
- forwarder->config = configuration_Create(forwarder);
- forwarder->processor = messageProcessor_Create(forwarder);
-
- forwarder->signal_term = dispatcher_CreateSignalEvent(
- forwarder->dispatcher, _signal_cb, forwarder, SIGTERM);
- dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_term);
-
- forwarder->signal_int = dispatcher_CreateSignalEvent(
- forwarder->dispatcher, _signal_cb, forwarder, SIGINT);
- dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_int);
-#ifndef _WIN32
- forwarder->signal_usr1 = dispatcher_CreateSignalEvent(
- forwarder->dispatcher, _signal_cb, forwarder, SIGPIPE);
- dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_usr1);
-#endif
+ forwarder->fib = fib_create(forwarder);
+ if (!forwarder->fib) goto ERR_FIB;
-#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \
- defined(PUNTING)
- forwarder->hicnSocketHelper = hicn_create();
- if (!forwarder->hicnSocketHelper)
- goto ERR_SOCKET;
-#endif /* __APPLE__ */
-
-#ifdef WITH_MAPME
- if (!(mapme_create(&forwarder->mapme, forwarder)))
- goto ERR_MAPME;
-#endif /* WITH_MAPME */
+ forwarder->msgbuf_pool = msgbuf_pool_create();
+ if (!forwarder->msgbuf_pool) goto ERR_PACKET_POOL;
+ size_t objectStoreSize = configuration_get_cs_size(configuration);
+ forwarder->pkt_cache = pkt_cache_create(objectStoreSize);
+ if (!forwarder->pkt_cache) goto ERR_PKT_CACHE;
- /* ignore child */
-#ifndef _WIN32
- signal(SIGCHLD, SIG_IGN);
+ forwarder->subscriptions = subscription_table_create();
+ if (!forwarder->subscriptions) goto ERR_SUBSCRIPTION;
- /* ignore tty signals */
- signal(SIGTSTP, SIG_IGN);
- signal(SIGTTOU, SIG_IGN);
- signal(SIGTTIN, SIG_IGN);
-#endif
+ // the two flags for the cs are set to true by default. If the cs
+ // is active it always work as expected unless the use modifies this
+ // values using controller
+ if (objectStoreSize != 0) {
+ forwarder->store_in_cs = true;
+ forwarder->serve_from_cs = true;
+ }
- // We no longer use this for ticks, but we need to have at least one event
- // schedule to keep Libevent happy.
+#ifdef WITH_MAPME
+ forwarder->mapme = mapme_create(forwarder);
+ if (!forwarder->mapme) goto ERR_MAPME;
+#endif /* WITH_MAPME */
- struct timeval wtnow_timeout;
- timerclear(&wtnow_timeout);
+#ifdef WITH_POLICY_STATS
+ if (policy_stats_mgr_initialize(&forwarder->policy_stats_mgr, forwarder) < 0)
+ goto ERR_MGR;
+#endif /* WITH_POLICY_STATS */
- wtnow_timeout.tv_sec = 0;
- wtnow_timeout.tv_usec = 50000; // 20 Hz keepalive
+ memset(&forwarder->stats, 0, sizeof(forwarder_stats_t));
+ vector_init(forwarder->pending_conn, MAX_MSG, 0);
+ vector_init(forwarder->acquired_msgbuf_ids, MAX_MSG, 0);
- PARCEventScheduler *base =
- dispatcher_GetEventScheduler(forwarder->dispatcher);
- forwarder->keepalive_event = parcEventTimer_Create(
- base, PARCEventType_Persist, _keepalive_cb, (void *)forwarder);
- parcEventTimer_Start(forwarder->keepalive_event, &wtnow_timeout);
+ char *n_suffixes_per_split_str = getenv("N_SUFFIXES_PER_SPLIT");
+ if (n_suffixes_per_split_str)
+ configuration_set_suffixes_per_split(forwarder_get_configuration(forwarder),
+ atoi(n_suffixes_per_split_str));
return forwarder;
+ERR_MGR:
#ifdef WITH_MAPME
ERR_MAPME:
#endif /* WITH_MAPME */
-#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \
- defined(PUNTING)
- hicn_free(forwarder->hicnSocketHelper);
-ERR_SOCKET:
-#endif
- listenerSet_Destroy(&(forwarder->listenerSet));
- connectionManager_Destroy(&(forwarder->connectionManager));
- connectionTable_Destroy(&(forwarder->connectionTable));
- messageProcessor_Destroy(&(forwarder->processor));
- configuration_Destroy(&(forwarder->config));
- messenger_Destroy(&(forwarder->messenger));
-
- dispatcher_DestroySignalEvent(forwarder->dispatcher,
- &(forwarder->signal_int));
- dispatcher_DestroySignalEvent(forwarder->dispatcher,
- &(forwarder->signal_term));
-#ifndef _WIN32
- dispatcher_DestroySignalEvent(forwarder->dispatcher,
- &(forwarder->signal_usr1));
-#endif
-
- parcClock_Release(&forwarder->clock);
- logger_Release(&forwarder->logger);
-
- // do the dispatcher last
- dispatcher_Destroy(&(forwarder->dispatcher));
- parcMemory_Deallocate((void **)&forwarder);
+ERR_SUBSCRIPTION:
+ subscription_table_free(forwarder->subscriptions);
+ERR_PKT_CACHE:
+ pkt_cache_free(forwarder->pkt_cache);
+
+ msgbuf_pool_free(forwarder->msgbuf_pool);
+ERR_PACKET_POOL:
+ fib_free(forwarder->fib);
+ERR_FIB:
+ connection_table_free(forwarder->connection_table);
+ERR_CONNECTION_TABLE:
+ listener_table_free(forwarder->listener_table);
+ERR_LISTENER_TABLE:
+ free(forwarder);
+ERR_MALLOC:
return NULL;
}
-void forwarder_Destroy(Forwarder **ptr) {
- parcAssertNotNull(ptr, "Parameter must be non-null double pointer");
- parcAssertNotNull(*ptr, "Parameter must dereference to non-null pointer");
- Forwarder *forwarder = *ptr;
-#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \
- defined(PUNTING)
- hicn_free(forwarder->hicnSocketHelper);
-#endif
- parcEventTimer_Destroy(&(forwarder->keepalive_event));
-
- listenerSet_Destroy(&(forwarder->listenerSet));
- connectionManager_Destroy(&(forwarder->connectionManager));
- connectionTable_Destroy(&(forwarder->connectionTable));
- messageProcessor_Destroy(&(forwarder->processor));
- configuration_Destroy(&(forwarder->config));
+void forwarder_free(forwarder_t *forwarder) {
+ assert(forwarder);
- // the messenger is used by many of the other pieces, so destroy it last
- messenger_Destroy(&(forwarder->messenger));
+ policy_stats_mgr_finalize(&forwarder->policy_stats_mgr);
#ifdef WITH_MAPME
mapme_free(forwarder->mapme);
#endif /* WITH_MAPME */
- dispatcher_DestroySignalEvent(forwarder->dispatcher,
- &(forwarder->signal_int));
- dispatcher_DestroySignalEvent(forwarder->dispatcher,
- &(forwarder->signal_term));
-#ifndef _WIN32
- dispatcher_DestroySignalEvent(forwarder->dispatcher,
- &(forwarder->signal_usr1));
-#endif
+ pkt_cache_free(forwarder->pkt_cache);
+ msgbuf_pool_free(forwarder->msgbuf_pool);
+ fib_free(forwarder->fib);
+ connection_table_free(forwarder->connection_table);
+ listener_table_free(forwarder->listener_table);
+ subscription_table_free(forwarder->subscriptions);
+ configuration_free(forwarder->config);
+ vector_free(forwarder->pending_conn);
+ vector_free(forwarder->acquired_msgbuf_ids);
+ free(forwarder);
+}
+
+/*
+ * An event occurred that might trigger an update of the FIB cache. It is
+ * possible that the flags have been reset following a connection add or remote.
+ * The objective of this function is to prepare the cache entry, and to alert of
+ * any change for both consumer and producer prefixes.
+ */
+void forwarder_on_route_event(const forwarder_t *forwarder,
+ fib_entry_t *entry) {
+ commands_notify_route(forwarder, entry);
+
+#ifdef ADVERTISE_PREFIXES_ON_NEW_FACES
+ nexthops_t new_nexthops = NEXTHOPS_EMPTY;
+#endif /* ADVERTISE_PREFIXES_ON_NEW_FACES */
+
+ nexthops_t *nexthops = NULL;
+ char *prefix_type_s;
+
+ const connection_table_t *table =
+ forwarder_get_connection_table(entry->forwarder);
+
+ const hicn_prefix_t *prefix = fib_entry_get_prefix(entry);
+
+ WITH_INFO({
+ char buf[MAXSZ_HICN_PREFIX];
+ hicn_prefix_snprintf(buf, MAXSZ_HICN_NAME, prefix);
+ INFO("fib_entry_on_event: %s", buf);
+ )};
+
+ if (!fib_entry_has_local_nexthop(entry)) {
+ /* Recompute FIB cache, then check whether it has changed based on hash */
+ prefix_type_s = "consumer";
+ nexthops = fib_entry_get_nexthops(entry);
+ nexthops_reset(nexthops);
+ fib_entry_filter_nexthops(entry, nexthops, ~0, false);
+#ifdef ADVERTISE_PREFIXES_ON_NEW_FACES
+ } else {
+ /* Check available non-local connections (on which we would send MAP-Me
+ * updates */
+ prefix_type_s = "producer";
+
+ nexthops = fib_entry_get_mapme_nexthops(entry, &new_nexthops);
+ fib_entry_filter_nexthops(entry, nexthops, ~0, true);
+
+#ifdef WITH_MAPME
+ mapme_set_adjacencies(forwarder->mapme, entry, nexthops, NULL);
+#endif /* WITH_MAPME */
+#endif /* ADVERTISE_PREFIXES_ON_NEW_FACES */
+ }
+
+ if (!nexthops)
+ return;
- parcClock_Release(&forwarder->clock);
- logger_Release(&forwarder->logger);
+ if (!fib_entry_nexthops_changed(entry, nexthops)) return;
- // do the dispatcher last
- dispatcher_Destroy(&(forwarder->dispatcher));
+ /* Send notification */
+ WITH_INFO({
+ char buf[MAXSZ_HICN_PREFIX];
+ hicn_prefix_snprintf(buf, MAXSZ_HICN_NAME, prefix);
+ INFO("Active interfaces changed for %s prefix %s", prefix_type_s, buf);
+ });
- parcMemory_Deallocate((void **)&forwarder);
- *ptr = NULL;
+ netdevice_flags_t flags = NETDEVICE_FLAGS_EMPTY;
+ nexthops_foreach(nexthops, nh, {
+ connection_t *connection = connection_table_get_by_id(table, nh);
+ netdevice_flags_add(flags, connection_get_interface_type(connection));
+ });
+
+ hicn_ip_prefix_t ip_prefix;
+ hicn_prefix_get_ip_prefix(prefix, &ip_prefix);
+ commands_notify_active_interface_update(forwarder, &ip_prefix, flags);
}
-void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port,
- const char *localPath) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
+int forwarder_add_connection(const forwarder_t *forwarder,
+ const char *symbolic_name, face_type_t type,
+ address_pair_t *pair, policy_tags_t tags,
+ int priority, face_state_t admin_state) {
+ connection_table_t *table = forwarder_get_connection_table(forwarder);
+ connection_t *connection = connection_table_get_by_pair(table, pair);
+
+ if (!connection) {
+ connection = connection_create(type, symbolic_name, pair, forwarder);
+ if (!connection) {
+ ERROR("Failed to create %s connection", face_type_str(type));
+ return -1;
+ }
+
+ } else {
+ WARN("Connection already exists");
+ }
+
+#ifdef WITH_POLICY
+ connection_set_tags(connection, tags);
+ connection_set_priority(connection, priority);
+#endif /* WITH_POLICY */
- configurationListeners_SetupAll(forwarder->config, port, localPath);
+ connection_set_admin_state(connection, admin_state);
+ return 0;
}
-void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- configurationListeners_SetutpLocalIPv4(forwarder->config, port);
+int forwarder_remove_connection(const forwarder_t *forwarder,
+ unsigned connection_id, bool finalize) {
+ /* Remove connection from the FIB */
+ forwarder_remove_connection_id_from_routes(forwarder, connection_id);
+
+ /* Remove connection */
+ connection_table_t *table = forwarder_get_connection_table(forwarder);
+
+ /* Hook: connection deleted through the control protocol */
+ connection_t *connection = connection_table_at(table, connection_id);
+ forwarder_on_connection_event(forwarder, connection, CONNECTION_EVENT_DELETE);
+
+ connection_table_remove_by_id(table, connection_id);
+ if (finalize) connection_finalize(connection);
+
+ return 0;
}
-void forwarder_SetupFromConfigFile(Forwarder *forwarder, const char *filename) {
- ConfigurationFile *configFile = configurationFile_Create(forwarder, filename);
- if (configFile) {
- configurationFile_Process(configFile);
- configurationFile_Release(&configFile);
- }
+/*
+ * This is currently called from commands.c for every command sent to update
+ * a connection.
+ */
+void forwarder_on_connection_event(const forwarder_t *forwarder,
+ const connection_t *connection,
+ connection_event_t event) {
+ assert(connection);
+
+ commands_notify_connection(forwarder, event, connection);
+
+ unsigned conn_id = connection_get_id(connection);
+
+ /* We need to send a MapMe update on the newly selected connections for
+ * each concerned fib_entry : connection is involved, or no more involved */
+ fib_t *fib = forwarder_get_fib(forwarder);
+ fib_foreach_entry(fib, entry, {
+ const nexthops_t *nexthops = fib_entry_get_nexthops(entry);
+
+ if (!fib_entry_has_local_nexthop(entry)) {
+ /* Consumer prefix */
+ /*
+ * A new connection has no impact until it is added to FIB, which will
+ * be handled in a route event
+ */
+ if (event == CONNECTION_EVENT_CREATE) break;
+
+ /*
+ * For each FIB entry, trigger an event only if the connection is part
+ * of nexthops */
+ // XXX Replace this by a function
+ nexthops_foreach(nexthops, nexthop, {
+ if (nexthop != conn_id) continue;
+ forwarder_on_route_event(forwarder, entry);
+ break;
+ });
+ } else {
+ /* Producer prefix */
+ if (connection_is_local(connection)) break;
+
+ // XXX we could optimize event more
+ forwarder_on_route_event(forwarder, entry);
+ }
+ });
+}
+
+void forwarder_setup_local_listeners(forwarder_t *forwarder, uint16_t port) {
+ assert(forwarder);
+ listener_setup_local(forwarder, port);
}
-Configuration *forwarder_GetConfiguration(Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
+configuration_t *forwarder_get_configuration(forwarder_t *forwarder) {
+ assert(forwarder);
return forwarder->config;
}
-// ============================================================================
+subscription_table_t *forwarder_get_subscriptions(
+ const forwarder_t *forwarder) {
+ return forwarder->subscriptions;
+}
+
+connection_table_t *forwarder_get_connection_table(
+ const forwarder_t *forwarder) {
+ assert(forwarder);
+ return forwarder->connection_table;
+}
-unsigned forwarder_GetNextConnectionId(Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return forwarder->nextConnectionid++;
+listener_table_t *forwarder_get_listener_table(const forwarder_t *forwarder) {
+ assert(forwarder);
+ return forwarder->listener_table;
}
-Messenger *forwarder_GetMessenger(Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return forwarder->messenger;
+pkt_cache_t *forwarder_get_pkt_cache(const forwarder_t *forwarder) {
+ assert(forwarder);
+ return forwarder->pkt_cache;
}
-Dispatcher *forwarder_GetDispatcher(Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return forwarder->dispatcher;
+void forwarder_cs_set_store(forwarder_t *forwarder, bool val) {
+ assert(forwarder);
+ forwarder->store_in_cs = val;
}
-#ifdef WITH_POLICY
-ConnectionTable *forwarder_GetConnectionTable(const Forwarder *forwarder) {
-#else
-ConnectionTable *forwarder_GetConnectionTable(Forwarder *forwarder) {
-#endif /* WITH_POLICY */
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return forwarder->connectionTable;
+bool forwarder_cs_get_store(forwarder_t *forwarder) {
+ assert(forwarder);
+ return forwarder->store_in_cs;
}
-ListenerSet *forwarder_GetListenerSet(Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return forwarder->listenerSet;
+void forwarder_cs_set_serve(forwarder_t *forwarder, bool val) {
+ assert(forwarder);
+ forwarder->serve_from_cs = val;
}
-void forwarder_SetChacheStoreFlag(Forwarder *forwarder, bool val) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- messageProcessor_SetCacheStoreFlag(forwarder->processor, val);
+bool forwarder_cs_get_serve(forwarder_t *forwarder) {
+ assert(forwarder);
+ return forwarder->serve_from_cs;
}
-bool forwarder_GetChacheStoreFlag(Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return messageProcessor_GetCacheStoreFlag(forwarder->processor);
+void forwarder_cs_set_size(forwarder_t *forwarder, size_t size) {
+ assert(forwarder);
+
+ if (pkt_cache_set_cs_size(forwarder->pkt_cache, size) < 0) {
+ ERROR(
+ "Unable to resize the CS: provided maximum size (%u) is smaller than "
+ "the number of elements currently stored in the CS (%u). Clear the "
+ "CS "
+ "and retry.",
+ size, pkt_cache_get_cs_size(forwarder->pkt_cache));
+ }
}
-void forwarder_SetChacheServeFlag(Forwarder *forwarder, bool val) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- messageProcessor_SetCacheServeFlag(forwarder->processor, val);
+size_t forwarder_cs_get_size(forwarder_t *forwarder) {
+ assert(forwarder);
+ return pkt_cache_get_cs_size(forwarder->pkt_cache);
}
-bool forwarder_GetChacheServeFlag(Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return messageProcessor_GetCacheServeFlag(forwarder->processor);
+size_t forwarder_cs_get_num_stale_entries(forwarder_t *forwarder) {
+ assert(forwarder);
+ return pkt_cache_get_num_cs_stale_entries(forwarder->pkt_cache);
}
-void forwarder_ReceiveCommand(Forwarder *forwarder, command_id command,
- struct iovec *message, unsigned ingressId) {
- configuration_ReceiveCommand(forwarder->config, command, message, ingressId);
+void forwarder_cs_clear(forwarder_t *forwarder) {
+ assert(forwarder);
+
+ pkt_cache_cs_clear(forwarder->pkt_cache);
}
-void forwarder_Receive(Forwarder *forwarder, Message *message) {
- parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null");
- parcAssertNotNull(message, "Parameter message must be non-null");
+/**
+ * @function forwarder_Drop
+ * @abstract Whenever we "drop" a message, increment counters
+ * @discussion
+ * This is a bookkeeping function. It increments the appropriate counters.
+ *
+ * The default action for a message is to destroy it in
+ * <code>forwarder_Receive()</code>, so this function does not need to do
+ * that.
+ *
+ */
+static ssize_t forwarder_drop(forwarder_t *forwarder, off_t msgbuf_id) {
+ forwarder->stats.countDropped++;
+
+ const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ const msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
- // this takes ownership of the message, so we're done here
+ switch (msgbuf_get_type(msgbuf)) {
+ case HICN_PACKET_TYPE_INTEREST:
+ forwarder->stats.countInterestsDropped++;
+ break;
- // this are the checks needed to implement WLDR. We set wldr only on the STAs
- // and we let the AP to react according to choise of the client.
- // if the STA enables wldr using the set command, the AP enable wldr as well
- // otherwise, if the STA disable it the AP remove wldr
- // WLDR should be enabled only on the STAs using the command line
- // TODO
- // disable WLDR command line on the AP
- const Connection *conn = connectionTable_FindById(
- forwarder->connectionTable, message_GetIngressConnectionId(message));
+ case HICN_PACKET_TYPE_DATA:
+ forwarder->stats.countObjectsDropped++;
+ break;
+
+ default:
+ forwarder->stats.countOtherDropped++;
+ break;
+ }
+
+ return msgbuf_get_len(msgbuf);
+ // dont destroy message here, its done at end of receive
+}
+
+#ifndef BYPASS_FIB
+/*
+ * If the hoplimit is equal to 0, then we may only forward it to local
+ * applications. Otherwise, we may forward it off the system.
+ *
+ */
+
+static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder,
+ off_t msgbuf_id,
+ unsigned conn_id) {
+ connection_table_t *table = forwarder_get_connection_table(forwarder);
+
+ const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
+ connection_t *conn = connection_table_get_by_id(table, conn_id);
if (!conn) {
- /*
- * Drop is a static method in messageProcessor which might or might not need
- * to be called for accounting purposes. This call was initially absent so
- * the behaviour was kept like this, as this situation is unlikely. We need
- * to release memory though, as this is not done in Drop anyways.
- */
- //messageProcessor_Drop(forwarder->processor, message);
- message_Release(&message);
- return;
+ forwarder->stats.countDroppedConnectionNotFound++;
+ WARN("forward msgbuf %lu to interface %u not found (count %u)", msgbuf_id,
+ conn_id, forwarder->stats.countDroppedConnectionNotFound);
+ return forwarder_drop(forwarder, msgbuf_id);
}
- if (message_HasWldr(message)) {
- if (connection_HasWldr(conn)) {
+ /* Always queue the packet... */
+ // DEBUG("Queueing packet\n");
+
+#if defined(USE_SEND_PACKET) || !defined(__linux__)
+
+ // Here we need to update the path label of a data packet before send
+ // it. The path label update can be done here because the packet is sent
+ // directly to the socket
+ if (msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_DATA)
+ msgbuf_update_pathlabel(msgbuf, connection_get_id(conn));
+
+ bool success = connection_send_packet(conn, msgbuf_get_packet(msgbuf),
+ msgbuf_get_len(msgbuf));
+#else
+
+ // In this case we cannot update the path label even if it is need because
+ // the packet is not copied and only the packet id is enqueued to the ring
+ // buffer associated the output interface. If the path label is updated here
+ // all data packets get delivered to the next hop with the same label that is
+ // associated to the last connection used. For this reason the path label
+ // update must be done before the packet is actually sent inside the different
+ // IO implementations.
+ bool success = connection_send(conn, msgbuf_id, USE_QUEUE);
+
+#endif
+
+ /* ... and mark the connection as pending if this is not yet the case */
+ if (!vector_contains(forwarder->pending_conn, conn_id))
+ vector_push(forwarder->pending_conn, conn_id);
+
+ if (!success) {
+ forwarder->stats.countSendFailures++;
+
+ DEBUG("forward msgbuf %llu to interface %u send failure (count %u)",
+ msgbuf_id, conn_id, forwarder->stats.countSendFailures);
+ return forwarder_drop(forwarder, msgbuf_id);
+ }
+
+ switch (msgbuf_get_type(msgbuf)) {
+ case HICN_PACKET_TYPE_INTEREST:
+ forwarder->stats.countInterestForwarded++;
+ break;
+
+ case HICN_PACKET_TYPE_DATA:
+ forwarder->stats.countObjectsForwarded++;
+ break;
+
+ default:
+ break;
+ }
+
+ TRACE("forward msgbuf %p (size=%u) to interface %u", msgbuf,
+ msgbuf_get_len(msgbuf), conn_id);
+ return msgbuf_get_len(msgbuf);
+}
+
+/**
+ * @function forwarder_forward_to_nexthops
+ * @abstract Try to forward to each nexthop listed in the NumberSet
+ * @discussion
+ * Will not forward to the ingress connection.
+ *
+ * @return The number of nexthops tried
+ */
+static unsigned forwarder_forward_to_nexthops(forwarder_t *forwarder,
+ off_t msgbuf_id,
+ const nexthops_t *nexthops) {
+ // DEBUG("[forwarder_forward_to_nexthops] num=%d/%d",
+ // nexthops_get_curlen(nexthops), nexthops_get_len(nexthops));
+ unsigned forwardedCopies = 0;
+
+ const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+ unsigned ingressId = msgbuf_get_connection_id(msgbuf);
+
+ nexthops_foreach(nexthops, nexthop, {
+ // DEBUG("[forwarder_forward_to_nexthops] - nexthop = %d");
+ if (nexthop == ingressId) continue;
+
+ forwardedCopies++;
+ // INFO("[forwarder_forward_to_nexthops] - nexthop = %d OK", nexthop);
+ forwarder_forward_via_connection(forwarder, msgbuf_id, nexthop);
+ });
+
+ return forwardedCopies;
+}
+
+static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id,
+ pkt_cache_verdict_t verdict,
+ pkt_cache_entry_t *entry) {
+ assert(forwarder && entry && msgbuf_id_is_valid(msgbuf_id));
+
+ bool ret = true;
+
+ const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+ assert(msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_INTEREST);
+
+ fib_entry_t *fib_entry = fib_match_msgbuf(forwarder->fib, msgbuf);
+ if (!fib_entry) return false;
+
+ nexthops_t *nexthops = fib_entry_get_nexthops(fib_entry);
+
+ /* Backup flags and cur_len*/
+ uint_fast32_t flags = nexthops->flags;
+ size_t cur_len = nexthops_get_curlen(nexthops);
+
+ /* This affects the nexthops */
+ nexthops = strategy_lookup_nexthops(&fib_entry->strategy, nexthops, msgbuf);
+
+ if (nexthops_get_curlen(nexthops) == 0) {
+ ERROR("Message %p returned an empty next hop set", msgbuf);
+ ret = false;
+ goto END;
+ }
+
+ pit_entry_t *pit_entry = &entry->u.pit_entry;
+ if (!pit_entry) {
+ ret = false;
+ goto END;
+ }
+
+ pit_entry_set_fib_entry(pit_entry, fib_entry);
+
+ // this requires some additional checks. It may happen that some of the output
+ // faces selected by the forwarding strategy are not usable. So far all the
+ // forwarding strategy return only valid faces (or an empty list)
+ nexthops_foreach(nexthops, nexthop, {
+ // DEBUG("Adding egress to PIT for nexthop %d", nexthop);
+ pit_entry_egress_add(pit_entry, nexthop);
+ });
+
+ if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <= 0) {
+ ERROR("Error forwarding mMessage %p to next hops", msgbuf);
+ ret = false;
+ }
+
+END:
+ /* Restore flags & curlen */
+ nexthops->flags = flags;
+ nexthops->cur_elts = cur_len;
+
+ return ret;
+}
+
+#endif /* ! BYPASS_FIB */
+
+int _forwarder_forward_upon_interest(
+ forwarder_t *forwarder, msgbuf_pool_t *msgbuf_pool, off_t data_msgbuf_id,
+ off_t interest_msgbuf_id, pkt_cache_entry_t *entry,
+ pkt_cache_verdict_t verdict, bool is_aggregated) {
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id);
+
+ // - Aggregation can be perfomed, do not forward
+ if (verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST) {
+ forwarder_drop(forwarder, interest_msgbuf_id);
+ return (int)msgbuf_get_len(msgbuf);
+ }
+
+ // - Data packet matching the interest was found, forward reply
+ if (verdict == PKT_CACHE_VERDICT_FORWARD_DATA) {
+ assert(forwarder->serve_from_cs == true);
+
+ msgbuf_t *interest_msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id);
+ msgbuf_t *data_msgbuf = msgbuf_pool_at(msgbuf_pool, data_msgbuf_id);
+
+ msgbuf_reset_pathlabel(data_msgbuf);
+ forwarder_forward_via_connection(forwarder, data_msgbuf_id,
+ msgbuf_get_connection_id(interest_msgbuf));
+ return (int)msgbuf_get_len(msgbuf);
+ }
+
+ // - For aggregated interest, the interest forwarding is done in
+ // `_forwarder_forward_aggregated_interest()`
+ if (is_aggregated) return (int)msgbuf_get_len(msgbuf);
+
+ // - Try to forward the interest
+ int rc =
+ forwarder_forward_via_fib(forwarder, interest_msgbuf_id, verdict, entry);
+ if (!rc) {
+ // - Not able to forward, drop the packet
+ forwarder->stats.countDroppedNoRoute++;
+ INFO("Message %lu did not match FIB, no route (count %u)",
+ interest_msgbuf_id, forwarder->stats.countDroppedNoRoute);
+
+ forwarder_drop(forwarder, interest_msgbuf_id);
+ return -1;
+ }
+
+ return (int)msgbuf_get_len(msgbuf);
+}
+
+static void _forwarder_update_interest_stats(forwarder_t *forwarder,
+ pkt_cache_verdict_t verdict,
+ msgbuf_t *msgbuf,
+ bool has_expire_ts,
+ uint64_t expire_ts) {
+ long expiration = has_expire_ts ? expire_ts : -1;
+ switch (verdict) {
+ case PKT_CACHE_VERDICT_FORWARD_INTEREST:
+ DEBUG("Message added to PIT (expiration=%ld)", expiration);
+ break;
+
+ case PKT_CACHE_VERDICT_AGGREGATE_INTEREST:
+ forwarder->stats.countInterestsAggregated++;
+ DEBUG("Message aggregated in PIT (expiration=%ld)", expiration);
+ break;
+
+ case PKT_CACHE_VERDICT_RETRANSMIT_INTEREST:
+ forwarder->stats.countInterestsRetransmitted++;
+ DEBUG("Message retransmitted (expiration=%ld)", expiration);
+ break;
+
+ case PKT_CACHE_VERDICT_FORWARD_DATA:
+ forwarder->stats.countInterestsSatisfiedFromStore++;
+ DEBUG("Message satisfied from content store (expiration=%ld)",
+ expiration);
+ break;
+
+ case PKT_CACHE_VERDICT_INTEREST_EXPIRED_FORWARD_INTEREST:
+ forwarder->stats.countInterestsExpired++;
+ DEBUG("Message replaced expired interest (expiration=%ld)", expiration);
+ break;
+
+ case PKT_CACHE_VERDICT_DATA_EXPIRED_FORWARD_INTEREST:
+ forwarder->stats.countDataExpired++;
+ DEBUG("Message replaced expired data (expiration=%ld)", expiration);
+ break;
+
+ case PKT_CACHE_VERDICT_ERROR:
+ ERROR("Invalid packet cache content");
+ break;
+
+ default:
+ break;
+ }
+}
+
+/**
+ * Return the interest manifest from the interest payload
+ */
+static int _forwarder_get_interest_manifest(
+ msgbuf_t *msgbuf, interest_manifest_header_t **int_manifest_header,
+ size_t *payload_size) {
+ uint8_t *payload;
+
+ hicn_packet_buffer_t *pkbuf = msgbuf_get_pkbuf(msgbuf);
+
+ hicn_payload_type_t payload_type;
+ HICN_UNUSED(int rc) = hicn_packet_get_payload_type(pkbuf, &payload_type);
+ // XXX ASSERT HERE !!!
+ if (rc != HICN_LIB_ERROR_NONE) return -1;
+ // assert(rc == HICN_LIB_ERROR_NONE);
+
+ if (payload_type != HPT_MANIFEST) return -1;
+
+ rc = hicn_packet_get_payload(pkbuf, &payload, payload_size, false);
+ // assert(rc == HICN_LIB_ERROR_NONE);
+ if (rc != HICN_LIB_ERROR_NONE) return -1;
+
+ *int_manifest_header = (interest_manifest_header_t *)payload;
+
+ return 0;
+}
+
+// Manifest is split using splitting strategy, then every
+// sub-manifest is sent using the forwarding strategy defined for the prefix
+int _forwarder_forward_aggregated_interest(
+ forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header,
+ msgbuf_t *msgbuf, off_t msgbuf_id, pkt_cache_entry_t **entries) {
+ assert(msgbuf_id_is_valid(msgbuf_id) &&
+ msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_INTEREST);
+
+ int ret = -1;
+
+ fib_entry_t *fib_entry = fib_match_msgbuf(forwarder->fib, msgbuf);
+ if (!fib_entry) goto END;
+
+ nexthops_t *nexthops = fib_entry_get_nexthops(fib_entry);
+ if (nexthops_get_curlen(nexthops) == 0) {
+ ret = 0;
+ goto END;
+ }
+
+ /* Backup flags and cur_len*/
+ uint_fast32_t flags = nexthops->flags;
+ size_t cur_len = nexthops_get_curlen(nexthops);
+
+ size_t n_suffixes_per_split = configuration_get_suffixes_per_split(
+ forwarder_get_configuration(forwarder));
+ int_manifest_split_strategy_t disaggregation_strategy =
+ configuration_get_split_strategy(forwarder_get_configuration(forwarder));
+ switch (disaggregation_strategy) {
+ case INT_MANIFEST_SPLIT_STRATEGY_NONE:
+ n_suffixes_per_split = int_manifest_header->n_suffixes + 1;
+
+ case INT_MANIFEST_SPLIT_STRATEGY_MAX_N_SUFFIXES: {
+ // Generate sub-manifests: same as original manifest,
+ // but different suffix in the header and different bitmap
+
+ int total_len = 0;
+ // Suffixes in manifest plus the one in the header
+ int total_suffixes = int_manifest_header->n_suffixes + 1;
+
+ // Save copy of original bitmap to use as a reference
+ // to generate bitmaps for sub-manifests
+ hicn_uword original_bitmap[BITMAP_SIZE] = {0};
+ memcpy(&original_bitmap, int_manifest_header->request_bitmap,
+ BITMAP_SIZE * sizeof(hicn_uword));
+
+ size_t suffix_index = 0; // Position of suffix in initial manifest
+ interest_manifest_header_t *manifest = NULL;
+ size_t payload_size;
+ while (suffix_index < total_suffixes) {
+ // If more than one sub-manifest,
+ // clone original interest manifest and update suffix
+ if (suffix_index > 0) {
+ msgbuf_t *clone;
+ off_t clone_id =
+ msgbuf_pool_clone(forwarder->msgbuf_pool, &clone, msgbuf_id);
+ msgbuf_pool_acquire(clone);
+ forwarder_acquired_msgbuf_ids_push(forwarder, clone_id);
+
+ msgbuf_id = clone_id;
+ msgbuf = clone;
+ }
+
+ hicn_uword curr_bitmap[BITMAP_SIZE] = {0};
+ size_t first_suffix_index_in_submanifest = suffix_index;
+ suffix_index = interest_manifest_update_bitmap(
+ original_bitmap, curr_bitmap, suffix_index, total_suffixes,
+ n_suffixes_per_split);
+ size_t first_suffix_index_in_next_submanifest = suffix_index;
+
+ // Update manifest bitmap in current msgbuf
+
+ int ret =
+ _forwarder_get_interest_manifest(msgbuf, &manifest, &payload_size);
+ _ASSERT(ret == 0);
+ memcpy(manifest->request_bitmap, curr_bitmap,
+ BITMAP_SIZE * sizeof(hicn_uword));
+ WITH_TRACE({
+ bitmap_print(manifest->request_bitmap, BITMAP_SIZE);
+ printf("\n");
+ });
+
+ /*
+ * Update PIT entries for suffixes in current sub-manifest.
+ *
+ * Note that strategy lookup affects the nexthops, and we need to
+ *restore the initial state before every lookup
+ */
+ nexthops->flags = flags;
+ nexthops->cur_elts = cur_len;
+ nexthops =
+ strategy_lookup_nexthops(&fib_entry->strategy, nexthops, msgbuf);
+
+ if (nexthops_get_curlen(nexthops) == 0) {
+ ERROR("Message %p returned an empty next hop set", msgbuf);
+ goto RESTORE;
+ }
+
+ for (size_t i = first_suffix_index_in_submanifest;
+ i < first_suffix_index_in_next_submanifest; i++) {
+ if (!bitmap_is_set_no_check(manifest->request_bitmap, i)) continue;
+
+ pit_entry_t *pit_entry = &(entries[i]->u.pit_entry);
+ if (!pit_entry) goto RESTORE;
+
+ pit_entry_set_fib_entry(pit_entry, fib_entry);
+ nexthops_foreach(nexthops, nexthop,
+ { pit_entry_egress_add(pit_entry, nexthop); });
+ }
+
+ // Serialize manifest before sending it
+ interest_manifest_serialize(int_manifest_header);
+
+ if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <=
+ 0) {
+ ERROR("Message %p returned an empty next hop set", msgbuf);
+ continue;
+ }
+
+ total_len += msgbuf_get_len(msgbuf);
+ }
+
+ ret = total_len;
+ goto END;
+ }
+
+ default:
+ break;
+ }
+
+RESTORE:
+ /* Restore flags & curlen */
+ nexthops->flags = flags;
+ nexthops->cur_elts = cur_len;
+
+END:
+ return ret;
+}
+
+static ssize_t forwarder_process_single_interest(forwarder_t *forwarder,
+ msgbuf_pool_t *msgbuf_pool,
+ msgbuf_t *msgbuf,
+ off_t msgbuf_id) {
+ pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR;
+ off_t data_msgbuf_id = INVALID_MSGBUF_ID;
+ pkt_cache_entry_t *entry = NULL;
+
+ // Update packet cache
+ pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, &verdict,
+ &data_msgbuf_id, &entry, msgbuf_get_name(msgbuf),
+ forwarder->serve_from_cs);
+
+ _forwarder_update_interest_stats(forwarder, verdict, msgbuf,
+ entry->has_expire_ts, entry->expire_ts);
+
+ int rc = _forwarder_forward_upon_interest(
+ forwarder, msgbuf_pool, data_msgbuf_id, msgbuf_id, entry, verdict, false);
+
+ // No route when trying to forward interest, remove from PIT
+ if (rc == -1) pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry);
+
+ return msgbuf_get_len(msgbuf);
+}
+
+static ssize_t forwarder_process_aggregated_interest(
+ forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header,
+ msgbuf_pool_t *msgbuf_pool, msgbuf_t *msgbuf, off_t msgbuf_id) {
+ pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR;
+ off_t data_msgbuf_id = INVALID_MSGBUF_ID;
+ pkt_cache_entry_t *entry = NULL;
+ // Save PIT entries to avoid re-doing pkt cache lookup in
+ // `_forwarder_forward_aggregated_interest()`
+ pkt_cache_entry_t *entries[BITMAP_SIZE * WORD_WIDTH];
+
+ int n_suffixes_to_fwd = 0;
+
+ hicn_name_t name_copy = HICN_NAME_EMPTY;
+ hicn_name_copy(&name_copy, msgbuf_get_name(msgbuf));
+
+ // Suffixes in interest manifest also contains suffix in main name. We can
+ // then just iterate the interest manifest and update the suffix in the name
+ // struct
+ hicn_name_suffix_t *suffix;
+ unsigned long pos;
+ interest_manifest_foreach_suffix(int_manifest_header, suffix, pos) {
+ // Update name
+ hicn_name_set_suffix(&name_copy, *suffix);
+
+ // Update packet cache
+ pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id,
+ &verdict, &data_msgbuf_id, &entry, &name_copy,
+ forwarder->serve_from_cs);
+
+ entries[pos] = entry;
+ _forwarder_update_interest_stats(forwarder, verdict, msgbuf,
+ entry->has_expire_ts, entry->expire_ts);
+
+ // Here only data forwarding is performed, interest forwarding is done
+ // in '_forwarder_forward_aggregated_interest()'
+ int rc =
+ _forwarder_forward_upon_interest(forwarder, msgbuf_pool, data_msgbuf_id,
+ msgbuf_id, entry, verdict, true);
+
+ // No route when trying to forward interest, remove from PIT
+ if (rc == -1) pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry);
+
+ // Unset in bitmap if no interest forwarding needed,
+ // otherwise increase count of suffixes to forward
+ if (rc == -1 || verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST ||
+ verdict == PKT_CACHE_VERDICT_FORWARD_DATA) {
+ bitmap_unset_no_check(int_manifest_header->request_bitmap, pos);
+ } else {
+ n_suffixes_to_fwd++;
+ }
+
+ WITH_DEBUG({
+ char buf[MAXSZ_HICN_PREFIX];
+ int rc = hicn_name_snprintf(buf, MAXSZ_HICN_NAME, &name_copy);
+ if (rc < 0 || rc >= MAXSZ_HICN_PREFIX)
+ snprintf(buf, MAXSZ_HICN_PREFIX, "(error)");
+ DEBUG("Next in manifest: %s", buf);
+ });
+ }
+
+ // Return if nothing in the manifest to forward
+ if (n_suffixes_to_fwd == 0) return msgbuf_get_len(msgbuf);
+
+ return _forwarder_forward_aggregated_interest(forwarder, int_manifest_header,
+ msgbuf, msgbuf_id, entries);
+}
+
+/**
+ * @function forwarder_process_interest
+ * @abstract Receive an interest from the network
+ * @discussion
+ * (1) if interest in the PIT, aggregate in PIT
+ * (2) if interest in the ContentStore, reply
+ * (3) if in the FIB, forward
+ * (4) drop
+ *
+ */
+static ssize_t forwarder_process_interest(forwarder_t *forwarder,
+ off_t msgbuf_id) {
+ assert(forwarder);
+ assert(msgbuf_id_is_valid(msgbuf_id));
+
+ msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+ const connection_table_t *table = forwarder_get_connection_table(forwarder);
+ connection_t *conn =
+ connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf));
+
+ assert(msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_INTEREST);
+
+ u32 n_suffixes = 0;
+ interest_manifest_header_t *int_manifest_header = NULL;
+ size_t payload_size;
+ int ret = _forwarder_get_interest_manifest(msgbuf, &int_manifest_header,
+ &payload_size);
+ if (ret == 0) {
+ // Deserialize intrest manifest
+ interest_manifest_deserialize(int_manifest_header);
+
+ if (!interest_manifest_is_valid(int_manifest_header, payload_size))
+ return -1;
+
+ n_suffixes = int_manifest_header->n_suffixes;
+ }
+
+ // Update stats
+ forwarder->stats.countInterestsReceived++;
+ conn->stats.interests.rx_pkts++;
+ conn->stats.interests.rx_bytes += msgbuf_get_len(msgbuf);
+
+ WITH_DEBUG({
+ char buf[MAXSZ_HICN_PREFIX];
+ int rc = hicn_name_snprintf(buf, MAXSZ_HICN_NAME, msgbuf_get_name(msgbuf));
+ if (rc < 0 || rc >= MAXSZ_HICN_PREFIX)
+ snprintf(buf, MAXSZ_HICN_PREFIX, "(error)");
+ DEBUG("INTEREST (%s) msgbuf_id=%lu ingress=%u length=%u", buf, msgbuf_id,
+ msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf));
+ DEBUG("INTEREST (%s) suffixes=%u msgbuf_id=%lu ingress=%u length=%u", buf,
+ n_suffixes, msgbuf_id, msgbuf_get_connection_id(msgbuf),
+ msgbuf_get_len(msgbuf));
+ });
+
+ // Cache suffixes for current prefix to (possibly) avoid double lookups
+ pkt_cache_save_suffixes_for_prefix(
+ forwarder->pkt_cache, hicn_name_get_prefix(msgbuf_get_name(msgbuf)));
+
+ if (ret == -1)
+ return forwarder_process_single_interest(forwarder, msgbuf_pool, msgbuf,
+ msgbuf_id);
+ return forwarder_process_aggregated_interest(forwarder, int_manifest_header,
+ msgbuf_pool, msgbuf, msgbuf_id);
+}
+
+static void _forwarder_log_on_data(forwarder_t *forwarder,
+ pkt_cache_verdict_t verdict) {
+ switch (verdict) {
+ case PKT_CACHE_VERDICT_FORWARD_DATA:
+ DEBUG("Message added to CS from PIT");
+ break;
+ case PKT_CACHE_VERDICT_STORE_DATA:
+ DEBUG(
+ "Message added to CS (expired or no previous interest "
+ "pending)");
+ break;
+ case PKT_CACHE_VERDICT_CLEAR_DATA:
+ break;
+ case PKT_CACHE_VERDICT_UPDATE_DATA:
+ DEBUG("Message updated in CS");
+ break;
+ case PKT_CACHE_VERDICT_IGNORE_DATA:
+ DEBUG("Message not stored in CS");
+ break;
+ case PKT_CACHE_VERDICT_ERROR:
+ ERROR("Invalid packet cache content");
+ break;
+ default:
+ break;
+ }
+}
+
+/**
+ * @function forwarder_process_data
+ * @abstract Process an in-bound content object
+ * @discussion
+ * (1) If it does not match anything in the PIT, drop it
+ * (2) Add to Content Store
+ * (3) Reverse path forward via PIT entries
+ *
+ * @param <#param1#>
+ */
+static ssize_t forwarder_process_data(forwarder_t *forwarder, off_t msgbuf_id) {
+ msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
+ WITH_DEBUG({
+ char buf[MAXSZ_HICN_PREFIX];
+ int rc = hicn_name_snprintf(buf, MAXSZ_HICN_NAME, msgbuf_get_name(msgbuf));
+ if (rc < 0 || rc >= MAXSZ_HICN_PREFIX)
+ snprintf(buf, MAXSZ_HICN_PREFIX, "(error)");
+ DEBUG("DATA (%s) msgbuf_id=%lu ingress=%u length=%u", buf, msgbuf_id,
+ msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf));
+ });
+
+ const connection_table_t *table = forwarder_get_connection_table(forwarder);
+ connection_t *conn =
+ connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf));
+
+ // Update stats
+ forwarder->stats.countObjectsReceived++;
+ conn->stats.data.rx_pkts++;
+ conn->stats.data.rx_bytes += msgbuf_get_len(msgbuf);
+
+ // Cache suffixes for current prefix to (possibly) avoid double lookups
+ pkt_cache_save_suffixes_for_prefix(
+ forwarder->pkt_cache, hicn_name_get_prefix(msgbuf_get_name(msgbuf)));
+
+ pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR;
+ bool wrong_egress;
+ nexthops_t *ingressSetUnion = pkt_cache_on_data(
+ forwarder->pkt_cache, msgbuf_pool, msgbuf_id, forwarder->store_in_cs,
+ connection_is_local(conn), &wrong_egress, &verdict);
+
+ _forwarder_log_on_data(forwarder, verdict);
+
+ if (wrong_egress) { // Interest sent via a connection but received from
+ // another
+ WARN("Data coming from unexpected connection, discarded");
+ } else if (!ingressSetUnion) { // No match in the PIT
+ forwarder->stats.countDroppedNoReversePath++;
+ DEBUG("Message %lu did not match PIT, no reverse path", msgbuf_id);
+
+ // NOTE : probes are not stored in PIT
+ if (msgbuf_is_probe(msgbuf)) {
+ fib_entry_t *entry = fib_match_msgbuf(forwarder->fib, msgbuf);
+ if (entry && fib_entry_strategy_type(entry) == STRATEGY_TYPE_BESTPATH) {
+ nexthops_t probe_nexthops = NEXTHOPS_EMPTY;
+ nexthops_add(&probe_nexthops, msgbuf_get_connection_id(msgbuf));
+ fib_entry_on_data(entry, &probe_nexthops, msgbuf, 0, ticks_now());
+ }
+ }
+ forwarder_drop(forwarder, msgbuf_id);
+ } else {
+ // Reverse path forward via PIT entries
+ forwarder_forward_to_nexthops(forwarder, msgbuf_id, ingressSetUnion);
+ free(ingressSetUnion);
+ }
+
+ return msgbuf_get_len(msgbuf);
+}
+
+void forwarder_flush_connections(forwarder_t *forwarder) {
+ // DEBUG("[forwarder_flush_connections]");
+ const connection_table_t *table = forwarder_get_connection_table(forwarder);
+
+ unsigned num_pending_conn = (unsigned)vector_len(forwarder->pending_conn);
+ for (unsigned i = 0; i < num_pending_conn; i++) {
+ unsigned conn_id = forwarder->pending_conn[i];
+ connection_t *conn = connection_table_at(table, conn_id);
+ if (!connection_flush(conn)) {
+ WARN("Could not flush connection queue");
+ // XXX keep track of non flushed connections...
+ }
+ }
+ vector_reset(forwarder->pending_conn);
+ // DEBUG("[forwarder_flush_connections] done");
+}
+
+#if WITH_WLDR
+// XXX move to wldr file, worst case in connection.
+void forwarder_apply_wldr(const forwarder_t *forwarder, const msgbuf_t *msgbuf,
+ connection_t *connection) {
+ // this are the checks needed to implement WLDR. We set wldr only on the
+ // STAs and we let the AP to react according to choice of the client. if
+ // the STA enables wldr using the set command, the AP enable wldr as
+ // well otherwise, if the STA disable it the AP remove wldr WLDR should
+ // be enabled only on the STAs using the command line
+ // TODO
+ // disable WLDR command line on the AP
+ if (msgbuf_has_wldr(msgbuf)) {
+ if (connection_has_wldr(connection)) {
// case 1: WLDR is enabled
- connection_DetectLosses((Connection *)conn, message);
- } else if (!connection_HasWldr(conn) &&
- connection_WldrAutoStartAllowed(conn)) {
+ connection_wldr_detect_losses(connection, msgbuf);
+ } else if (!connection_has_wldr(connection) &&
+ connection_wldr_autostart_is_allowed(connection)) {
// case 2: We are on an AP. We enable WLDR
- connection_EnableWldr((Connection *)conn);
- connection_DetectLosses((Connection *)conn, message);
+ connection_wldr_enable(connection, true);
+ connection_wldr_detect_losses(connection, msgbuf);
}
// case 3: Ignore WLDR
} else {
- if (connection_HasWldr(conn) && connection_WldrAutoStartAllowed(conn)) {
+ if (connection_has_wldr(connection) &&
+ connection_wldr_autostart_is_allowed(connection)) {
// case 1: STA do not use WLDR, we disable it
- connection_DisableWldr((Connection *)conn);
+ connection_wldr_enable(connection, false);
}
}
-
- messageProcessor_Receive(forwarder->processor, message);
}
+#endif
-Ticks forwarder_GetTicks(const Forwarder *forwarder) {
- parcAssertNotNull(forwarder, "Parameter must be non-null");
- return parcClock_GetTime(forwarder->clock) + forwarder->clockOffset;
-}
+bool forwarder_add_or_update_route(forwarder_t *forwarder,
+ hicn_ip_prefix_t *prefix,
+ unsigned ingress_id) {
+ assert(forwarder);
+ assert(prefix);
-Ticks forwarder_NanosToTicks(uint64_t nanos) { return NSEC_TO_TICKS(nanos); }
+ configuration_t *config = forwarder_get_configuration(forwarder);
-uint64_t forwarder_TicksToNanos(Ticks ticks) {
- return (1000000000ULL) * ticks / HZ;
-}
+ char prefix_s[MAXSZ_IP_PREFIX];
+ int rc = hicn_ip_prefix_snprintf(prefix_s, MAXSZ_IP_PREFIX, prefix);
+ assert(rc < MAXSZ_IP_PREFIX);
+ if (rc < 0) return false;
+
+ INFO("Adding prefix=%s for conn_id=%d", prefix_s, ingress_id);
-bool forwarder_AddOrUpdateRoute(Forwarder *forwarder,
- add_route_command *control, unsigned ifidx) {
- parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null");
- parcAssertNotNull(control, "Parameter route must be non-null");
+ strategy_type_t strategy_type = configuration_get_strategy(config, prefix_s);
- // we only have one message processor
- bool res =
- messageProcessor_AddOrUpdateRoute(forwarder->processor, control, ifidx);
+ hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY;
+ hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len,
+ &name_prefix);
+ fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix);
+ if (!entry) {
+ entry = fib_entry_create(&name_prefix, strategy_type, NULL, forwarder);
+ if (ingress_id != INVALID_FACE_ID)
+ fib_entry_nexthops_add(entry, ingress_id);
+ entry = fib_add(forwarder->fib, entry);
- return res;
+ } else {
+ if (ingress_id != INVALID_FACE_ID)
+ fib_entry_nexthops_add(entry, ingress_id);
+ }
+
+ forwarder_on_route_event(forwarder, entry);
+
+ return true;
}
+bool forwarder_remove_route(forwarder_t *forwarder, hicn_ip_prefix_t *prefix,
+ unsigned ingress_id) {
+ assert(forwarder);
+ assert(prefix);
-bool forwarder_RemoveRoute(Forwarder *forwarder, remove_route_command *control,
- unsigned ifidx) {
- parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null");
- parcAssertNotNull(control, "Parameter route must be non-null");
+ hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY;
+ hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len,
+ &name_prefix);
+ fib_remove(forwarder->fib, &name_prefix, ingress_id);
- // we only have one message processor
- return messageProcessor_RemoveRoute(forwarder->processor, control, ifidx);
+ return true;
}
#ifdef WITH_POLICY
-bool forwarder_AddOrUpdatePolicy(Forwarder *forwarder,
- add_policy_command *control) {
- parcAssertNotNull(forwarder, "Parameter forwarder must be non-null");
- parcAssertNotNull(control, "Parameter control must be non-null");
+bool forwarder_add_or_update_policy(forwarder_t *forwarder,
+ hicn_ip_prefix_t *prefix,
+ hicn_policy_t *policy) {
+ assert(forwarder);
+ assert(prefix);
+ assert(policy);
+
+ hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY;
+ hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len,
+ &name_prefix);
+ fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix);
+ if (!entry) return false;
- return messageProcessor_AddOrUpdatePolicy(forwarder->processor, control);
+ fib_entry_set_policy(entry, *policy);
+
+ return true;
}
-bool forwarder_RemovePolicy(Forwarder *forwarder, remove_policy_command *control) {
- parcAssertNotNull(forwarder, "Parameter forwarder must be non-null");
- parcAssertNotNull(control, "Parameter control must be non-null");
+bool forwarder_remove_policy(forwarder_t *forwarder, hicn_ip_prefix_t *prefix) {
+ assert(forwarder);
+ assert(prefix);
+
+ hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY;
+ hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len,
+ &name_prefix);
+ fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix);
+ if (!entry) return false;
- return messageProcessor_RemovePolicy(forwarder->processor, control);
+ fib_entry_set_policy(entry, POLICY_EMPTY);
+
+ return true;
}
#endif /* WITH_POLICY */
-void forwarder_RemoveConnectionIdFromRoutes(Forwarder *forwarder,
- unsigned connectionId) {
- parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null");
- messageProcessor_RemoveConnectionIdFromRoutes(forwarder->processor,
- connectionId);
+void forwarder_remove_connection_id_from_routes(const forwarder_t *forwarder,
+ unsigned connection_id) {
+ fib_entry_t **removed_entries = NULL;
+ size_t num_removed_entries;
+
+ assert(forwarder);
+
+ fib_remove_connection(forwarder->fib, connection_id, &removed_entries,
+ &num_removed_entries);
+
+ if (num_removed_entries > 0) {
+ assert(removed_entries);
+
+ for (int i = 0; i < num_removed_entries; i++) {
+ fib_entry_t *entry = removed_entries[i];
+ forwarder_on_route_event(forwarder, entry);
+ fib_remove_entry(forwarder->fib, entry);
+ }
+ free(removed_entries);
+ }
}
-void forwarder_SetStrategy(Forwarder *forwarder, Name *prefix,
- strategy_type strategy,
- unsigned related_prefixes_len,
- Name **related_prefixes) {
- parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null");
- parcAssertNotNull(prefix, "Parameter prefix must be non-null");
+void forwarder_add_strategy_options(forwarder_t *forwarder,
+ hicn_prefix_t *name_prefix,
+ strategy_type_t strategy_type,
+ strategy_options_t *strategy_options) {
+ assert(forwarder);
+ assert(name_prefix);
+ assert(strategy_options);
+ assert(STRATEGY_TYPE_VALID(strategy_type));
- processor_SetStrategy(forwarder->processor, prefix, strategy,
- related_prefixes_len, related_prefixes);
+ fib_entry_t *entry = fib_contains(forwarder->fib, name_prefix);
+ if (!entry) return;
+
+ fib_entry_add_strategy_options(entry, strategy_type, strategy_options);
}
-FibEntryList *forwarder_GetFibEntries(Forwarder *forwarder) {
- return messageProcessor_GetFibEntries(forwarder->processor);
+void forwarder_set_strategy(forwarder_t *forwarder, hicn_prefix_t *prefix,
+ strategy_type_t strategy_type,
+ strategy_options_t *strategy_options) {
+ assert(forwarder);
+ assert(prefix);
+ assert(STRATEGY_TYPE_VALID(strategy_type));
+ /* strategy_options might be NULL */
+
+ fib_entry_t *entry = fib_contains(forwarder->fib, prefix);
+ if (!entry) {
+ // there is no exact match. so if the forwarding strategy is not in
+ // the list of strategies that can be set by the transport, return
+ if (strategy_type != STRATEGY_TYPE_BESTPATH &&
+ strategy_type != STRATEGY_TYPE_REPLICATION) {
+ return;
+ }
+
+ // here it may be the transprot that wants to set the strategy, but it has
+ // no knowledge of the length of the prefix. so we apply the strategy at the
+ // matching fib entry, which later will be the one that will be used to send
+ // interests with this name
+ entry = fib_match_prefix(forwarder->fib, prefix);
+ if (!entry) {
+ return; // no fib match, return
+ }
+ }
+
+ fib_entry_set_strategy(entry, strategy_type, strategy_options);
}
-void forwarder_SetContentObjectStoreSize(Forwarder *forwarder,
- size_t maximumContentStoreSize) {
- messageProcessor_SetContentObjectStoreSize(forwarder->processor,
- maximumContentStoreSize);
+cs_t *forwarder_get_cs(const forwarder_t *forwarder) {
+ assert(forwarder);
+
+ return pkt_cache_get_cs(forwarder->pkt_cache);
}
-void forwarder_ClearCache(Forwarder *forwarder) {
- messageProcessor_ClearCache(forwarder->processor);
+// IMPORTANT: Use this function ONLY for read-only operations since a
+// realloc would otherwise modify the returned copy but not the original
+// msgbuf ids vector in the forwarder. This constraint cannot be enforced
+// by returning a (const off_t *) because the vector_t macros still cast
+// to (void **).
+off_t *forwarder_get_acquired_msgbuf_ids(const forwarder_t *forwarder) {
+ return forwarder->acquired_msgbuf_ids;
}
-PARCClock *forwarder_GetClock(const Forwarder *forwarder) {
- return forwarder->clock;
+void forwarder_acquired_msgbuf_ids_reset(const forwarder_t *forwarder) {
+ vector_reset(forwarder->acquired_msgbuf_ids);
}
-#if !defined(__APPLE__)
-hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder) {
- return forwarder->hicnSocketHelper;
+void forwarder_acquired_msgbuf_ids_push(const forwarder_t *forwarder,
+ off_t msgbuf_id) {
+ vector_push(forwarder->acquired_msgbuf_ids, msgbuf_id);
}
-#endif
// =======================================================
-static void _signal_cb(int sig, PARCEventType events, void *user_data) {
- Forwarder *forwarder = (Forwarder *)user_data;
+fib_t *forwarder_get_fib(const forwarder_t *forwarder) {
+ return forwarder->fib;
+}
+
+msgbuf_pool_t *forwarder_get_msgbuf_pool(const forwarder_t *forwarder) {
+ return forwarder->msgbuf_pool;
+}
+
+mapme_t *forwarder_get_mapme(const forwarder_t *forwarder) {
+ return forwarder->mapme;
+}
+
+#ifdef WITH_POLICY_STATS
+const policy_stats_mgr_t *forwarder_get_policy_stats_mgr(
+ const forwarder_t *forwarder) {
+ return &forwarder->policy_stats_mgr;
+}
+#endif /* WITH_POLICY_STATS */
+
+/**
+ * @brief Finalize (i.e. close fd and free internal data structures)
+ * the current connection ("SELF") when the command is received.
+ * The connection cannot be removed inside the command handling
+ * because it is needed to return the ack back.
+ */
+static void _forwarder_finalize_connection_if_self(connection_t *conn,
+ msgbuf_t *msgbuf) {
+ uint8_t *packet = msgbuf_get_packet(msgbuf);
+ msg_connection_remove_t *msg = (msg_connection_remove_t *)packet;
+ cmd_connection_remove_t *control = &msg->payload;
+
+ if (strcmp(control->symbolic_or_connid, "SELF") == 0)
+ connection_finalize(conn);
+}
- logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning,
- __func__, "signal %d events %d", sig, events);
+ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener,
+ off_t msgbuf_id, address_pair_t *pair, Ticks now) {
+ assert(forwarder);
+ /* listener can be NULL */
+ assert(msgbuf_id_is_valid(msgbuf_id));
+ assert(pair);
+
+ hicn_name_t name;
+ msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+ assert(msgbuf);
+
+ size_t size = msgbuf_get_len(msgbuf);
+
+ const connection_table_t *table =
+ forwarder_get_connection_table(listener->forwarder);
+
+ /* Connection lookup */
+ if (msgbuf_get_connection_id(msgbuf) == CONNECTION_ID_UNDEFINED) {
+ connection_t *connection = connection_table_get_by_pair(table, pair);
+ unsigned conn_id =
+ connection
+ ? (unsigned)connection_table_get_connection_id(table, connection)
+ : CONNECTION_ID_UNDEFINED;
+
+ assert((conn_id != CONNECTION_ID_UNDEFINED) || listener);
+ msgbuf->connection_id = conn_id;
+ }
- switch ((int)sig) {
- case SIGTERM:
- logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning,
- __func__, "Caught an terminate signal; exiting cleanly.");
- dispatcher_Stop(forwarder->dispatcher);
+ forwarder->stats.countReceived++;
+
+ /* Initialize packet buffer stored in msgbuf through libhicn */
+ msgbuf_initialize_from_packet(msgbuf);
+
+ /* Detect packet type */
+ hicn_packet_analyze(msgbuf_get_pkbuf(msgbuf));
+
+ msgbuf->recv_ts = now;
+
+RETRY:
+
+ switch (msgbuf_get_type(msgbuf)) {
+ case HICN_PACKET_TYPE_INTEREST:
+ if (!connection_id_is_valid(msgbuf->connection_id)) {
+ char conn_name[SYMBOLIC_NAME_LEN];
+ int rc = connection_table_get_random_name(table, conn_name);
+ if (rc < 0) {
+ ERROR("Could not create name for new connection");
+ goto DROP;
+ }
+
+ unsigned connection_id =
+ listener_create_connection(listener, conn_name, pair);
+ if (connection_id == CONNECTION_ID_UNDEFINED) {
+ ERROR("Could not create new connection");
+ goto DROP;
+ }
+ msgbuf->connection_id = connection_id;
+ }
+ msgbuf->path_label = 0; // not used for interest packets
+ hicn_interest_get_name(msgbuf_get_pkbuf(msgbuf), &name);
+ msgbuf_set_name(msgbuf, &name);
+#ifdef WITH_WLDR
+ forwarder_apply_wldr(forwarder, msgbuf, connection);
+#endif /* WITH_WLDR */
+ forwarder_process_interest(forwarder, msgbuf_id);
+
+ pkt_cache_log(forwarder->pkt_cache);
break;
- case SIGINT:
- logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning,
- __func__, "Caught an interrupt signal; exiting cleanly.");
- dispatcher_Stop(forwarder->dispatcher);
+ case HICN_PACKET_TYPE_DATA:
+ /* This include probes */
+ if (!connection_id_is_valid(msgbuf->connection_id)) {
+ ERROR("Invalid connection for data packet");
+ goto DROP;
+ }
+ msgbuf_init_pathlabel(msgbuf);
+ hicn_data_get_name(msgbuf_get_pkbuf(msgbuf), &name);
+ msgbuf_set_name(msgbuf, &name);
+#ifdef WITH_WLDR
+ forwarder_apply_wldr(forwarder, msgbuf, connection);
+#endif /* WITH_WLDR */
+ forwarder_process_data(forwarder, msgbuf_id);
+
+ pkt_cache_log(forwarder->pkt_cache);
break;
-#ifndef _WIN32
- case SIGUSR1:
- // dump stats
+
+#ifdef WITH_WLDR
+ case HICN_PACKET_TYPE_WLDR_NOTIFICATION:
+ if (!connection_id_is_valid(msgbuf->connection_id)) {
+ ERROR("Invalid connection for WLDR packet");
+ goto DROP;
+ }
+ connection_t *connection =
+ connection_table_get_by_id(table, msgbuf->connection_id);
+ connection_wldr_handle_notification(connection, msgbuf);
break;
#endif
- default:
+ case HICN_PACKET_TYPE_MAPME:
+ INFO("Received MAP-Me packet");
+ if (!connection_id_is_valid(msgbuf->connection_id)) {
+ char conn_name[SYMBOLIC_NAME_LEN];
+ int rc = connection_table_get_random_name(table, conn_name);
+ if (rc < 0) {
+ ERROR("Could not create name for new connection");
+ goto DROP;
+ }
+
+ unsigned connection_id =
+ listener_create_connection(listener, conn_name, pair);
+ if (connection_id == CONNECTION_ID_UNDEFINED) {
+ ERROR("Could not create new connection");
+ goto DROP;
+ }
+ INFO("Created connection upon MAP-Me packet");
+ msgbuf->connection_id = connection_id;
+ }
+ mapme_process(forwarder->mapme, msgbuf);
break;
- }
-}
-static void _keepalive_cb(int fd, PARCEventType what, void *user_data) {
- parcAssertTrue(what & PARCEventType_Timeout, "Got unexpected tick_cb: %d",
- what);
- // function is just a keepalive for hicn-light, does not do anything
-}
+ case HICN_PACKET_TYPE_COMMAND:
+ // Create the connection to send the ack back
+ if (!connection_id_is_valid(msgbuf->connection_id)) {
+ char conn_name[SYMBOLIC_NAME_LEN];
+ int rc = connection_table_get_random_name(table, conn_name);
+ if (rc < 0) {
+ ERROR("Could not create name for new connection");
+ goto DROP;
+ }
+
+ unsigned connection_id =
+ listener_create_connection(listener, conn_name, pair);
+ if (connection_id == CONNECTION_ID_UNDEFINED) {
+ ERROR("Could not create new connection");
+ goto DROP;
+ }
+ msgbuf->connection_id = connection_id;
+ }
+
+ msg_header_t *msg = (msg_header_t *)msgbuf_get_packet(msgbuf);
+ msgbuf->command.type = msg->header.command_id;
+ if (!command_type_is_valid(msgbuf->command.type)) {
+ ERROR("Invalid command %d", msgbuf->command.type);
+ goto DROP;
+ }
+
+ /*
+ * We need to retrieve the connection (in case it is useful) before
+ * proceeding through the removal
+ */
+ connection_t *connection =
+ connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf));
+ size = command_process_msgbuf(forwarder, msgbuf);
+ if (msgbuf->command.type == COMMAND_TYPE_CONNECTION_REMOVE)
+ _forwarder_finalize_connection_if_self(connection, msgbuf);
+ return size;
-#ifdef WITH_MAPME
-FIB *forwarder_getFib(Forwarder *forwarder) {
- return messageProcessor_getFib(forwarder->processor);
-}
+ default:
+ /* Commands are not recognized by the packet parser */
+ if (msgbuf_is_command(msgbuf)) {
+ msgbuf_set_type(msgbuf, HICN_PACKET_TYPE_COMMAND);
+ goto RETRY;
+ }
+ goto DROP;
+ }
-void forwarder_onConnectionEvent(Forwarder *forwarder, const Connection *conn, connection_event_t event) {
-//#ifdef WITH_POLICY
-// messageProcessor_onConnectionEvent(forwarder->processor, conn, event);
-//#else
- mapme_onConnectionEvent(forwarder->mapme, conn, event);
-//#endif /* WITH_POLICY */
-}
+ return size;
-void forwarder_ProcessMapMe(Forwarder *forwarder, const uint8_t *msgBuffer,
- unsigned conn_id) {
- mapme_Process(forwarder->mapme, msgBuffer, conn_id);
+DROP:
+ forwarder_drop(forwarder, msgbuf_id);
+ return 0;
}
-MapMe *
-forwarder_getMapmeInstance(const Forwarder *forwarder) {
- return forwarder->mapme;
+void forwarder_log(forwarder_t *forwarder) {
+ DEBUG(
+ "Forwarder: received = %u (interest = %u, data = %u), dropped = %u "
+ "(interest = %u, data = %u, other = %u), forwarded = { interests = "
+ "%u, "
+ "data = %u }, dropped = { connection_not_found = %u, send_failure "
+ "= "
+ "%u, "
+ "no_route_in_fib = %u }, interest processing = { aggregated = %u, "
+ "retransmitted = %u, satisfied_from_cs = %u, expired_interests = "
+ "%u, "
+ "expired_data = %u }, data processing = { "
+ "no_reverse_path = %u }\n",
+ forwarder->stats.countReceived, forwarder->stats.countInterestsReceived,
+ forwarder->stats.countObjectsReceived, forwarder->stats.countDropped,
+ forwarder->stats.countInterestsDropped,
+ forwarder->stats.countObjectsDropped, forwarder->stats.countOtherDropped,
+ forwarder->stats.countInterestForwarded,
+ forwarder->stats.countObjectsForwarded,
+ forwarder->stats.countDroppedConnectionNotFound,
+ forwarder->stats.countSendFailures, forwarder->stats.countDroppedNoRoute,
+ forwarder->stats.countInterestsAggregated,
+ forwarder->stats.countInterestsRetransmitted,
+ forwarder->stats.countInterestsSatisfiedFromStore,
+ forwarder->stats.countInterestsExpired, forwarder->stats.countDataExpired,
+ forwarder->stats.countDroppedNoReversePath);
}
-#endif /* WITH_MAPME */
+forwarder_stats_t forwarder_get_stats(forwarder_t *forwarder) {
+ return forwarder->stats;
+}