diff options
Diffstat (limited to 'hicn-light/src/hicn/core/forwarder.c')
-rw-r--r-- | hicn-light/src/hicn/core/forwarder.c | 1837 |
1 files changed, 1456 insertions, 381 deletions
diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c index 94e8cc885..8c276bfef 100644 --- a/hicn-light/src/hicn/core/forwarder.c +++ b/hicn-light/src/hicn/core/forwarder.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2023 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -23,6 +23,20 @@ * the event scheduler */ +/* Bypass FIB and send packet one by one */ +//#define BYPASS_FIB 1 + +/* Send packets one by one : only effective if FIB is not bypassed */ +//#define USE_SEND_PACKET 1 + +/* Batch sending: only if the previous option is undefined */ +#define USE_QUEUE true + +/* Shall we send mapme updates to advertise all local prefixes on newly created + * faces + */ +//#define ADVERTISE_PREFIXES_ON_NEW_FACES + #ifndef _WIN32 #include <arpa/inet.h> #include <sys/socket.h> @@ -30,8 +44,7 @@ #endif #include <errno.h> #include <fcntl.h> -#include <signal.h> -#include <hicn/hicn-light/config.h> +//#include <hicn/hicn-light/config.h> #include <stdarg.h> #include <stdio.h> #include <stdlib.h> @@ -41,90 +54,71 @@ #define __STDC_FORMAT_MACROS #include <inttypes.h> -#include <parc/algol/parc_ArrayList.h> -#include <parc/algol/parc_Memory.h> -#include <parc/algol/parc_Object.h> -#include <parc/logging/parc_LogReporterTextStdout.h> - -#include <hicn/core/connectionManager.h> -#include <hicn/core/connectionTable.h> -#include <hicn/core/dispatcher.h> -#include <hicn/core/forwarder.h> -#include <hicn/core/messagePacketType.h> +#include "connection_table.h" +#include "fib.h" +#include "forwarder.h" +#include "listener_table.h" #ifdef WITH_MAPME -#include <hicn/core/mapme.h> +#include "mapme.h" #endif /* WITH_MAPME */ -#include <hicn/config/configuration.h> -#include <hicn/config/configurationFile.h> -#include <hicn/config/configurationListeners.h> -#include <hicn/processor/messageProcessor.h> +#include "msgbuf.h" +#include "msgbuf_pool.h" +#include "packet_cache.h" +#include "../config/configuration.h" +// #include "../config/configuration_file.h" +#include "../config/commands.h" +#include "../io/base.h" // MAX_MSG + +#ifdef WITH_POLICY_STATS +#include <hicn/core/policy_stats.h> +#endif /* WITH_POLICY_STATS */ #include <hicn/core/wldr.h> +#include <hicn/interest_manifest.h> +#include <hicn/util/log.h> -#include <parc/assert/parc_Assert.h> - -// the router's clock frequency (we now use the monotonic clock) -#define HZ 1000 - -// these will all be a little off because its all integer division -#define MSEC_PER_TICK (1000 / HZ) -#define USEC_PER_TICK (1000000 / HZ) -#define NSEC_PER_TICK ((1000000000ULL) / HZ) -#define MSEC_TO_TICKS(msec) \ - ((msec < FC_MSEC_PER_TICK) ? 1 : msec / FC_MSEC_PER_TICK) -#define NSEC_TO_TICKS(nsec) ((nsec < NSEC_PER_TICK) ? 1 : nsec / NSEC_PER_TICK) +struct forwarder_s { + // uint16_t server_port; -struct forwarder { - Dispatcher *dispatcher; - - uint16_t server_port; + // used by seed48 and nrand48 + unsigned short seed[3]; - PARCEventSignal *signal_int; - PARCEventSignal *signal_term; -#ifndef _WIN32 - PARCEventSignal *signal_usr1; -#endif - PARCEventTimer *keepalive_event; + connection_table_t *connection_table; + listener_table_t *listener_table; + configuration_t *config; - // will skew the virtual clock forward. In normal operaiton, it is 0. - Ticks clockOffset; + pkt_cache_t *pkt_cache; + fib_t *fib; + msgbuf_pool_t *msgbuf_pool; - unsigned nextConnectionid; - Messenger *messenger; - ConnectionManager *connectionManager; - ConnectionTable *connectionTable; - ListenerSet *listenerSet; - Configuration *config; +#ifdef WITH_MAPME + mapme_t *mapme; +#endif /* WITH_MAPME */ - // we'll eventually want to setup a threadpool of these - MessageProcessor *processor; + bool store_in_cs; + bool serve_from_cs; - Logger *logger; + forwarder_stats_t stats; +#ifdef WITH_POLICY_STATS + policy_stats_mgr_t policy_stats_mgr; +#endif /* WITH_POLICY_STATS */ - PARCClock *clock; + /* + * The message forwarder has to decide whether to queue incoming packets for + * batching, or trigger the transmission on the connection + */ + unsigned *pending_conn; -#if !defined(__APPLE__) - hicn_socket_helper_t - *hicnSocketHelper; // state required to manage hicn connections -#endif - // used by seed48 and nrand48 - unsigned short seed[3]; + subscription_table_t *subscriptions; -#ifdef WITH_MAPME - MapMe *mapme; -#endif /* WITH_MAPME */ + // Used to store the msgbufs that need to be released + off_t *acquired_msgbuf_ids; }; -// signal traps through the event scheduler -static void _signal_cb(int, PARCEventType, void *); - -// A no-op keepalive to prevent Libevent from exiting the dispatch loop -static void _keepalive_cb(int, PARCEventType, void *); - /** * Reseed our pseudo-random number generator. */ -static void forwarder_Seed(Forwarder *forwarder) { +static void forwarder_seed(forwarder_t *forwarder) { #ifndef _WIN32 int fd; ssize_t res; @@ -150,452 +144,1533 @@ static void forwarder_Seed(Forwarder *forwarder) { #endif } -Logger *forwarder_GetLogger(const Forwarder *forwarder) { - return forwarder->logger; -} +forwarder_t *forwarder_create(configuration_t *configuration) { + forwarder_t *forwarder = malloc(sizeof(forwarder_t)); + if (!forwarder) goto ERR_MALLOC; -// ============================================================================ -// Setup and destroy section + forwarder_seed(forwarder); + srand(forwarder->seed[0] ^ forwarder->seed[1] ^ forwarder->seed[2]); -Forwarder *forwarder_Create(Logger *logger) { - Forwarder *forwarder = parcMemory_AllocateAndClear(sizeof(Forwarder)); - parcAssertNotNull(forwarder, "parcMemory_AllocateAndClear(%zu) returned NULL", - sizeof(Forwarder)); - memset(forwarder, 0, sizeof(Forwarder)); - forwarder_Seed(forwarder); + forwarder->config = configuration; - forwarder->clock = parcClock_Monotonic(); - forwarder->clockOffset = 0; + forwarder->listener_table = listener_table_create(); + if (!forwarder->listener_table) goto ERR_LISTENER_TABLE; - if (logger) { - forwarder->logger = logger_Acquire(logger); - logger_SetClock(forwarder->logger, forwarder->clock); - } else { - PARCLogReporter *reporter = parcLogReporterTextStdout_Create(); - forwarder->logger = logger_Create(reporter, forwarder->clock); - parcLogReporter_Release(&reporter); - } + forwarder->connection_table = connection_table_create(); + if (!forwarder->connection_table) goto ERR_CONNECTION_TABLE; - forwarder->nextConnectionid = 1; - forwarder->dispatcher = dispatcher_Create(forwarder->logger); - forwarder->messenger = messenger_Create(forwarder->dispatcher); - forwarder->connectionManager = connectionManager_Create(forwarder); - forwarder->connectionTable = connectionTable_Create(); - forwarder->listenerSet = listenerSet_Create(); - forwarder->config = configuration_Create(forwarder); - forwarder->processor = messageProcessor_Create(forwarder); - - forwarder->signal_term = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGTERM); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_term); - - forwarder->signal_int = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGINT); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_int); -#ifndef _WIN32 - forwarder->signal_usr1 = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGPIPE); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_usr1); -#endif + forwarder->fib = fib_create(forwarder); + if (!forwarder->fib) goto ERR_FIB; -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - forwarder->hicnSocketHelper = hicn_create(); - if (!forwarder->hicnSocketHelper) - goto ERR_SOCKET; -#endif /* __APPLE__ */ - -#ifdef WITH_MAPME - if (!(mapme_create(&forwarder->mapme, forwarder))) - goto ERR_MAPME; -#endif /* WITH_MAPME */ + forwarder->msgbuf_pool = msgbuf_pool_create(); + if (!forwarder->msgbuf_pool) goto ERR_PACKET_POOL; + size_t objectStoreSize = configuration_get_cs_size(configuration); + forwarder->pkt_cache = pkt_cache_create(objectStoreSize); + if (!forwarder->pkt_cache) goto ERR_PKT_CACHE; - /* ignore child */ -#ifndef _WIN32 - signal(SIGCHLD, SIG_IGN); + forwarder->subscriptions = subscription_table_create(); + if (!forwarder->subscriptions) goto ERR_SUBSCRIPTION; - /* ignore tty signals */ - signal(SIGTSTP, SIG_IGN); - signal(SIGTTOU, SIG_IGN); - signal(SIGTTIN, SIG_IGN); -#endif + // the two flags for the cs are set to true by default. If the cs + // is active it always work as expected unless the use modifies this + // values using controller + if (objectStoreSize != 0) { + forwarder->store_in_cs = true; + forwarder->serve_from_cs = true; + } - // We no longer use this for ticks, but we need to have at least one event - // schedule to keep Libevent happy. +#ifdef WITH_MAPME + forwarder->mapme = mapme_create(forwarder); + if (!forwarder->mapme) goto ERR_MAPME; +#endif /* WITH_MAPME */ - struct timeval wtnow_timeout; - timerclear(&wtnow_timeout); +#ifdef WITH_POLICY_STATS + if (policy_stats_mgr_initialize(&forwarder->policy_stats_mgr, forwarder) < 0) + goto ERR_MGR; +#endif /* WITH_POLICY_STATS */ - wtnow_timeout.tv_sec = 0; - wtnow_timeout.tv_usec = 50000; // 20 Hz keepalive + memset(&forwarder->stats, 0, sizeof(forwarder_stats_t)); + vector_init(forwarder->pending_conn, MAX_MSG, 0); + vector_init(forwarder->acquired_msgbuf_ids, MAX_MSG, 0); - PARCEventScheduler *base = - dispatcher_GetEventScheduler(forwarder->dispatcher); - forwarder->keepalive_event = parcEventTimer_Create( - base, PARCEventType_Persist, _keepalive_cb, (void *)forwarder); - parcEventTimer_Start(forwarder->keepalive_event, &wtnow_timeout); + char *n_suffixes_per_split_str = getenv("N_SUFFIXES_PER_SPLIT"); + if (n_suffixes_per_split_str) + configuration_set_suffixes_per_split(forwarder_get_configuration(forwarder), + atoi(n_suffixes_per_split_str)); return forwarder; +ERR_MGR: #ifdef WITH_MAPME ERR_MAPME: #endif /* WITH_MAPME */ -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - hicn_free(forwarder->hicnSocketHelper); -ERR_SOCKET: -#endif - listenerSet_Destroy(&(forwarder->listenerSet)); - connectionManager_Destroy(&(forwarder->connectionManager)); - connectionTable_Destroy(&(forwarder->connectionTable)); - messageProcessor_Destroy(&(forwarder->processor)); - configuration_Destroy(&(forwarder->config)); - messenger_Destroy(&(forwarder->messenger)); - - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_int)); - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_term)); -#ifndef _WIN32 - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_usr1)); -#endif - - parcClock_Release(&forwarder->clock); - logger_Release(&forwarder->logger); - - // do the dispatcher last - dispatcher_Destroy(&(forwarder->dispatcher)); - parcMemory_Deallocate((void **)&forwarder); +ERR_SUBSCRIPTION: + subscription_table_free(forwarder->subscriptions); +ERR_PKT_CACHE: + pkt_cache_free(forwarder->pkt_cache); + + msgbuf_pool_free(forwarder->msgbuf_pool); +ERR_PACKET_POOL: + fib_free(forwarder->fib); +ERR_FIB: + connection_table_free(forwarder->connection_table); +ERR_CONNECTION_TABLE: + listener_table_free(forwarder->listener_table); +ERR_LISTENER_TABLE: + free(forwarder); +ERR_MALLOC: return NULL; } -void forwarder_Destroy(Forwarder **ptr) { - parcAssertNotNull(ptr, "Parameter must be non-null double pointer"); - parcAssertNotNull(*ptr, "Parameter must dereference to non-null pointer"); - Forwarder *forwarder = *ptr; -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - hicn_free(forwarder->hicnSocketHelper); -#endif - parcEventTimer_Destroy(&(forwarder->keepalive_event)); - - listenerSet_Destroy(&(forwarder->listenerSet)); - connectionManager_Destroy(&(forwarder->connectionManager)); - connectionTable_Destroy(&(forwarder->connectionTable)); - messageProcessor_Destroy(&(forwarder->processor)); - configuration_Destroy(&(forwarder->config)); +void forwarder_free(forwarder_t *forwarder) { + assert(forwarder); - // the messenger is used by many of the other pieces, so destroy it last - messenger_Destroy(&(forwarder->messenger)); + policy_stats_mgr_finalize(&forwarder->policy_stats_mgr); #ifdef WITH_MAPME mapme_free(forwarder->mapme); #endif /* WITH_MAPME */ - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_int)); - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_term)); -#ifndef _WIN32 - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_usr1)); -#endif + pkt_cache_free(forwarder->pkt_cache); + msgbuf_pool_free(forwarder->msgbuf_pool); + fib_free(forwarder->fib); + connection_table_free(forwarder->connection_table); + listener_table_free(forwarder->listener_table); + subscription_table_free(forwarder->subscriptions); + configuration_free(forwarder->config); + vector_free(forwarder->pending_conn); + vector_free(forwarder->acquired_msgbuf_ids); + free(forwarder); +} + +/* + * An event occurred that might trigger an update of the FIB cache. It is + * possible that the flags have been reset following a connection add or remote. + * The objective of this function is to prepare the cache entry, and to alert of + * any change for both consumer and producer prefixes. + */ +void forwarder_on_route_event(const forwarder_t *forwarder, + fib_entry_t *entry) { + commands_notify_route(forwarder, entry); + +#ifdef ADVERTISE_PREFIXES_ON_NEW_FACES + nexthops_t new_nexthops = NEXTHOPS_EMPTY; +#endif /* ADVERTISE_PREFIXES_ON_NEW_FACES */ + + nexthops_t *nexthops = NULL; + char *prefix_type_s; + + const connection_table_t *table = + forwarder_get_connection_table(entry->forwarder); + + const hicn_prefix_t *prefix = fib_entry_get_prefix(entry); + + WITH_INFO({ + char buf[MAXSZ_HICN_PREFIX]; + hicn_prefix_snprintf(buf, MAXSZ_HICN_NAME, prefix); + INFO("fib_entry_on_event: %s", buf); + )}; + + if (!fib_entry_has_local_nexthop(entry)) { + /* Recompute FIB cache, then check whether it has changed based on hash */ + prefix_type_s = "consumer"; + nexthops = fib_entry_get_nexthops(entry); + nexthops_reset(nexthops); + fib_entry_filter_nexthops(entry, nexthops, ~0, false); +#ifdef ADVERTISE_PREFIXES_ON_NEW_FACES + } else { + /* Check available non-local connections (on which we would send MAP-Me + * updates */ + prefix_type_s = "producer"; + + nexthops = fib_entry_get_mapme_nexthops(entry, &new_nexthops); + fib_entry_filter_nexthops(entry, nexthops, ~0, true); + +#ifdef WITH_MAPME + mapme_set_adjacencies(forwarder->mapme, entry, nexthops, NULL); +#endif /* WITH_MAPME */ +#endif /* ADVERTISE_PREFIXES_ON_NEW_FACES */ + } + + if (!nexthops) + return; - parcClock_Release(&forwarder->clock); - logger_Release(&forwarder->logger); + if (!fib_entry_nexthops_changed(entry, nexthops)) return; - // do the dispatcher last - dispatcher_Destroy(&(forwarder->dispatcher)); + /* Send notification */ + WITH_INFO({ + char buf[MAXSZ_HICN_PREFIX]; + hicn_prefix_snprintf(buf, MAXSZ_HICN_NAME, prefix); + INFO("Active interfaces changed for %s prefix %s", prefix_type_s, buf); + }); - parcMemory_Deallocate((void **)&forwarder); - *ptr = NULL; + netdevice_flags_t flags = NETDEVICE_FLAGS_EMPTY; + nexthops_foreach(nexthops, nh, { + connection_t *connection = connection_table_get_by_id(table, nh); + netdevice_flags_add(flags, connection_get_interface_type(connection)); + }); + + hicn_ip_prefix_t ip_prefix; + hicn_prefix_get_ip_prefix(prefix, &ip_prefix); + commands_notify_active_interface_update(forwarder, &ip_prefix, flags); } -void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port, - const char *localPath) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); +int forwarder_add_connection(const forwarder_t *forwarder, + const char *symbolic_name, face_type_t type, + address_pair_t *pair, policy_tags_t tags, + int priority, face_state_t admin_state) { + connection_table_t *table = forwarder_get_connection_table(forwarder); + connection_t *connection = connection_table_get_by_pair(table, pair); + + if (!connection) { + connection = connection_create(type, symbolic_name, pair, forwarder); + if (!connection) { + ERROR("Failed to create %s connection", face_type_str(type)); + return -1; + } + + } else { + WARN("Connection already exists"); + } + +#ifdef WITH_POLICY + connection_set_tags(connection, tags); + connection_set_priority(connection, priority); +#endif /* WITH_POLICY */ - configurationListeners_SetupAll(forwarder->config, port, localPath); + connection_set_admin_state(connection, admin_state); + return 0; } -void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - configurationListeners_SetutpLocalIPv4(forwarder->config, port); +int forwarder_remove_connection(const forwarder_t *forwarder, + unsigned connection_id, bool finalize) { + /* Remove connection from the FIB */ + forwarder_remove_connection_id_from_routes(forwarder, connection_id); + + /* Remove connection */ + connection_table_t *table = forwarder_get_connection_table(forwarder); + + /* Hook: connection deleted through the control protocol */ + connection_t *connection = connection_table_at(table, connection_id); + forwarder_on_connection_event(forwarder, connection, CONNECTION_EVENT_DELETE); + + connection_table_remove_by_id(table, connection_id); + if (finalize) connection_finalize(connection); + + return 0; } -void forwarder_SetupFromConfigFile(Forwarder *forwarder, const char *filename) { - ConfigurationFile *configFile = configurationFile_Create(forwarder, filename); - if (configFile) { - configurationFile_Process(configFile); - configurationFile_Release(&configFile); - } +/* + * This is currently called from commands.c for every command sent to update + * a connection. + */ +void forwarder_on_connection_event(const forwarder_t *forwarder, + const connection_t *connection, + connection_event_t event) { + assert(connection); + + commands_notify_connection(forwarder, event, connection); + + unsigned conn_id = connection_get_id(connection); + + /* We need to send a MapMe update on the newly selected connections for + * each concerned fib_entry : connection is involved, or no more involved */ + fib_t *fib = forwarder_get_fib(forwarder); + fib_foreach_entry(fib, entry, { + const nexthops_t *nexthops = fib_entry_get_nexthops(entry); + + if (!fib_entry_has_local_nexthop(entry)) { + /* Consumer prefix */ + /* + * A new connection has no impact until it is added to FIB, which will + * be handled in a route event + */ + if (event == CONNECTION_EVENT_CREATE) break; + + /* + * For each FIB entry, trigger an event only if the connection is part + * of nexthops */ + // XXX Replace this by a function + nexthops_foreach(nexthops, nexthop, { + if (nexthop != conn_id) continue; + forwarder_on_route_event(forwarder, entry); + break; + }); + } else { + /* Producer prefix */ + if (connection_is_local(connection)) break; + + // XXX we could optimize event more + forwarder_on_route_event(forwarder, entry); + } + }); +} + +void forwarder_setup_local_listeners(forwarder_t *forwarder, uint16_t port) { + assert(forwarder); + listener_setup_local(forwarder, port); } -Configuration *forwarder_GetConfiguration(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); +configuration_t *forwarder_get_configuration(forwarder_t *forwarder) { + assert(forwarder); return forwarder->config; } -// ============================================================================ +subscription_table_t *forwarder_get_subscriptions( + const forwarder_t *forwarder) { + return forwarder->subscriptions; +} + +connection_table_t *forwarder_get_connection_table( + const forwarder_t *forwarder) { + assert(forwarder); + return forwarder->connection_table; +} -unsigned forwarder_GetNextConnectionId(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->nextConnectionid++; +listener_table_t *forwarder_get_listener_table(const forwarder_t *forwarder) { + assert(forwarder); + return forwarder->listener_table; } -Messenger *forwarder_GetMessenger(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->messenger; +pkt_cache_t *forwarder_get_pkt_cache(const forwarder_t *forwarder) { + assert(forwarder); + return forwarder->pkt_cache; } -Dispatcher *forwarder_GetDispatcher(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->dispatcher; +void forwarder_cs_set_store(forwarder_t *forwarder, bool val) { + assert(forwarder); + forwarder->store_in_cs = val; } -#ifdef WITH_POLICY -ConnectionTable *forwarder_GetConnectionTable(const Forwarder *forwarder) { -#else -ConnectionTable *forwarder_GetConnectionTable(Forwarder *forwarder) { -#endif /* WITH_POLICY */ - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->connectionTable; +bool forwarder_cs_get_store(forwarder_t *forwarder) { + assert(forwarder); + return forwarder->store_in_cs; } -ListenerSet *forwarder_GetListenerSet(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->listenerSet; +void forwarder_cs_set_serve(forwarder_t *forwarder, bool val) { + assert(forwarder); + forwarder->serve_from_cs = val; } -void forwarder_SetChacheStoreFlag(Forwarder *forwarder, bool val) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - messageProcessor_SetCacheStoreFlag(forwarder->processor, val); +bool forwarder_cs_get_serve(forwarder_t *forwarder) { + assert(forwarder); + return forwarder->serve_from_cs; } -bool forwarder_GetChacheStoreFlag(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return messageProcessor_GetCacheStoreFlag(forwarder->processor); +void forwarder_cs_set_size(forwarder_t *forwarder, size_t size) { + assert(forwarder); + + if (pkt_cache_set_cs_size(forwarder->pkt_cache, size) < 0) { + ERROR( + "Unable to resize the CS: provided maximum size (%u) is smaller than " + "the number of elements currently stored in the CS (%u). Clear the " + "CS " + "and retry.", + size, pkt_cache_get_cs_size(forwarder->pkt_cache)); + } } -void forwarder_SetChacheServeFlag(Forwarder *forwarder, bool val) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - messageProcessor_SetCacheServeFlag(forwarder->processor, val); +size_t forwarder_cs_get_size(forwarder_t *forwarder) { + assert(forwarder); + return pkt_cache_get_cs_size(forwarder->pkt_cache); } -bool forwarder_GetChacheServeFlag(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return messageProcessor_GetCacheServeFlag(forwarder->processor); +size_t forwarder_cs_get_num_stale_entries(forwarder_t *forwarder) { + assert(forwarder); + return pkt_cache_get_num_cs_stale_entries(forwarder->pkt_cache); } -void forwarder_ReceiveCommand(Forwarder *forwarder, command_id command, - struct iovec *message, unsigned ingressId) { - configuration_ReceiveCommand(forwarder->config, command, message, ingressId); +void forwarder_cs_clear(forwarder_t *forwarder) { + assert(forwarder); + + pkt_cache_cs_clear(forwarder->pkt_cache); } -void forwarder_Receive(Forwarder *forwarder, Message *message) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(message, "Parameter message must be non-null"); +/** + * @function forwarder_Drop + * @abstract Whenever we "drop" a message, increment counters + * @discussion + * This is a bookkeeping function. It increments the appropriate counters. + * + * The default action for a message is to destroy it in + * <code>forwarder_Receive()</code>, so this function does not need to do + * that. + * + */ +static ssize_t forwarder_drop(forwarder_t *forwarder, off_t msgbuf_id) { + forwarder->stats.countDropped++; + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + const msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); - // this takes ownership of the message, so we're done here + switch (msgbuf_get_type(msgbuf)) { + case HICN_PACKET_TYPE_INTEREST: + forwarder->stats.countInterestsDropped++; + break; - // this are the checks needed to implement WLDR. We set wldr only on the STAs - // and we let the AP to react according to choise of the client. - // if the STA enables wldr using the set command, the AP enable wldr as well - // otherwise, if the STA disable it the AP remove wldr - // WLDR should be enabled only on the STAs using the command line - // TODO - // disable WLDR command line on the AP - const Connection *conn = connectionTable_FindById( - forwarder->connectionTable, message_GetIngressConnectionId(message)); + case HICN_PACKET_TYPE_DATA: + forwarder->stats.countObjectsDropped++; + break; + + default: + forwarder->stats.countOtherDropped++; + break; + } + + return msgbuf_get_len(msgbuf); + // dont destroy message here, its done at end of receive +} + +#ifndef BYPASS_FIB +/* + * If the hoplimit is equal to 0, then we may only forward it to local + * applications. Otherwise, we may forward it off the system. + * + */ + +static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder, + off_t msgbuf_id, + unsigned conn_id) { + connection_table_t *table = forwarder_get_connection_table(forwarder); + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + connection_t *conn = connection_table_get_by_id(table, conn_id); if (!conn) { - /* - * Drop is a static method in messageProcessor which might or might not need - * to be called for accounting purposes. This call was initially absent so - * the behaviour was kept like this, as this situation is unlikely. We need - * to release memory though, as this is not done in Drop anyways. - */ - //messageProcessor_Drop(forwarder->processor, message); - message_Release(&message); - return; + forwarder->stats.countDroppedConnectionNotFound++; + WARN("forward msgbuf %lu to interface %u not found (count %u)", msgbuf_id, + conn_id, forwarder->stats.countDroppedConnectionNotFound); + return forwarder_drop(forwarder, msgbuf_id); } - if (message_HasWldr(message)) { - if (connection_HasWldr(conn)) { + /* Always queue the packet... */ + // DEBUG("Queueing packet\n"); + +#if defined(USE_SEND_PACKET) || !defined(__linux__) + + // Here we need to update the path label of a data packet before send + // it. The path label update can be done here because the packet is sent + // directly to the socket + if (msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_DATA) + msgbuf_update_pathlabel(msgbuf, connection_get_id(conn)); + + bool success = connection_send_packet(conn, msgbuf_get_packet(msgbuf), + msgbuf_get_len(msgbuf)); +#else + + // In this case we cannot update the path label even if it is need because + // the packet is not copied and only the packet id is enqueued to the ring + // buffer associated the output interface. If the path label is updated here + // all data packets get delivered to the next hop with the same label that is + // associated to the last connection used. For this reason the path label + // update must be done before the packet is actually sent inside the different + // IO implementations. + bool success = connection_send(conn, msgbuf_id, USE_QUEUE); + +#endif + + /* ... and mark the connection as pending if this is not yet the case */ + if (!vector_contains(forwarder->pending_conn, conn_id)) + vector_push(forwarder->pending_conn, conn_id); + + if (!success) { + forwarder->stats.countSendFailures++; + + DEBUG("forward msgbuf %llu to interface %u send failure (count %u)", + msgbuf_id, conn_id, forwarder->stats.countSendFailures); + return forwarder_drop(forwarder, msgbuf_id); + } + + switch (msgbuf_get_type(msgbuf)) { + case HICN_PACKET_TYPE_INTEREST: + forwarder->stats.countInterestForwarded++; + break; + + case HICN_PACKET_TYPE_DATA: + forwarder->stats.countObjectsForwarded++; + break; + + default: + break; + } + + TRACE("forward msgbuf %p (size=%u) to interface %u", msgbuf, + msgbuf_get_len(msgbuf), conn_id); + return msgbuf_get_len(msgbuf); +} + +/** + * @function forwarder_forward_to_nexthops + * @abstract Try to forward to each nexthop listed in the NumberSet + * @discussion + * Will not forward to the ingress connection. + * + * @return The number of nexthops tried + */ +static unsigned forwarder_forward_to_nexthops(forwarder_t *forwarder, + off_t msgbuf_id, + const nexthops_t *nexthops) { + // DEBUG("[forwarder_forward_to_nexthops] num=%d/%d", + // nexthops_get_curlen(nexthops), nexthops_get_len(nexthops)); + unsigned forwardedCopies = 0; + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + unsigned ingressId = msgbuf_get_connection_id(msgbuf); + + nexthops_foreach(nexthops, nexthop, { + // DEBUG("[forwarder_forward_to_nexthops] - nexthop = %d"); + if (nexthop == ingressId) continue; + + forwardedCopies++; + // INFO("[forwarder_forward_to_nexthops] - nexthop = %d OK", nexthop); + forwarder_forward_via_connection(forwarder, msgbuf_id, nexthop); + }); + + return forwardedCopies; +} + +static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id, + pkt_cache_verdict_t verdict, + pkt_cache_entry_t *entry) { + assert(forwarder && entry && msgbuf_id_is_valid(msgbuf_id)); + + bool ret = true; + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + assert(msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_INTEREST); + + fib_entry_t *fib_entry = fib_match_msgbuf(forwarder->fib, msgbuf); + if (!fib_entry) return false; + + nexthops_t *nexthops = fib_entry_get_nexthops(fib_entry); + + /* Backup flags and cur_len*/ + uint_fast32_t flags = nexthops->flags; + size_t cur_len = nexthops_get_curlen(nexthops); + + /* This affects the nexthops */ + nexthops = strategy_lookup_nexthops(&fib_entry->strategy, nexthops, msgbuf); + + if (nexthops_get_curlen(nexthops) == 0) { + ERROR("Message %p returned an empty next hop set", msgbuf); + ret = false; + goto END; + } + + pit_entry_t *pit_entry = &entry->u.pit_entry; + if (!pit_entry) { + ret = false; + goto END; + } + + pit_entry_set_fib_entry(pit_entry, fib_entry); + + // this requires some additional checks. It may happen that some of the output + // faces selected by the forwarding strategy are not usable. So far all the + // forwarding strategy return only valid faces (or an empty list) + nexthops_foreach(nexthops, nexthop, { + // DEBUG("Adding egress to PIT for nexthop %d", nexthop); + pit_entry_egress_add(pit_entry, nexthop); + }); + + if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <= 0) { + ERROR("Error forwarding mMessage %p to next hops", msgbuf); + ret = false; + } + +END: + /* Restore flags & curlen */ + nexthops->flags = flags; + nexthops->cur_elts = cur_len; + + return ret; +} + +#endif /* ! BYPASS_FIB */ + +int _forwarder_forward_upon_interest( + forwarder_t *forwarder, msgbuf_pool_t *msgbuf_pool, off_t data_msgbuf_id, + off_t interest_msgbuf_id, pkt_cache_entry_t *entry, + pkt_cache_verdict_t verdict, bool is_aggregated) { + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id); + + // - Aggregation can be perfomed, do not forward + if (verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST) { + forwarder_drop(forwarder, interest_msgbuf_id); + return (int)msgbuf_get_len(msgbuf); + } + + // - Data packet matching the interest was found, forward reply + if (verdict == PKT_CACHE_VERDICT_FORWARD_DATA) { + assert(forwarder->serve_from_cs == true); + + msgbuf_t *interest_msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id); + msgbuf_t *data_msgbuf = msgbuf_pool_at(msgbuf_pool, data_msgbuf_id); + + msgbuf_reset_pathlabel(data_msgbuf); + forwarder_forward_via_connection(forwarder, data_msgbuf_id, + msgbuf_get_connection_id(interest_msgbuf)); + return (int)msgbuf_get_len(msgbuf); + } + + // - For aggregated interest, the interest forwarding is done in + // `_forwarder_forward_aggregated_interest()` + if (is_aggregated) return (int)msgbuf_get_len(msgbuf); + + // - Try to forward the interest + int rc = + forwarder_forward_via_fib(forwarder, interest_msgbuf_id, verdict, entry); + if (!rc) { + // - Not able to forward, drop the packet + forwarder->stats.countDroppedNoRoute++; + INFO("Message %lu did not match FIB, no route (count %u)", + interest_msgbuf_id, forwarder->stats.countDroppedNoRoute); + + forwarder_drop(forwarder, interest_msgbuf_id); + return -1; + } + + return (int)msgbuf_get_len(msgbuf); +} + +static void _forwarder_update_interest_stats(forwarder_t *forwarder, + pkt_cache_verdict_t verdict, + msgbuf_t *msgbuf, + bool has_expire_ts, + uint64_t expire_ts) { + long expiration = has_expire_ts ? expire_ts : -1; + switch (verdict) { + case PKT_CACHE_VERDICT_FORWARD_INTEREST: + DEBUG("Message added to PIT (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_AGGREGATE_INTEREST: + forwarder->stats.countInterestsAggregated++; + DEBUG("Message aggregated in PIT (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_RETRANSMIT_INTEREST: + forwarder->stats.countInterestsRetransmitted++; + DEBUG("Message retransmitted (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_FORWARD_DATA: + forwarder->stats.countInterestsSatisfiedFromStore++; + DEBUG("Message satisfied from content store (expiration=%ld)", + expiration); + break; + + case PKT_CACHE_VERDICT_INTEREST_EXPIRED_FORWARD_INTEREST: + forwarder->stats.countInterestsExpired++; + DEBUG("Message replaced expired interest (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_DATA_EXPIRED_FORWARD_INTEREST: + forwarder->stats.countDataExpired++; + DEBUG("Message replaced expired data (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_ERROR: + ERROR("Invalid packet cache content"); + break; + + default: + break; + } +} + +/** + * Return the interest manifest from the interest payload + */ +static int _forwarder_get_interest_manifest( + msgbuf_t *msgbuf, interest_manifest_header_t **int_manifest_header, + size_t *payload_size) { + uint8_t *payload; + + hicn_packet_buffer_t *pkbuf = msgbuf_get_pkbuf(msgbuf); + + hicn_payload_type_t payload_type; + HICN_UNUSED(int rc) = hicn_packet_get_payload_type(pkbuf, &payload_type); + // XXX ASSERT HERE !!! + if (rc != HICN_LIB_ERROR_NONE) return -1; + // assert(rc == HICN_LIB_ERROR_NONE); + + if (payload_type != HPT_MANIFEST) return -1; + + rc = hicn_packet_get_payload(pkbuf, &payload, payload_size, false); + // assert(rc == HICN_LIB_ERROR_NONE); + if (rc != HICN_LIB_ERROR_NONE) return -1; + + *int_manifest_header = (interest_manifest_header_t *)payload; + + return 0; +} + +// Manifest is split using splitting strategy, then every +// sub-manifest is sent using the forwarding strategy defined for the prefix +int _forwarder_forward_aggregated_interest( + forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header, + msgbuf_t *msgbuf, off_t msgbuf_id, pkt_cache_entry_t **entries) { + assert(msgbuf_id_is_valid(msgbuf_id) && + msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_INTEREST); + + int ret = -1; + + fib_entry_t *fib_entry = fib_match_msgbuf(forwarder->fib, msgbuf); + if (!fib_entry) goto END; + + nexthops_t *nexthops = fib_entry_get_nexthops(fib_entry); + if (nexthops_get_curlen(nexthops) == 0) { + ret = 0; + goto END; + } + + /* Backup flags and cur_len*/ + uint_fast32_t flags = nexthops->flags; + size_t cur_len = nexthops_get_curlen(nexthops); + + size_t n_suffixes_per_split = configuration_get_suffixes_per_split( + forwarder_get_configuration(forwarder)); + int_manifest_split_strategy_t disaggregation_strategy = + configuration_get_split_strategy(forwarder_get_configuration(forwarder)); + switch (disaggregation_strategy) { + case INT_MANIFEST_SPLIT_STRATEGY_NONE: + n_suffixes_per_split = int_manifest_header->n_suffixes + 1; + + case INT_MANIFEST_SPLIT_STRATEGY_MAX_N_SUFFIXES: { + // Generate sub-manifests: same as original manifest, + // but different suffix in the header and different bitmap + + int total_len = 0; + // Suffixes in manifest plus the one in the header + int total_suffixes = int_manifest_header->n_suffixes + 1; + + // Save copy of original bitmap to use as a reference + // to generate bitmaps for sub-manifests + hicn_uword original_bitmap[BITMAP_SIZE] = {0}; + memcpy(&original_bitmap, int_manifest_header->request_bitmap, + BITMAP_SIZE * sizeof(hicn_uword)); + + size_t suffix_index = 0; // Position of suffix in initial manifest + interest_manifest_header_t *manifest = NULL; + size_t payload_size; + while (suffix_index < total_suffixes) { + // If more than one sub-manifest, + // clone original interest manifest and update suffix + if (suffix_index > 0) { + msgbuf_t *clone; + off_t clone_id = + msgbuf_pool_clone(forwarder->msgbuf_pool, &clone, msgbuf_id); + msgbuf_pool_acquire(clone); + forwarder_acquired_msgbuf_ids_push(forwarder, clone_id); + + msgbuf_id = clone_id; + msgbuf = clone; + } + + hicn_uword curr_bitmap[BITMAP_SIZE] = {0}; + size_t first_suffix_index_in_submanifest = suffix_index; + suffix_index = interest_manifest_update_bitmap( + original_bitmap, curr_bitmap, suffix_index, total_suffixes, + n_suffixes_per_split); + size_t first_suffix_index_in_next_submanifest = suffix_index; + + // Update manifest bitmap in current msgbuf + + int ret = + _forwarder_get_interest_manifest(msgbuf, &manifest, &payload_size); + _ASSERT(ret == 0); + memcpy(manifest->request_bitmap, curr_bitmap, + BITMAP_SIZE * sizeof(hicn_uword)); + WITH_TRACE({ + bitmap_print(manifest->request_bitmap, BITMAP_SIZE); + printf("\n"); + }); + + /* + * Update PIT entries for suffixes in current sub-manifest. + * + * Note that strategy lookup affects the nexthops, and we need to + *restore the initial state before every lookup + */ + nexthops->flags = flags; + nexthops->cur_elts = cur_len; + nexthops = + strategy_lookup_nexthops(&fib_entry->strategy, nexthops, msgbuf); + + if (nexthops_get_curlen(nexthops) == 0) { + ERROR("Message %p returned an empty next hop set", msgbuf); + goto RESTORE; + } + + for (size_t i = first_suffix_index_in_submanifest; + i < first_suffix_index_in_next_submanifest; i++) { + if (!bitmap_is_set_no_check(manifest->request_bitmap, i)) continue; + + pit_entry_t *pit_entry = &(entries[i]->u.pit_entry); + if (!pit_entry) goto RESTORE; + + pit_entry_set_fib_entry(pit_entry, fib_entry); + nexthops_foreach(nexthops, nexthop, + { pit_entry_egress_add(pit_entry, nexthop); }); + } + + // Serialize manifest before sending it + interest_manifest_serialize(int_manifest_header); + + if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <= + 0) { + ERROR("Message %p returned an empty next hop set", msgbuf); + continue; + } + + total_len += msgbuf_get_len(msgbuf); + } + + ret = total_len; + goto END; + } + + default: + break; + } + +RESTORE: + /* Restore flags & curlen */ + nexthops->flags = flags; + nexthops->cur_elts = cur_len; + +END: + return ret; +} + +static ssize_t forwarder_process_single_interest(forwarder_t *forwarder, + msgbuf_pool_t *msgbuf_pool, + msgbuf_t *msgbuf, + off_t msgbuf_id) { + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; + off_t data_msgbuf_id = INVALID_MSGBUF_ID; + pkt_cache_entry_t *entry = NULL; + + // Update packet cache + pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, &verdict, + &data_msgbuf_id, &entry, msgbuf_get_name(msgbuf), + forwarder->serve_from_cs); + + _forwarder_update_interest_stats(forwarder, verdict, msgbuf, + entry->has_expire_ts, entry->expire_ts); + + int rc = _forwarder_forward_upon_interest( + forwarder, msgbuf_pool, data_msgbuf_id, msgbuf_id, entry, verdict, false); + + // No route when trying to forward interest, remove from PIT + if (rc == -1) pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry); + + return msgbuf_get_len(msgbuf); +} + +static ssize_t forwarder_process_aggregated_interest( + forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header, + msgbuf_pool_t *msgbuf_pool, msgbuf_t *msgbuf, off_t msgbuf_id) { + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; + off_t data_msgbuf_id = INVALID_MSGBUF_ID; + pkt_cache_entry_t *entry = NULL; + // Save PIT entries to avoid re-doing pkt cache lookup in + // `_forwarder_forward_aggregated_interest()` + pkt_cache_entry_t *entries[BITMAP_SIZE * WORD_WIDTH]; + + int n_suffixes_to_fwd = 0; + + hicn_name_t name_copy = HICN_NAME_EMPTY; + hicn_name_copy(&name_copy, msgbuf_get_name(msgbuf)); + + // Suffixes in interest manifest also contains suffix in main name. We can + // then just iterate the interest manifest and update the suffix in the name + // struct + hicn_name_suffix_t *suffix; + unsigned long pos; + interest_manifest_foreach_suffix(int_manifest_header, suffix, pos) { + // Update name + hicn_name_set_suffix(&name_copy, *suffix); + + // Update packet cache + pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, + &verdict, &data_msgbuf_id, &entry, &name_copy, + forwarder->serve_from_cs); + + entries[pos] = entry; + _forwarder_update_interest_stats(forwarder, verdict, msgbuf, + entry->has_expire_ts, entry->expire_ts); + + // Here only data forwarding is performed, interest forwarding is done + // in '_forwarder_forward_aggregated_interest()' + int rc = + _forwarder_forward_upon_interest(forwarder, msgbuf_pool, data_msgbuf_id, + msgbuf_id, entry, verdict, true); + + // No route when trying to forward interest, remove from PIT + if (rc == -1) pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry); + + // Unset in bitmap if no interest forwarding needed, + // otherwise increase count of suffixes to forward + if (rc == -1 || verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST || + verdict == PKT_CACHE_VERDICT_FORWARD_DATA) { + bitmap_unset_no_check(int_manifest_header->request_bitmap, pos); + } else { + n_suffixes_to_fwd++; + } + + WITH_DEBUG({ + char buf[MAXSZ_HICN_PREFIX]; + int rc = hicn_name_snprintf(buf, MAXSZ_HICN_NAME, &name_copy); + if (rc < 0 || rc >= MAXSZ_HICN_PREFIX) + snprintf(buf, MAXSZ_HICN_PREFIX, "(error)"); + DEBUG("Next in manifest: %s", buf); + }); + } + + // Return if nothing in the manifest to forward + if (n_suffixes_to_fwd == 0) return msgbuf_get_len(msgbuf); + + return _forwarder_forward_aggregated_interest(forwarder, int_manifest_header, + msgbuf, msgbuf_id, entries); +} + +/** + * @function forwarder_process_interest + * @abstract Receive an interest from the network + * @discussion + * (1) if interest in the PIT, aggregate in PIT + * (2) if interest in the ContentStore, reply + * (3) if in the FIB, forward + * (4) drop + * + */ +static ssize_t forwarder_process_interest(forwarder_t *forwarder, + off_t msgbuf_id) { + assert(forwarder); + assert(msgbuf_id_is_valid(msgbuf_id)); + + msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + const connection_table_t *table = forwarder_get_connection_table(forwarder); + connection_t *conn = + connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); + + assert(msgbuf_get_type(msgbuf) == HICN_PACKET_TYPE_INTEREST); + + u32 n_suffixes = 0; + interest_manifest_header_t *int_manifest_header = NULL; + size_t payload_size; + int ret = _forwarder_get_interest_manifest(msgbuf, &int_manifest_header, + &payload_size); + if (ret == 0) { + // Deserialize intrest manifest + interest_manifest_deserialize(int_manifest_header); + + if (!interest_manifest_is_valid(int_manifest_header, payload_size)) + return -1; + + n_suffixes = int_manifest_header->n_suffixes; + } + + // Update stats + forwarder->stats.countInterestsReceived++; + conn->stats.interests.rx_pkts++; + conn->stats.interests.rx_bytes += msgbuf_get_len(msgbuf); + + WITH_DEBUG({ + char buf[MAXSZ_HICN_PREFIX]; + int rc = hicn_name_snprintf(buf, MAXSZ_HICN_NAME, msgbuf_get_name(msgbuf)); + if (rc < 0 || rc >= MAXSZ_HICN_PREFIX) + snprintf(buf, MAXSZ_HICN_PREFIX, "(error)"); + DEBUG("INTEREST (%s) msgbuf_id=%lu ingress=%u length=%u", buf, msgbuf_id, + msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf)); + DEBUG("INTEREST (%s) suffixes=%u msgbuf_id=%lu ingress=%u length=%u", buf, + n_suffixes, msgbuf_id, msgbuf_get_connection_id(msgbuf), + msgbuf_get_len(msgbuf)); + }); + + // Cache suffixes for current prefix to (possibly) avoid double lookups + pkt_cache_save_suffixes_for_prefix( + forwarder->pkt_cache, hicn_name_get_prefix(msgbuf_get_name(msgbuf))); + + if (ret == -1) + return forwarder_process_single_interest(forwarder, msgbuf_pool, msgbuf, + msgbuf_id); + return forwarder_process_aggregated_interest(forwarder, int_manifest_header, + msgbuf_pool, msgbuf, msgbuf_id); +} + +static void _forwarder_log_on_data(forwarder_t *forwarder, + pkt_cache_verdict_t verdict) { + switch (verdict) { + case PKT_CACHE_VERDICT_FORWARD_DATA: + DEBUG("Message added to CS from PIT"); + break; + case PKT_CACHE_VERDICT_STORE_DATA: + DEBUG( + "Message added to CS (expired or no previous interest " + "pending)"); + break; + case PKT_CACHE_VERDICT_CLEAR_DATA: + break; + case PKT_CACHE_VERDICT_UPDATE_DATA: + DEBUG("Message updated in CS"); + break; + case PKT_CACHE_VERDICT_IGNORE_DATA: + DEBUG("Message not stored in CS"); + break; + case PKT_CACHE_VERDICT_ERROR: + ERROR("Invalid packet cache content"); + break; + default: + break; + } +} + +/** + * @function forwarder_process_data + * @abstract Process an in-bound content object + * @discussion + * (1) If it does not match anything in the PIT, drop it + * (2) Add to Content Store + * (3) Reverse path forward via PIT entries + * + * @param <#param1#> + */ +static ssize_t forwarder_process_data(forwarder_t *forwarder, off_t msgbuf_id) { + msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + WITH_DEBUG({ + char buf[MAXSZ_HICN_PREFIX]; + int rc = hicn_name_snprintf(buf, MAXSZ_HICN_NAME, msgbuf_get_name(msgbuf)); + if (rc < 0 || rc >= MAXSZ_HICN_PREFIX) + snprintf(buf, MAXSZ_HICN_PREFIX, "(error)"); + DEBUG("DATA (%s) msgbuf_id=%lu ingress=%u length=%u", buf, msgbuf_id, + msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf)); + }); + + const connection_table_t *table = forwarder_get_connection_table(forwarder); + connection_t *conn = + connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); + + // Update stats + forwarder->stats.countObjectsReceived++; + conn->stats.data.rx_pkts++; + conn->stats.data.rx_bytes += msgbuf_get_len(msgbuf); + + // Cache suffixes for current prefix to (possibly) avoid double lookups + pkt_cache_save_suffixes_for_prefix( + forwarder->pkt_cache, hicn_name_get_prefix(msgbuf_get_name(msgbuf))); + + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; + bool wrong_egress; + nexthops_t *ingressSetUnion = pkt_cache_on_data( + forwarder->pkt_cache, msgbuf_pool, msgbuf_id, forwarder->store_in_cs, + connection_is_local(conn), &wrong_egress, &verdict); + + _forwarder_log_on_data(forwarder, verdict); + + if (wrong_egress) { // Interest sent via a connection but received from + // another + WARN("Data coming from unexpected connection, discarded"); + } else if (!ingressSetUnion) { // No match in the PIT + forwarder->stats.countDroppedNoReversePath++; + DEBUG("Message %lu did not match PIT, no reverse path", msgbuf_id); + + // NOTE : probes are not stored in PIT + if (msgbuf_is_probe(msgbuf)) { + fib_entry_t *entry = fib_match_msgbuf(forwarder->fib, msgbuf); + if (entry && fib_entry_strategy_type(entry) == STRATEGY_TYPE_BESTPATH) { + nexthops_t probe_nexthops = NEXTHOPS_EMPTY; + nexthops_add(&probe_nexthops, msgbuf_get_connection_id(msgbuf)); + fib_entry_on_data(entry, &probe_nexthops, msgbuf, 0, ticks_now()); + } + } + forwarder_drop(forwarder, msgbuf_id); + } else { + // Reverse path forward via PIT entries + forwarder_forward_to_nexthops(forwarder, msgbuf_id, ingressSetUnion); + free(ingressSetUnion); + } + + return msgbuf_get_len(msgbuf); +} + +void forwarder_flush_connections(forwarder_t *forwarder) { + // DEBUG("[forwarder_flush_connections]"); + const connection_table_t *table = forwarder_get_connection_table(forwarder); + + unsigned num_pending_conn = (unsigned)vector_len(forwarder->pending_conn); + for (unsigned i = 0; i < num_pending_conn; i++) { + unsigned conn_id = forwarder->pending_conn[i]; + connection_t *conn = connection_table_at(table, conn_id); + if (!connection_flush(conn)) { + WARN("Could not flush connection queue"); + // XXX keep track of non flushed connections... + } + } + vector_reset(forwarder->pending_conn); + // DEBUG("[forwarder_flush_connections] done"); +} + +#if WITH_WLDR +// XXX move to wldr file, worst case in connection. +void forwarder_apply_wldr(const forwarder_t *forwarder, const msgbuf_t *msgbuf, + connection_t *connection) { + // this are the checks needed to implement WLDR. We set wldr only on the + // STAs and we let the AP to react according to choice of the client. if + // the STA enables wldr using the set command, the AP enable wldr as + // well otherwise, if the STA disable it the AP remove wldr WLDR should + // be enabled only on the STAs using the command line + // TODO + // disable WLDR command line on the AP + if (msgbuf_has_wldr(msgbuf)) { + if (connection_has_wldr(connection)) { // case 1: WLDR is enabled - connection_DetectLosses((Connection *)conn, message); - } else if (!connection_HasWldr(conn) && - connection_WldrAutoStartAllowed(conn)) { + connection_wldr_detect_losses(connection, msgbuf); + } else if (!connection_has_wldr(connection) && + connection_wldr_autostart_is_allowed(connection)) { // case 2: We are on an AP. We enable WLDR - connection_EnableWldr((Connection *)conn); - connection_DetectLosses((Connection *)conn, message); + connection_wldr_enable(connection, true); + connection_wldr_detect_losses(connection, msgbuf); } // case 3: Ignore WLDR } else { - if (connection_HasWldr(conn) && connection_WldrAutoStartAllowed(conn)) { + if (connection_has_wldr(connection) && + connection_wldr_autostart_is_allowed(connection)) { // case 1: STA do not use WLDR, we disable it - connection_DisableWldr((Connection *)conn); + connection_wldr_enable(connection, false); } } - - messageProcessor_Receive(forwarder->processor, message); } +#endif -Ticks forwarder_GetTicks(const Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return parcClock_GetTime(forwarder->clock) + forwarder->clockOffset; -} +bool forwarder_add_or_update_route(forwarder_t *forwarder, + hicn_ip_prefix_t *prefix, + unsigned ingress_id) { + assert(forwarder); + assert(prefix); -Ticks forwarder_NanosToTicks(uint64_t nanos) { return NSEC_TO_TICKS(nanos); } + configuration_t *config = forwarder_get_configuration(forwarder); -uint64_t forwarder_TicksToNanos(Ticks ticks) { - return (1000000000ULL) * ticks / HZ; -} + char prefix_s[MAXSZ_IP_PREFIX]; + int rc = hicn_ip_prefix_snprintf(prefix_s, MAXSZ_IP_PREFIX, prefix); + assert(rc < MAXSZ_IP_PREFIX); + if (rc < 0) return false; + + INFO("Adding prefix=%s for conn_id=%d", prefix_s, ingress_id); -bool forwarder_AddOrUpdateRoute(Forwarder *forwarder, - add_route_command *control, unsigned ifidx) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(control, "Parameter route must be non-null"); + strategy_type_t strategy_type = configuration_get_strategy(config, prefix_s); - // we only have one message processor - bool res = - messageProcessor_AddOrUpdateRoute(forwarder->processor, control, ifidx); + hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY; + hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len, + &name_prefix); + fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix); + if (!entry) { + entry = fib_entry_create(&name_prefix, strategy_type, NULL, forwarder); + if (ingress_id != INVALID_FACE_ID) + fib_entry_nexthops_add(entry, ingress_id); + entry = fib_add(forwarder->fib, entry); - return res; + } else { + if (ingress_id != INVALID_FACE_ID) + fib_entry_nexthops_add(entry, ingress_id); + } + + forwarder_on_route_event(forwarder, entry); + + return true; } +bool forwarder_remove_route(forwarder_t *forwarder, hicn_ip_prefix_t *prefix, + unsigned ingress_id) { + assert(forwarder); + assert(prefix); -bool forwarder_RemoveRoute(Forwarder *forwarder, remove_route_command *control, - unsigned ifidx) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(control, "Parameter route must be non-null"); + hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY; + hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len, + &name_prefix); + fib_remove(forwarder->fib, &name_prefix, ingress_id); - // we only have one message processor - return messageProcessor_RemoveRoute(forwarder->processor, control, ifidx); + return true; } #ifdef WITH_POLICY -bool forwarder_AddOrUpdatePolicy(Forwarder *forwarder, - add_policy_command *control) { - parcAssertNotNull(forwarder, "Parameter forwarder must be non-null"); - parcAssertNotNull(control, "Parameter control must be non-null"); +bool forwarder_add_or_update_policy(forwarder_t *forwarder, + hicn_ip_prefix_t *prefix, + hicn_policy_t *policy) { + assert(forwarder); + assert(prefix); + assert(policy); + + hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY; + hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len, + &name_prefix); + fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix); + if (!entry) return false; - return messageProcessor_AddOrUpdatePolicy(forwarder->processor, control); + fib_entry_set_policy(entry, *policy); + + return true; } -bool forwarder_RemovePolicy(Forwarder *forwarder, remove_policy_command *control) { - parcAssertNotNull(forwarder, "Parameter forwarder must be non-null"); - parcAssertNotNull(control, "Parameter control must be non-null"); +bool forwarder_remove_policy(forwarder_t *forwarder, hicn_ip_prefix_t *prefix) { + assert(forwarder); + assert(prefix); + + hicn_prefix_t name_prefix = HICN_PREFIX_EMPTY; + hicn_prefix_create_from_ip_address_len(&prefix->address, prefix->len, + &name_prefix); + fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix); + if (!entry) return false; - return messageProcessor_RemovePolicy(forwarder->processor, control); + fib_entry_set_policy(entry, POLICY_EMPTY); + + return true; } #endif /* WITH_POLICY */ -void forwarder_RemoveConnectionIdFromRoutes(Forwarder *forwarder, - unsigned connectionId) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - messageProcessor_RemoveConnectionIdFromRoutes(forwarder->processor, - connectionId); +void forwarder_remove_connection_id_from_routes(const forwarder_t *forwarder, + unsigned connection_id) { + fib_entry_t **removed_entries = NULL; + size_t num_removed_entries; + + assert(forwarder); + + fib_remove_connection(forwarder->fib, connection_id, &removed_entries, + &num_removed_entries); + + if (num_removed_entries > 0) { + assert(removed_entries); + + for (int i = 0; i < num_removed_entries; i++) { + fib_entry_t *entry = removed_entries[i]; + forwarder_on_route_event(forwarder, entry); + fib_remove_entry(forwarder->fib, entry); + } + free(removed_entries); + } } -void forwarder_SetStrategy(Forwarder *forwarder, Name *prefix, - strategy_type strategy, - unsigned related_prefixes_len, - Name **related_prefixes) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(prefix, "Parameter prefix must be non-null"); +void forwarder_add_strategy_options(forwarder_t *forwarder, + hicn_prefix_t *name_prefix, + strategy_type_t strategy_type, + strategy_options_t *strategy_options) { + assert(forwarder); + assert(name_prefix); + assert(strategy_options); + assert(STRATEGY_TYPE_VALID(strategy_type)); - processor_SetStrategy(forwarder->processor, prefix, strategy, - related_prefixes_len, related_prefixes); + fib_entry_t *entry = fib_contains(forwarder->fib, name_prefix); + if (!entry) return; + + fib_entry_add_strategy_options(entry, strategy_type, strategy_options); } -FibEntryList *forwarder_GetFibEntries(Forwarder *forwarder) { - return messageProcessor_GetFibEntries(forwarder->processor); +void forwarder_set_strategy(forwarder_t *forwarder, hicn_prefix_t *prefix, + strategy_type_t strategy_type, + strategy_options_t *strategy_options) { + assert(forwarder); + assert(prefix); + assert(STRATEGY_TYPE_VALID(strategy_type)); + /* strategy_options might be NULL */ + + fib_entry_t *entry = fib_contains(forwarder->fib, prefix); + if (!entry) { + // there is no exact match. so if the forwarding strategy is not in + // the list of strategies that can be set by the transport, return + if (strategy_type != STRATEGY_TYPE_BESTPATH && + strategy_type != STRATEGY_TYPE_REPLICATION) { + return; + } + + // here it may be the transprot that wants to set the strategy, but it has + // no knowledge of the length of the prefix. so we apply the strategy at the + // matching fib entry, which later will be the one that will be used to send + // interests with this name + entry = fib_match_prefix(forwarder->fib, prefix); + if (!entry) { + return; // no fib match, return + } + } + + fib_entry_set_strategy(entry, strategy_type, strategy_options); } -void forwarder_SetContentObjectStoreSize(Forwarder *forwarder, - size_t maximumContentStoreSize) { - messageProcessor_SetContentObjectStoreSize(forwarder->processor, - maximumContentStoreSize); +cs_t *forwarder_get_cs(const forwarder_t *forwarder) { + assert(forwarder); + + return pkt_cache_get_cs(forwarder->pkt_cache); } -void forwarder_ClearCache(Forwarder *forwarder) { - messageProcessor_ClearCache(forwarder->processor); +// IMPORTANT: Use this function ONLY for read-only operations since a +// realloc would otherwise modify the returned copy but not the original +// msgbuf ids vector in the forwarder. This constraint cannot be enforced +// by returning a (const off_t *) because the vector_t macros still cast +// to (void **). +off_t *forwarder_get_acquired_msgbuf_ids(const forwarder_t *forwarder) { + return forwarder->acquired_msgbuf_ids; } -PARCClock *forwarder_GetClock(const Forwarder *forwarder) { - return forwarder->clock; +void forwarder_acquired_msgbuf_ids_reset(const forwarder_t *forwarder) { + vector_reset(forwarder->acquired_msgbuf_ids); } -#if !defined(__APPLE__) -hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder) { - return forwarder->hicnSocketHelper; +void forwarder_acquired_msgbuf_ids_push(const forwarder_t *forwarder, + off_t msgbuf_id) { + vector_push(forwarder->acquired_msgbuf_ids, msgbuf_id); } -#endif // ======================================================= -static void _signal_cb(int sig, PARCEventType events, void *user_data) { - Forwarder *forwarder = (Forwarder *)user_data; +fib_t *forwarder_get_fib(const forwarder_t *forwarder) { + return forwarder->fib; +} + +msgbuf_pool_t *forwarder_get_msgbuf_pool(const forwarder_t *forwarder) { + return forwarder->msgbuf_pool; +} + +mapme_t *forwarder_get_mapme(const forwarder_t *forwarder) { + return forwarder->mapme; +} + +#ifdef WITH_POLICY_STATS +const policy_stats_mgr_t *forwarder_get_policy_stats_mgr( + const forwarder_t *forwarder) { + return &forwarder->policy_stats_mgr; +} +#endif /* WITH_POLICY_STATS */ + +/** + * @brief Finalize (i.e. close fd and free internal data structures) + * the current connection ("SELF") when the command is received. + * The connection cannot be removed inside the command handling + * because it is needed to return the ack back. + */ +static void _forwarder_finalize_connection_if_self(connection_t *conn, + msgbuf_t *msgbuf) { + uint8_t *packet = msgbuf_get_packet(msgbuf); + msg_connection_remove_t *msg = (msg_connection_remove_t *)packet; + cmd_connection_remove_t *control = &msg->payload; + + if (strcmp(control->symbolic_or_connid, "SELF") == 0) + connection_finalize(conn); +} - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "signal %d events %d", sig, events); +ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener, + off_t msgbuf_id, address_pair_t *pair, Ticks now) { + assert(forwarder); + /* listener can be NULL */ + assert(msgbuf_id_is_valid(msgbuf_id)); + assert(pair); + + hicn_name_t name; + msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + assert(msgbuf); + + size_t size = msgbuf_get_len(msgbuf); + + const connection_table_t *table = + forwarder_get_connection_table(listener->forwarder); + + /* Connection lookup */ + if (msgbuf_get_connection_id(msgbuf) == CONNECTION_ID_UNDEFINED) { + connection_t *connection = connection_table_get_by_pair(table, pair); + unsigned conn_id = + connection + ? (unsigned)connection_table_get_connection_id(table, connection) + : CONNECTION_ID_UNDEFINED; + + assert((conn_id != CONNECTION_ID_UNDEFINED) || listener); + msgbuf->connection_id = conn_id; + } - switch ((int)sig) { - case SIGTERM: - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "Caught an terminate signal; exiting cleanly."); - dispatcher_Stop(forwarder->dispatcher); + forwarder->stats.countReceived++; + + /* Initialize packet buffer stored in msgbuf through libhicn */ + msgbuf_initialize_from_packet(msgbuf); + + /* Detect packet type */ + hicn_packet_analyze(msgbuf_get_pkbuf(msgbuf)); + + msgbuf->recv_ts = now; + +RETRY: + + switch (msgbuf_get_type(msgbuf)) { + case HICN_PACKET_TYPE_INTEREST: + if (!connection_id_is_valid(msgbuf->connection_id)) { + char conn_name[SYMBOLIC_NAME_LEN]; + int rc = connection_table_get_random_name(table, conn_name); + if (rc < 0) { + ERROR("Could not create name for new connection"); + goto DROP; + } + + unsigned connection_id = + listener_create_connection(listener, conn_name, pair); + if (connection_id == CONNECTION_ID_UNDEFINED) { + ERROR("Could not create new connection"); + goto DROP; + } + msgbuf->connection_id = connection_id; + } + msgbuf->path_label = 0; // not used for interest packets + hicn_interest_get_name(msgbuf_get_pkbuf(msgbuf), &name); + msgbuf_set_name(msgbuf, &name); +#ifdef WITH_WLDR + forwarder_apply_wldr(forwarder, msgbuf, connection); +#endif /* WITH_WLDR */ + forwarder_process_interest(forwarder, msgbuf_id); + + pkt_cache_log(forwarder->pkt_cache); break; - case SIGINT: - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "Caught an interrupt signal; exiting cleanly."); - dispatcher_Stop(forwarder->dispatcher); + case HICN_PACKET_TYPE_DATA: + /* This include probes */ + if (!connection_id_is_valid(msgbuf->connection_id)) { + ERROR("Invalid connection for data packet"); + goto DROP; + } + msgbuf_init_pathlabel(msgbuf); + hicn_data_get_name(msgbuf_get_pkbuf(msgbuf), &name); + msgbuf_set_name(msgbuf, &name); +#ifdef WITH_WLDR + forwarder_apply_wldr(forwarder, msgbuf, connection); +#endif /* WITH_WLDR */ + forwarder_process_data(forwarder, msgbuf_id); + + pkt_cache_log(forwarder->pkt_cache); break; -#ifndef _WIN32 - case SIGUSR1: - // dump stats + +#ifdef WITH_WLDR + case HICN_PACKET_TYPE_WLDR_NOTIFICATION: + if (!connection_id_is_valid(msgbuf->connection_id)) { + ERROR("Invalid connection for WLDR packet"); + goto DROP; + } + connection_t *connection = + connection_table_get_by_id(table, msgbuf->connection_id); + connection_wldr_handle_notification(connection, msgbuf); break; #endif - default: + case HICN_PACKET_TYPE_MAPME: + INFO("Received MAP-Me packet"); + if (!connection_id_is_valid(msgbuf->connection_id)) { + char conn_name[SYMBOLIC_NAME_LEN]; + int rc = connection_table_get_random_name(table, conn_name); + if (rc < 0) { + ERROR("Could not create name for new connection"); + goto DROP; + } + + unsigned connection_id = + listener_create_connection(listener, conn_name, pair); + if (connection_id == CONNECTION_ID_UNDEFINED) { + ERROR("Could not create new connection"); + goto DROP; + } + INFO("Created connection upon MAP-Me packet"); + msgbuf->connection_id = connection_id; + } + mapme_process(forwarder->mapme, msgbuf); break; - } -} -static void _keepalive_cb(int fd, PARCEventType what, void *user_data) { - parcAssertTrue(what & PARCEventType_Timeout, "Got unexpected tick_cb: %d", - what); - // function is just a keepalive for hicn-light, does not do anything -} + case HICN_PACKET_TYPE_COMMAND: + // Create the connection to send the ack back + if (!connection_id_is_valid(msgbuf->connection_id)) { + char conn_name[SYMBOLIC_NAME_LEN]; + int rc = connection_table_get_random_name(table, conn_name); + if (rc < 0) { + ERROR("Could not create name for new connection"); + goto DROP; + } + + unsigned connection_id = + listener_create_connection(listener, conn_name, pair); + if (connection_id == CONNECTION_ID_UNDEFINED) { + ERROR("Could not create new connection"); + goto DROP; + } + msgbuf->connection_id = connection_id; + } + + msg_header_t *msg = (msg_header_t *)msgbuf_get_packet(msgbuf); + msgbuf->command.type = msg->header.command_id; + if (!command_type_is_valid(msgbuf->command.type)) { + ERROR("Invalid command %d", msgbuf->command.type); + goto DROP; + } + + /* + * We need to retrieve the connection (in case it is useful) before + * proceeding through the removal + */ + connection_t *connection = + connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); + size = command_process_msgbuf(forwarder, msgbuf); + if (msgbuf->command.type == COMMAND_TYPE_CONNECTION_REMOVE) + _forwarder_finalize_connection_if_self(connection, msgbuf); + return size; -#ifdef WITH_MAPME -FIB *forwarder_getFib(Forwarder *forwarder) { - return messageProcessor_getFib(forwarder->processor); -} + default: + /* Commands are not recognized by the packet parser */ + if (msgbuf_is_command(msgbuf)) { + msgbuf_set_type(msgbuf, HICN_PACKET_TYPE_COMMAND); + goto RETRY; + } + goto DROP; + } -void forwarder_onConnectionEvent(Forwarder *forwarder, const Connection *conn, connection_event_t event) { -//#ifdef WITH_POLICY -// messageProcessor_onConnectionEvent(forwarder->processor, conn, event); -//#else - mapme_onConnectionEvent(forwarder->mapme, conn, event); -//#endif /* WITH_POLICY */ -} + return size; -void forwarder_ProcessMapMe(Forwarder *forwarder, const uint8_t *msgBuffer, - unsigned conn_id) { - mapme_Process(forwarder->mapme, msgBuffer, conn_id); +DROP: + forwarder_drop(forwarder, msgbuf_id); + return 0; } -MapMe * -forwarder_getMapmeInstance(const Forwarder *forwarder) { - return forwarder->mapme; +void forwarder_log(forwarder_t *forwarder) { + DEBUG( + "Forwarder: received = %u (interest = %u, data = %u), dropped = %u " + "(interest = %u, data = %u, other = %u), forwarded = { interests = " + "%u, " + "data = %u }, dropped = { connection_not_found = %u, send_failure " + "= " + "%u, " + "no_route_in_fib = %u }, interest processing = { aggregated = %u, " + "retransmitted = %u, satisfied_from_cs = %u, expired_interests = " + "%u, " + "expired_data = %u }, data processing = { " + "no_reverse_path = %u }\n", + forwarder->stats.countReceived, forwarder->stats.countInterestsReceived, + forwarder->stats.countObjectsReceived, forwarder->stats.countDropped, + forwarder->stats.countInterestsDropped, + forwarder->stats.countObjectsDropped, forwarder->stats.countOtherDropped, + forwarder->stats.countInterestForwarded, + forwarder->stats.countObjectsForwarded, + forwarder->stats.countDroppedConnectionNotFound, + forwarder->stats.countSendFailures, forwarder->stats.countDroppedNoRoute, + forwarder->stats.countInterestsAggregated, + forwarder->stats.countInterestsRetransmitted, + forwarder->stats.countInterestsSatisfiedFromStore, + forwarder->stats.countInterestsExpired, forwarder->stats.countDataExpired, + forwarder->stats.countDroppedNoReversePath); } -#endif /* WITH_MAPME */ +forwarder_stats_t forwarder_get_stats(forwarder_t *forwarder) { + return forwarder->stats; +} |