diff options
Diffstat (limited to 'hicn-light/src/hicn/core/forwarder.c')
-rw-r--r-- | hicn-light/src/hicn/core/forwarder.c | 1334 |
1 files changed, 949 insertions, 385 deletions
diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c index 94e8cc885..37502f3ac 100644 --- a/hicn-light/src/hicn/core/forwarder.c +++ b/hicn-light/src/hicn/core/forwarder.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -23,6 +23,15 @@ * the event scheduler */ +/* Bypass FIB and send packet one by one */ +//#define BYPASS_FIB 1 + +/* Send packets one by one : only effective if FIB is not bypassed */ +//#define USE_SEND_PACKET 1 + +/* Batch sending: only if the previous option is undefined */ +#define USE_QUEUE true + #ifndef _WIN32 #include <arpa/inet.h> #include <sys/socket.h> @@ -30,8 +39,7 @@ #endif #include <errno.h> #include <fcntl.h> -#include <signal.h> -#include <hicn/hicn-light/config.h> +//#include <hicn/hicn-light/config.h> #include <stdarg.h> #include <stdio.h> #include <stdlib.h> @@ -41,90 +49,108 @@ #define __STDC_FORMAT_MACROS #include <inttypes.h> -#include <parc/algol/parc_ArrayList.h> -#include <parc/algol/parc_Memory.h> -#include <parc/algol/parc_Object.h> -#include <parc/logging/parc_LogReporterTextStdout.h> - -#include <hicn/core/connectionManager.h> -#include <hicn/core/connectionTable.h> -#include <hicn/core/dispatcher.h> -#include <hicn/core/forwarder.h> -#include <hicn/core/messagePacketType.h> +#include "connection_table.h" +#include "fib.h" +#include "forwarder.h" +#include "listener_table.h" #ifdef WITH_MAPME -#include <hicn/core/mapme.h> +#include "mapme.h" #endif /* WITH_MAPME */ -#include <hicn/config/configuration.h> -#include <hicn/config/configurationFile.h> -#include <hicn/config/configurationListeners.h> -#include <hicn/processor/messageProcessor.h> +#include "msgbuf.h" +#include "msgbuf_pool.h" +#include "packet_cache.h" +#include "../config/configuration.h" +// #include "../config/configuration_file.h" +#include "../config/commands.h" +#include "../io/base.h" // MAX_MSG + +#ifdef WITH_POLICY_STATS +#include <hicn/core/policy_stats.h> +#endif /* WITH_POLICY_STATS */ #include <hicn/core/wldr.h> +#include <hicn/util/log.h> + +typedef struct { + // Packets processed + uint32_t countReceived; // Interest and data only + uint32_t countInterestsReceived; + uint32_t countObjectsReceived; + + // Packets Dropped + uint32_t countDropped; + uint32_t countInterestsDropped; + uint32_t countObjectsDropped; + uint32_t countOtherDropped; + + // Forwarding + uint32_t countInterestForwarded; + uint32_t countObjectsForwarded; + + // Errors while forwarding + uint32_t countDroppedConnectionNotFound; + uint32_t countSendFailures; + uint32_t countDroppedNoRoute; + + // Interest processing + uint32_t countInterestsAggregated; + uint32_t countInterestsRetransmitted; + uint32_t countInterestsSatisfiedFromStore; + uint32_t countInterestsExpired; + uint32_t countDataExpired; + + // Data processing + uint32_t countDroppedNoReversePath; + + // TODO(eloparco): Currently not used + // uint32_t countDroppedNoHopLimit; + // uint32_t countDroppedZeroHopLimitFromRemote; + // uint32_t countDroppedZeroHopLimitToRemote; +} forwarder_stats_t; + +struct forwarder_s { + // uint16_t server_port; -#include <parc/assert/parc_Assert.h> - -// the router's clock frequency (we now use the monotonic clock) -#define HZ 1000 - -// these will all be a little off because its all integer division -#define MSEC_PER_TICK (1000 / HZ) -#define USEC_PER_TICK (1000000 / HZ) -#define NSEC_PER_TICK ((1000000000ULL) / HZ) -#define MSEC_TO_TICKS(msec) \ - ((msec < FC_MSEC_PER_TICK) ? 1 : msec / FC_MSEC_PER_TICK) -#define NSEC_TO_TICKS(nsec) ((nsec < NSEC_PER_TICK) ? 1 : nsec / NSEC_PER_TICK) - -struct forwarder { - Dispatcher *dispatcher; - - uint16_t server_port; + // used by seed48 and nrand48 + unsigned short seed[3]; - PARCEventSignal *signal_int; - PARCEventSignal *signal_term; -#ifndef _WIN32 - PARCEventSignal *signal_usr1; -#endif - PARCEventTimer *keepalive_event; + connection_table_t *connection_table; + listener_table_t *listener_table; + configuration_t *config; - // will skew the virtual clock forward. In normal operaiton, it is 0. - Ticks clockOffset; + pkt_cache_t *pkt_cache; + fib_t *fib; + msgbuf_pool_t *msgbuf_pool; - unsigned nextConnectionid; - Messenger *messenger; - ConnectionManager *connectionManager; - ConnectionTable *connectionTable; - ListenerSet *listenerSet; - Configuration *config; +#ifdef WITH_MAPME + mapme_t *mapme; +#endif /* WITH_MAPME */ - // we'll eventually want to setup a threadpool of these - MessageProcessor *processor; + bool store_in_cs; + bool serve_from_cs; - Logger *logger; + forwarder_stats_t stats; +#ifdef WITH_POLICY_STATS + policy_stats_mgr_t policy_stats_mgr; +#endif /* WITH_POLICY_STATS */ - PARCClock *clock; + /* + * The message forwarder has to decide whether to queue incoming packets for + * batching, or trigger the transmission on the connection + */ + unsigned pending_conn[MAX_MSG]; + size_t num_pending_conn; -#if !defined(__APPLE__) - hicn_socket_helper_t - *hicnSocketHelper; // state required to manage hicn connections -#endif - // used by seed48 and nrand48 - unsigned short seed[3]; + // msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by + // 1 */ -#ifdef WITH_MAPME - MapMe *mapme; -#endif /* WITH_MAPME */ + subscription_table_t *subscriptions; }; -// signal traps through the event scheduler -static void _signal_cb(int, PARCEventType, void *); - -// A no-op keepalive to prevent Libevent from exiting the dispatch loop -static void _keepalive_cb(int, PARCEventType, void *); - /** * Reseed our pseudo-random number generator. */ -static void forwarder_Seed(Forwarder *forwarder) { +static void forwarder_seed(forwarder_t *forwarder) { #ifndef _WIN32 int fd; ssize_t res; @@ -150,452 +176,990 @@ static void forwarder_Seed(Forwarder *forwarder) { #endif } -Logger *forwarder_GetLogger(const Forwarder *forwarder) { - return forwarder->logger; -} +forwarder_t *forwarder_create(configuration_t *configuration) { + forwarder_t *forwarder = malloc(sizeof(forwarder_t)); + if (!forwarder) goto ERR_MALLOC; -// ============================================================================ -// Setup and destroy section + forwarder_seed(forwarder); + srand(forwarder->seed[0] ^ forwarder->seed[1] ^ forwarder->seed[2]); -Forwarder *forwarder_Create(Logger *logger) { - Forwarder *forwarder = parcMemory_AllocateAndClear(sizeof(Forwarder)); - parcAssertNotNull(forwarder, "parcMemory_AllocateAndClear(%zu) returned NULL", - sizeof(Forwarder)); - memset(forwarder, 0, sizeof(Forwarder)); - forwarder_Seed(forwarder); + forwarder->config = configuration; - forwarder->clock = parcClock_Monotonic(); - forwarder->clockOffset = 0; + forwarder->listener_table = listener_table_create(); + if (!forwarder->listener_table) goto ERR_LISTENER_TABLE; - if (logger) { - forwarder->logger = logger_Acquire(logger); - logger_SetClock(forwarder->logger, forwarder->clock); - } else { - PARCLogReporter *reporter = parcLogReporterTextStdout_Create(); - forwarder->logger = logger_Create(reporter, forwarder->clock); - parcLogReporter_Release(&reporter); - } + forwarder->connection_table = connection_table_create(); + if (!forwarder->connection_table) goto ERR_CONNECTION_TABLE; - forwarder->nextConnectionid = 1; - forwarder->dispatcher = dispatcher_Create(forwarder->logger); - forwarder->messenger = messenger_Create(forwarder->dispatcher); - forwarder->connectionManager = connectionManager_Create(forwarder); - forwarder->connectionTable = connectionTable_Create(); - forwarder->listenerSet = listenerSet_Create(); - forwarder->config = configuration_Create(forwarder); - forwarder->processor = messageProcessor_Create(forwarder); - - forwarder->signal_term = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGTERM); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_term); - - forwarder->signal_int = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGINT); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_int); -#ifndef _WIN32 - forwarder->signal_usr1 = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGPIPE); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_usr1); -#endif + forwarder->fib = fib_create(forwarder); + if (!forwarder->fib) goto ERR_FIB; -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - forwarder->hicnSocketHelper = hicn_create(); - if (!forwarder->hicnSocketHelper) - goto ERR_SOCKET; -#endif /* __APPLE__ */ + forwarder->msgbuf_pool = msgbuf_pool_create(); + if (!forwarder->msgbuf_pool) goto ERR_PACKET_POOL; -#ifdef WITH_MAPME - if (!(mapme_create(&forwarder->mapme, forwarder))) - goto ERR_MAPME; -#endif /* WITH_MAPME */ + size_t objectStoreSize = configuration_get_cs_size(configuration); + forwarder->pkt_cache = pkt_cache_create(objectStoreSize); + if (!forwarder->pkt_cache) goto ERR_PKT_CACHE; + forwarder->subscriptions = subscription_table_create(); + if (!forwarder->subscriptions) goto ERR_SUBSCRIPTION; - /* ignore child */ -#ifndef _WIN32 - signal(SIGCHLD, SIG_IGN); - - /* ignore tty signals */ - signal(SIGTSTP, SIG_IGN); - signal(SIGTTOU, SIG_IGN); - signal(SIGTTIN, SIG_IGN); -#endif + // the two flags for the cs are set to true by default. If the cs + // is active it always work as expected unless the use modifies this + // values using controller + if (objectStoreSize != 0) { + forwarder->store_in_cs = true; + forwarder->serve_from_cs = true; + } - // We no longer use this for ticks, but we need to have at least one event - // schedule to keep Libevent happy. +#ifdef WITH_MAPME + forwarder->mapme = mapme_create(forwarder); + if (!forwarder->mapme) goto ERR_MAPME; +#endif /* WITH_MAPME */ - struct timeval wtnow_timeout; - timerclear(&wtnow_timeout); +#ifdef WITH_POLICY_STATS + if (policy_stats_mgr_initialize(&forwarder->policy_stats_mgr, forwarder) < 0) + goto ERR_MGR; +#endif /* WITH_POLICY_STATS */ - wtnow_timeout.tv_sec = 0; - wtnow_timeout.tv_usec = 50000; // 20 Hz keepalive + memset(&forwarder->stats, 0, sizeof(forwarder_stats_t)); - PARCEventScheduler *base = - dispatcher_GetEventScheduler(forwarder->dispatcher); - forwarder->keepalive_event = parcEventTimer_Create( - base, PARCEventType_Persist, _keepalive_cb, (void *)forwarder); - parcEventTimer_Start(forwarder->keepalive_event, &wtnow_timeout); + forwarder->num_pending_conn = 0; return forwarder; +ERR_MGR: #ifdef WITH_MAPME ERR_MAPME: #endif /* WITH_MAPME */ -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - hicn_free(forwarder->hicnSocketHelper); -ERR_SOCKET: -#endif - listenerSet_Destroy(&(forwarder->listenerSet)); - connectionManager_Destroy(&(forwarder->connectionManager)); - connectionTable_Destroy(&(forwarder->connectionTable)); - messageProcessor_Destroy(&(forwarder->processor)); - configuration_Destroy(&(forwarder->config)); - messenger_Destroy(&(forwarder->messenger)); - - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_int)); - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_term)); -#ifndef _WIN32 - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_usr1)); -#endif - - parcClock_Release(&forwarder->clock); - logger_Release(&forwarder->logger); - // do the dispatcher last - dispatcher_Destroy(&(forwarder->dispatcher)); - - parcMemory_Deallocate((void **)&forwarder); +ERR_SUBSCRIPTION: + subscription_table_free(forwarder->subscriptions); +ERR_PKT_CACHE: + pkt_cache_free(forwarder->pkt_cache); + + msgbuf_pool_free(forwarder->msgbuf_pool); +ERR_PACKET_POOL: + fib_free(forwarder->fib); +ERR_FIB: + connection_table_free(forwarder->connection_table); +ERR_CONNECTION_TABLE: + listener_table_free(forwarder->listener_table); +ERR_LISTENER_TABLE: + free(forwarder); +ERR_MALLOC: return NULL; } -void forwarder_Destroy(Forwarder **ptr) { - parcAssertNotNull(ptr, "Parameter must be non-null double pointer"); - parcAssertNotNull(*ptr, "Parameter must dereference to non-null pointer"); - Forwarder *forwarder = *ptr; -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - hicn_free(forwarder->hicnSocketHelper); -#endif - parcEventTimer_Destroy(&(forwarder->keepalive_event)); +void forwarder_free(forwarder_t *forwarder) { + assert(forwarder); - listenerSet_Destroy(&(forwarder->listenerSet)); - connectionManager_Destroy(&(forwarder->connectionManager)); - connectionTable_Destroy(&(forwarder->connectionTable)); - messageProcessor_Destroy(&(forwarder->processor)); - configuration_Destroy(&(forwarder->config)); - - // the messenger is used by many of the other pieces, so destroy it last - messenger_Destroy(&(forwarder->messenger)); + policy_stats_mgr_finalize(&forwarder->policy_stats_mgr); #ifdef WITH_MAPME mapme_free(forwarder->mapme); #endif /* WITH_MAPME */ - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_int)); - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_term)); -#ifndef _WIN32 - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_usr1)); -#endif + pkt_cache_free(forwarder->pkt_cache); + msgbuf_pool_free(forwarder->msgbuf_pool); + fib_free(forwarder->fib); + connection_table_free(forwarder->connection_table); + listener_table_free(forwarder->listener_table); + subscription_table_free(forwarder->subscriptions); + configuration_free(forwarder->config); + free(forwarder); +} - parcClock_Release(&forwarder->clock); - logger_Release(&forwarder->logger); +void forwarder_setup_local_listeners(forwarder_t *forwarder, uint16_t port) { + assert(forwarder); + listener_setup_local(forwarder, port); +} - // do the dispatcher last - dispatcher_Destroy(&(forwarder->dispatcher)); +configuration_t *forwarder_get_configuration(forwarder_t *forwarder) { + assert(forwarder); + return forwarder->config; +} - parcMemory_Deallocate((void **)&forwarder); - *ptr = NULL; +subscription_table_t *forwarder_get_subscriptions(forwarder_t *forwarder) { + return forwarder->subscriptions; } -void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port, - const char *localPath) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); +connection_table_t *forwarder_get_connection_table( + const forwarder_t *forwarder) { + assert(forwarder); + return forwarder->connection_table; +} - configurationListeners_SetupAll(forwarder->config, port, localPath); +listener_table_t *forwarder_get_listener_table(forwarder_t *forwarder) { + assert(forwarder); + return forwarder->listener_table; } -void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - configurationListeners_SetutpLocalIPv4(forwarder->config, port); +void forwarder_cs_set_store(forwarder_t *forwarder, bool val) { + assert(forwarder); + forwarder->store_in_cs = val; } -void forwarder_SetupFromConfigFile(Forwarder *forwarder, const char *filename) { - ConfigurationFile *configFile = configurationFile_Create(forwarder, filename); - if (configFile) { - configurationFile_Process(configFile); - configurationFile_Release(&configFile); - } +bool forwarder_cs_get_store(forwarder_t *forwarder) { + assert(forwarder); + return forwarder->store_in_cs; } -Configuration *forwarder_GetConfiguration(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->config; +void forwarder_cs_set_serve(forwarder_t *forwarder, bool val) { + assert(forwarder); + forwarder->serve_from_cs = val; } -// ============================================================================ +bool forwarder_cs_get_serve(forwarder_t *forwarder) { + assert(forwarder); + return forwarder->serve_from_cs; +} + +void forwarder_cs_set_size(forwarder_t *forwarder, size_t size) { + assert(forwarder); -unsigned forwarder_GetNextConnectionId(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->nextConnectionid++; + if (pkt_cache_set_cs_size(forwarder->pkt_cache, size) < 0) { + ERROR( + "Unable to resize the CS: provided maximum size (%u) is smaller than " + "the number of elements currently stored in the CS (%u). Clear the CS " + "and retry.", + size, pkt_cache_get_cs_size(forwarder->pkt_cache)); + } } -Messenger *forwarder_GetMessenger(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->messenger; +size_t forwarder_cs_get_size(forwarder_t *forwarder) { + assert(forwarder); + return pkt_cache_get_cs_size(forwarder->pkt_cache); } -Dispatcher *forwarder_GetDispatcher(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->dispatcher; +size_t forwarder_cs_get_num_stale_entries(forwarder_t *forwarder) { + assert(forwarder); + return pkt_cache_get_num_cs_stale_entries(forwarder->pkt_cache); } -#ifdef WITH_POLICY -ConnectionTable *forwarder_GetConnectionTable(const Forwarder *forwarder) { +void forwarder_cs_clear(forwarder_t *forwarder) { + assert(forwarder); + + pkt_cache_cs_clear(forwarder->pkt_cache); +} + +/** + * @function forwarder_Drop + * @abstract Whenever we "drop" a message, increment counters + * @discussion + * This is a bookkeeping function. It increments the appropriate counters. + * + * The default action for a message is to destroy it in + * <code>forwarder_Receive()</code>, so this function does not need to do + * that. + * + */ +static ssize_t forwarder_drop(forwarder_t *forwarder, off_t msgbuf_id) { + forwarder->stats.countDropped++; + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + const msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + switch (msgbuf_get_type(msgbuf)) { + case MSGBUF_TYPE_INTEREST: + forwarder->stats.countInterestsDropped++; + break; + + case MSGBUF_TYPE_DATA: + forwarder->stats.countObjectsDropped++; + break; + + default: + forwarder->stats.countOtherDropped++; + break; + } + + return msgbuf_get_len(msgbuf); + // dont destroy message here, its done at end of receive +} + +#ifndef BYPASS_FIB +/* + * If the hoplimit is equal to 0, then we may only forward it to local + * applications. Otherwise, we may forward it off the system. + * + */ + +static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder, + off_t msgbuf_id, + unsigned conn_id) { + connection_table_t *table = forwarder_get_connection_table(forwarder); + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + const connection_t *conn = connection_table_get_by_id(table, conn_id); + + if (!conn) { + forwarder->stats.countDroppedConnectionNotFound++; + WARN("forward msgbuf %lu to interface %u not found (count %u)", msgbuf_id, + conn_id, forwarder->stats.countDroppedConnectionNotFound); + return forwarder_drop(forwarder, msgbuf_id); + } + + /* Always queue the packet... */ + // DEBUG("Queueing packet\n"); + +#if defined(USE_SEND_PACKET) || !defined(__linux__) + + // Here we need to update the path label of a data packet before send + // it. The path label update can be done here because the packet is sent + // directly to the socket + if (msgbuf_get_type(msgbuf) == MSGBUF_TYPE_DATA) + msgbuf_update_pathlabel(msgbuf, connection_get_id(conn)); + + bool success = connection_send_packet(conn, msgbuf_get_packet(msgbuf), + msgbuf_get_len(msgbuf)); #else -ConnectionTable *forwarder_GetConnectionTable(Forwarder *forwarder) { -#endif /* WITH_POLICY */ - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->connectionTable; + + // In this case we cannot update the path label even if it is need because + // the packet is not copied and only the packet id is enqueued to the ring + // buffer associated the output interface. If the path label is updated here + // all data packets get delivered to the next hop with the same label that is + // associated to the last connection used. For this reason the path label + // update must be done before the packet is actually sent inside the different + // IO implementations. + bool success = connection_send(conn, msgbuf_id, USE_QUEUE); + +#endif + + /* ... and mark the connection as pending if this is not yet the case */ + unsigned i; + for (i = 0; i < forwarder->num_pending_conn; i++) { + if (forwarder->pending_conn[i] == conn_id) break; + } + if (i == forwarder->num_pending_conn) // Not found + forwarder->pending_conn[forwarder->num_pending_conn++] = conn_id; + + if (!success) { + forwarder->stats.countSendFailures++; + + DEBUG("forward msgbuf %llu to interface %u send failure (count %u)", + msgbuf_id, conn_id, forwarder->stats.countSendFailures); + return forwarder_drop(forwarder, msgbuf_id); + } + + switch (msgbuf_get_type(msgbuf)) { + case MSGBUF_TYPE_INTEREST: + forwarder->stats.countInterestForwarded++; + break; + + case MSGBUF_TYPE_DATA: + forwarder->stats.countObjectsForwarded++; + break; + + default: + break; + } + + TRACE("forward msgbuf %p to interface %u", msgbuf, conn_id); + return msgbuf_get_len(msgbuf); } -ListenerSet *forwarder_GetListenerSet(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->listenerSet; +/** + * @function forwarder_forward_to_nexthops + * @abstract Try to forward to each nexthop listed in the NumberSet + * @discussion + * Will not forward to the ingress connection. + * + * @return The number of nexthops tried + */ +static unsigned forwarder_forward_to_nexthops(forwarder_t *forwarder, + off_t msgbuf_id, + const nexthops_t *nexthops) { + // DEBUG("[forwarder_forward_to_nexthops] num=%d/%d", + // nexthops_get_curlen(nexthops), nexthops_get_len(nexthops)); + unsigned forwardedCopies = 0; + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + unsigned ingressId = msgbuf_get_connection_id(msgbuf); + + unsigned nexthop; + nexthops_foreach(nexthops, nexthop, { + // DEBUG("[forwarder_forward_to_nexthops] - nexthop = %d"); + if (nexthop == ingressId) continue; + + forwardedCopies++; + // INFO("[forwarder_forward_to_nexthops] - nexthop = %d OK", nexthop); + forwarder_forward_via_connection(forwarder, msgbuf_id, nexthop); + }); + + return forwardedCopies; } -void forwarder_SetChacheStoreFlag(Forwarder *forwarder, bool val) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - messageProcessor_SetCacheStoreFlag(forwarder->processor, val); +static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id, + pkt_cache_verdict_t verdict, + pkt_cache_entry_t *entry) { + assert(forwarder); + assert(msgbuf_id_is_valid(msgbuf_id)); + + const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST); + + fib_entry_t *fib_entry = fib_match_message(forwarder->fib, msgbuf); + if (!fib_entry) return false; + + // DEBUG("[forwarder] Getting nexthops from strategy"); + nexthops_t *nexthops = fib_entry_get_nexthops_from_strategy( + fib_entry, msgbuf, verdict == PKT_CACHE_VERDICT_RETRANSMIT_INTEREST); + + if (nexthops_get_curlen(nexthops) == 0) { + ERROR("Message %p returned an empty next hop set", msgbuf); + return false; + } + + // if this is the first time that we sent this interest the pit entry would be + // NULL. in that case we add the interest to the pit + if (entry == NULL) { + entry = pkt_cache_add_to_pit(forwarder->pkt_cache, msgbuf); + } + pit_entry_t *pit_entry = &entry->u.pit_entry; + if (!pit_entry) { + return false; + } + + pit_entry_set_fib_entry(pit_entry, fib_entry); + + // this requires some additional checks. It may happen that some of the output + // faces selected by the forwarding strategy are not usable. So far all the + // forwarding strategy return only valid faces (or an empty list) + unsigned nexthop; + nexthops_foreach(nexthops, nexthop, { + // DEBUG("Adding egress to PIT for nexthop %d", nexthop); + pit_entry_egress_add(pit_entry, nexthop); + }); + + if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <= 0) { + // this should never happen + ERROR("Message %p returned an empty next hop set", msgbuf); + return false; + } + + return true; } -bool forwarder_GetChacheStoreFlag(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return messageProcessor_GetCacheStoreFlag(forwarder->processor); +#endif /* ! BYPASS_FIB */ + +ssize_t _forwarder_forward_upon_interest(forwarder_t *forwarder, + msgbuf_pool_t *msgbuf_pool, + off_t data_msgbuf_id, + off_t interest_msgbuf_id, + pkt_cache_entry_t *entry, + pkt_cache_verdict_t verdict) { + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id); + + if (verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST) { + // the packet shuold not be forwarder + forwarder_drop(forwarder, interest_msgbuf_id); + return msgbuf_get_len(msgbuf); + } + + // - Forward reply if a data packet matching the interest was found + if (verdict == PKT_CACHE_VERDICT_FORWARD_DATA) { + assert(forwarder->serve_from_cs == true); + + msgbuf_t *interest_msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id); + msgbuf_t *data_msgbuf = msgbuf_pool_at(msgbuf_pool, data_msgbuf_id); + + msgbuf_reset_pathlabel(data_msgbuf); + + forwarder_forward_via_connection(forwarder, data_msgbuf_id, + msgbuf_get_connection_id(interest_msgbuf)); + + // - Try to forward the interest + } else if (!forwarder_forward_via_fib(forwarder, interest_msgbuf_id, verdict, + entry)) { + forwarder->stats.countDroppedNoRoute++; + INFO("Message %lu did not match FIB, no route (count %u)", + interest_msgbuf_id, forwarder->stats.countDroppedNoRoute); + + // - Drop the packet (no forwarding) + forwarder_drop(forwarder, interest_msgbuf_id); + } + + return msgbuf_get_len(msgbuf); +} + +static void _forwarder_update_interest_stats(forwarder_t *forwarder, + pkt_cache_verdict_t verdict, + msgbuf_t *msgbuf, + pkt_cache_entry_t *entry) { + long expiration = -1; + if (entry == NULL) + expiration = ticks_now() + msgbuf_get_interest_lifetime(msgbuf); + else if (entry->has_expire_ts) + expiration = entry->expire_ts; + + switch (verdict) { + case PKT_CACHE_VERDICT_FORWARD_INTEREST: + DEBUG( + "Message will be added to PIT (expiration=%ld), " + "if nexthops are available", + expiration); + break; + + case PKT_CACHE_VERDICT_AGGREGATE_INTEREST: + forwarder->stats.countInterestsAggregated++; + DEBUG("Message aggregated in PIT (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_RETRANSMIT_INTEREST: + forwarder->stats.countInterestsRetransmitted++; + DEBUG("Message retransmitted (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_FORWARD_DATA: + forwarder->stats.countInterestsSatisfiedFromStore++; + DEBUG("Message satisfied from content store (expiration=%ld)", + expiration); + break; + + case PKT_CACHE_VERDICT_INTEREST_EXPIRED_FORWARD_INTEREST: + forwarder->stats.countInterestsExpired++; + DEBUG("Message replaced expired interest (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_DATA_EXPIRED_FORWARD_INTEREST: + forwarder->stats.countDataExpired++; + DEBUG("Message replaced expired data (expiration=%ld)", expiration); + break; + + case PKT_CACHE_VERDICT_ERROR: + ERROR("Inivalid packet cache content"); + break; + + default: + break; + } } -void forwarder_SetChacheServeFlag(Forwarder *forwarder, bool val) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - messageProcessor_SetCacheServeFlag(forwarder->processor, val); +/** + * @function forwarder_process_interest + * @abstract Receive an interest from the network + * @discussion + * (1) if interest in the PIT, aggregate in PIT + * (2) if interest in the ContentStore, reply + * (3) if in the FIB, forward + * (4) drop + * + */ +static ssize_t forwarder_process_interest(forwarder_t *forwarder, + off_t msgbuf_id) { + assert(forwarder); + assert(msgbuf_id_is_valid(msgbuf_id)); + + msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST); + + forwarder->stats.countReceived++; + forwarder->stats.countInterestsReceived++; + + WITH_DEBUG({ + char *nameString = name_ToString(msgbuf_get_name(msgbuf)); + DEBUG("INTEREST (%s) msgbuf_id=%lu ingress=%u length=%u", nameString, + msgbuf_id, msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf)); + free(nameString); + }) + + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; + off_t data_msgbuf_id = INVALID_MSGBUF_ID; + pkt_cache_entry_t *entry = NULL; + pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, &verdict, + &data_msgbuf_id, &entry, forwarder->serve_from_cs); + + _forwarder_update_interest_stats(forwarder, verdict, msgbuf, entry); + + return _forwarder_forward_upon_interest( + forwarder, msgbuf_pool, data_msgbuf_id, msgbuf_id, entry, verdict); } -bool forwarder_GetChacheServeFlag(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return messageProcessor_GetCacheServeFlag(forwarder->processor); +static void _forwarder_log_on_data(forwarder_t *forwarder, + pkt_cache_verdict_t verdict) { + switch (verdict) { + case PKT_CACHE_VERDICT_FORWARD_DATA: + DEBUG("Message added to CS from PIT"); + break; + case PKT_CACHE_VERDICT_STORE_DATA: + DEBUG("Message added to CS (expired or no previous interest pending)"); + break; + case PKT_CACHE_VERDICT_CLEAR_DATA: + break; + case PKT_CACHE_VERDICT_UPDATE_DATA: + DEBUG("Message updated in CS"); + break; + case PKT_CACHE_VERDICT_IGNORE_DATA: + DEBUG("Message not stored in CS"); + break; + case PKT_CACHE_VERDICT_ERROR: + ERROR("Inivalid packet cache content"); + break; + default: + break; + } } -void forwarder_ReceiveCommand(Forwarder *forwarder, command_id command, - struct iovec *message, unsigned ingressId) { - configuration_ReceiveCommand(forwarder->config, command, message, ingressId); +/** + * @function forwarder_process_data + * @abstract Process an in-bound content object + * @discussion + * (1) If it does not match anything in the PIT, drop it + * (2) Add to Content Store + * (3) Reverse path forward via PIT entries + * + * @param <#param1#> + */ +static ssize_t forwarder_process_data(forwarder_t *forwarder, off_t msgbuf_id) { + msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + WITH_DEBUG({ + char *nameString = name_ToString(msgbuf_get_name(msgbuf)); + DEBUG("DATA (%s) msgbuf_id=%lu ingress=%u length=%u", nameString, msgbuf_id, + msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf)); + free(nameString); + )} + + forwarder->stats.countReceived++; + forwarder->stats.countObjectsReceived++; + + const connection_table_t *table = forwarder_get_connection_table(forwarder); + const connection_t *conn = + connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); + + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; + bool wrong_egress; + nexthops_t *ingressSetUnion = + pkt_cache_on_data(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, + forwarder->store_in_cs, connection_is_local(conn), &wrong_egress, &verdict); + + _forwarder_log_on_data(forwarder, verdict); + + if (wrong_egress) { // Interest sent via a connection but received from another + WARN("Data coming from unexpected connection, discarded"); + } else if (!ingressSetUnion) { // No match in the PIT + forwarder->stats.countDroppedNoReversePath++; + DEBUG("Message %lu did not match PIT, no reverse path", msgbuf_id); + + // MOVE PROBE HOOK ELSEWHERE + // XXX relationship with forwarding strategy... insert hooks + // if the packet is a probe we need to analyze it + // NOTE : probes are not stored in PIT + if (msgbuf_is_probe(msgbuf)) { + fib_entry_t *entry = fib_match_message(forwarder->fib, msgbuf); + if (entry && fib_entry_strategy_type(entry) == STRATEGY_TYPE_BESTPATH) { + nexthops_t probe_nexthops = NEXTHOPS_EMPTY; + nexthops_add(&probe_nexthops, msgbuf_get_connection_id(msgbuf)); + fib_entry_on_data(entry, &probe_nexthops, msgbuf, 0, ticks_now()); + // XXX TODO CONFIRM WE DON'T EXIT HERE ? + } + } + forwarder_drop(forwarder, msgbuf_id); + } else { + // Reverse path forward via PIT entries + forwarder_forward_to_nexthops(forwarder, msgbuf_id, ingressSetUnion); + free(ingressSetUnion); + } + + return msgbuf_get_len(msgbuf); } -void forwarder_Receive(Forwarder *forwarder, Message *message) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(message, "Parameter message must be non-null"); +void forwarder_flush_connections(forwarder_t *forwarder) { + // DEBUG("[forwarder_flush_connections]"); + const connection_table_t *table = forwarder_get_connection_table(forwarder); - // this takes ownership of the message, so we're done here + for (unsigned i = 0; i < forwarder->num_pending_conn; i++) { + unsigned conn_id = forwarder->pending_conn[i]; + const connection_t *conn = connection_table_at(table, conn_id); + if (!connection_flush(conn)) { + WARN("Could not flush connection queue"); + // XXX keep track of non flushed connections... + } + } + forwarder->num_pending_conn = 0; + // DEBUG("[forwarder_flush_connections] done"); +} +// XXX move to wldr file, worst case in connection. +void forwarder_apply_wldr(const forwarder_t *forwarder, const msgbuf_t *msgbuf, + connection_t *connection) { // this are the checks needed to implement WLDR. We set wldr only on the STAs - // and we let the AP to react according to choise of the client. + // and we let the AP to react according to choice of the client. // if the STA enables wldr using the set command, the AP enable wldr as well // otherwise, if the STA disable it the AP remove wldr // WLDR should be enabled only on the STAs using the command line // TODO // disable WLDR command line on the AP - const Connection *conn = connectionTable_FindById( - forwarder->connectionTable, message_GetIngressConnectionId(message)); - - if (!conn) { - /* - * Drop is a static method in messageProcessor which might or might not need - * to be called for accounting purposes. This call was initially absent so - * the behaviour was kept like this, as this situation is unlikely. We need - * to release memory though, as this is not done in Drop anyways. - */ - //messageProcessor_Drop(forwarder->processor, message); - message_Release(&message); - return; - } - - if (message_HasWldr(message)) { - if (connection_HasWldr(conn)) { + if (msgbuf_has_wldr(msgbuf)) { + if (connection_has_wldr(connection)) { // case 1: WLDR is enabled - connection_DetectLosses((Connection *)conn, message); - } else if (!connection_HasWldr(conn) && - connection_WldrAutoStartAllowed(conn)) { + connection_wldr_detect_losses(connection, msgbuf); + } else if (!connection_has_wldr(connection) && + connection_wldr_autostart_is_allowed(connection)) { // case 2: We are on an AP. We enable WLDR - connection_EnableWldr((Connection *)conn); - connection_DetectLosses((Connection *)conn, message); + connection_wldr_enable(connection, true); + connection_wldr_detect_losses(connection, msgbuf); } // case 3: Ignore WLDR } else { - if (connection_HasWldr(conn) && connection_WldrAutoStartAllowed(conn)) { + if (connection_has_wldr(connection) && + connection_wldr_autostart_is_allowed(connection)) { // case 1: STA do not use WLDR, we disable it - connection_DisableWldr((Connection *)conn); + connection_wldr_enable(connection, false); } } - - messageProcessor_Receive(forwarder->processor, message); } -Ticks forwarder_GetTicks(const Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return parcClock_GetTime(forwarder->clock) + forwarder->clockOffset; -} +bool forwarder_add_or_update_route(forwarder_t *forwarder, ip_prefix_t *prefix, + unsigned ingress_id) { + assert(forwarder); + assert(prefix); -Ticks forwarder_NanosToTicks(uint64_t nanos) { return NSEC_TO_TICKS(nanos); } + configuration_t *config = forwarder_get_configuration(forwarder); -uint64_t forwarder_TicksToNanos(Ticks ticks) { - return (1000000000ULL) * ticks / HZ; -} + char prefix_s[MAXSZ_IP_PREFIX]; + int rc = ip_prefix_snprintf(prefix_s, MAXSZ_IP_PREFIX, prefix); + assert(rc < MAXSZ_IP_PREFIX); + if (rc < 0) return false; + + DEBUG("Adding prefix=%s for conn_id=%d", prefix_s, ingress_id); -bool forwarder_AddOrUpdateRoute(Forwarder *forwarder, - add_route_command *control, unsigned ifidx) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(control, "Parameter route must be non-null"); + // XXX TODO this should store options too + strategy_type_t strategy_type = configuration_get_strategy(config, prefix_s); - // we only have one message processor - bool res = - messageProcessor_AddOrUpdateRoute(forwarder->processor, control, ifidx); + Name name_prefix = EMPTY_NAME; + name_CreateFromAddress(&name_prefix, prefix->family, prefix->address, + prefix->len); + fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix); + if (!entry) { + entry = fib_entry_create(&name_prefix, strategy_type, NULL, forwarder); + fib_entry_nexthops_add(entry, ingress_id); + fib_add(forwarder->fib, entry); + + } else { + fib_entry_nexthops_add(entry, ingress_id); + } - return res; + return true; } +bool forwarder_remove_route(forwarder_t *forwarder, ip_prefix_t *prefix, + unsigned ingress_id) { + assert(forwarder); + assert(prefix); -bool forwarder_RemoveRoute(Forwarder *forwarder, remove_route_command *control, - unsigned ifidx) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(control, "Parameter route must be non-null"); + Name name_prefix = EMPTY_NAME; + name_CreateFromAddress(&name_prefix, prefix->family, prefix->address, + prefix->len); + fib_remove(forwarder->fib, &name_prefix, ingress_id); - // we only have one message processor - return messageProcessor_RemoveRoute(forwarder->processor, control, ifidx); + return true; } #ifdef WITH_POLICY -bool forwarder_AddOrUpdatePolicy(Forwarder *forwarder, - add_policy_command *control) { - parcAssertNotNull(forwarder, "Parameter forwarder must be non-null"); - parcAssertNotNull(control, "Parameter control must be non-null"); +bool forwarder_add_or_update_policy(forwarder_t *forwarder, ip_prefix_t *prefix, + hicn_policy_t *policy) { + assert(forwarder); + assert(prefix); + assert(policy); - return messageProcessor_AddOrUpdatePolicy(forwarder->processor, control); + Name name_prefix = EMPTY_NAME; + name_CreateFromAddress(&name_prefix, prefix->family, prefix->address, + prefix->len); + fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix); + if (!entry) return false; + fib_entry_set_policy(entry, *policy); + + return true; } -bool forwarder_RemovePolicy(Forwarder *forwarder, remove_policy_command *control) { - parcAssertNotNull(forwarder, "Parameter forwarder must be non-null"); - parcAssertNotNull(control, "Parameter control must be non-null"); +bool forwarder_remove_policy(forwarder_t *forwarder, ip_prefix_t *prefix) { + assert(forwarder); + assert(prefix); + + Name name_prefix = EMPTY_NAME; + name_CreateFromAddress(&name_prefix, prefix->family, prefix->address, + prefix->len); + fib_entry_t *entry = fib_contains(forwarder->fib, &name_prefix); + + if (!entry) return false; + + fib_entry_set_policy(entry, POLICY_EMPTY); - return messageProcessor_RemovePolicy(forwarder->processor, control); + return true; } #endif /* WITH_POLICY */ -void forwarder_RemoveConnectionIdFromRoutes(Forwarder *forwarder, - unsigned connectionId) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - messageProcessor_RemoveConnectionIdFromRoutes(forwarder->processor, - connectionId); +void forwarder_remove_connection_id_from_routes(forwarder_t *forwarder, + unsigned connection_id) { + assert(forwarder); + + fib_remove_connection_id(forwarder->fib, connection_id); } -void forwarder_SetStrategy(Forwarder *forwarder, Name *prefix, - strategy_type strategy, - unsigned related_prefixes_len, - Name **related_prefixes) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(prefix, "Parameter prefix must be non-null"); +void forwarder_add_strategy_options(forwarder_t *forwarder, Name *name_prefix, + strategy_type_t strategy_type, + strategy_options_t *strategy_options) { + assert(forwarder); + assert(name_prefix); + assert(strategy_options); + assert(STRATEGY_TYPE_VALID(strategy_type)); + + fib_entry_t *entry = fib_contains(forwarder->fib, name_prefix); + if (!entry) return; - processor_SetStrategy(forwarder->processor, prefix, strategy, - related_prefixes_len, related_prefixes); + fib_entry_add_strategy_options(entry, strategy_type, strategy_options); } -FibEntryList *forwarder_GetFibEntries(Forwarder *forwarder) { - return messageProcessor_GetFibEntries(forwarder->processor); +void forwarder_set_strategy(forwarder_t *forwarder, Name *name_prefix, + strategy_type_t strategy_type, + strategy_options_t *strategy_options) { + assert(forwarder); + assert(name_prefix); + assert(STRATEGY_TYPE_VALID(strategy_type)); + /* strategy_options might be NULL */ + + fib_entry_t *entry = fib_contains(forwarder->fib, name_prefix); + if (!entry) { + // there is no exact match. so if the forwarding strategy is not in the list + // of strategies that can be set by the transport, return + if (strategy_type != STRATEGY_TYPE_BESTPATH && + strategy_type != STRATEGY_TYPE_REPLICATION) { + return; + } + + // here it may be the transprot that wants to set the strategy, but it has + // no knowledge of the length of the prefix. so we apply the strategy at the + // matching fib entry, which later will be the one that will be used to send + // interests with this name + entry = fib_match_name(forwarder->fib, name_prefix); + if (!entry) { + return; // no fib match, return + } + } + + fib_entry_set_strategy(entry, strategy_type, strategy_options); } -void forwarder_SetContentObjectStoreSize(Forwarder *forwarder, - size_t maximumContentStoreSize) { - messageProcessor_SetContentObjectStoreSize(forwarder->processor, - maximumContentStoreSize); +cs_t *forwarder_get_cs(const forwarder_t *forwarder) { + assert(forwarder); + + return pkt_cache_get_cs(forwarder->pkt_cache); } -void forwarder_ClearCache(Forwarder *forwarder) { - messageProcessor_ClearCache(forwarder->processor); +// ======================================================= + +fib_t *forwarder_get_fib(forwarder_t *forwarder) { return forwarder->fib; } + +msgbuf_pool_t *forwarder_get_msgbuf_pool(const forwarder_t *forwarder) { + return forwarder->msgbuf_pool; } -PARCClock *forwarder_GetClock(const Forwarder *forwarder) { - return forwarder->clock; +#ifdef WITH_MAPME +void forwarder_on_connection_event(const forwarder_t *forwarder, + const connection_t *connection, + connection_event_t event) { + mapme_on_connection_event(forwarder->mapme, connection, event); } -#if !defined(__APPLE__) -hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder) { - return forwarder->hicnSocketHelper; +mapme_t *forwarder_get_mapme(const forwarder_t *forwarder) { + return forwarder->mapme; } -#endif -// ======================================================= +#endif /* WITH_MAPME */ -static void _signal_cb(int sig, PARCEventType events, void *user_data) { - Forwarder *forwarder = (Forwarder *)user_data; +#ifdef WITH_POLICY_STATS +const policy_stats_mgr_t *forwarder_get_policy_stats_mgr( + const forwarder_t *forwarder) { + return &forwarder->policy_stats_mgr; +} +#endif /* WITH_POLICY_STATS */ - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "signal %d events %d", sig, events); +/** + * @brief Process a packet by creating the corresponding message buffer and + * dispatching it to the forwarder for further processing. + * @param[in] forwarder Forwarder instance. + * + */ +// XXX ??? XXX = process for listener as we are resolving connection id +// + +msgbuf_type_t get_type_from_packet(uint8_t *packet) { + if (messageHandler_IsTCP(packet)) { + if (messageHandler_IsData(packet)) { + return MSGBUF_TYPE_DATA; + } else if (messageHandler_IsInterest(packet)) { + return MSGBUF_TYPE_INTEREST; + } else { + return MSGBUF_TYPE_UNDEFINED; + } - switch ((int)sig) { - case SIGTERM: - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "Caught an terminate signal; exiting cleanly."); - dispatcher_Stop(forwarder->dispatcher); - break; + } else if (messageHandler_IsWldrNotification(packet)) { + return MSGBUF_TYPE_WLDR_NOTIFICATION; - case SIGINT: - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "Caught an interrupt signal; exiting cleanly."); - dispatcher_Stop(forwarder->dispatcher); - break; -#ifndef _WIN32 - case SIGUSR1: - // dump stats - break; -#endif + } else if (mapme_match_packet(packet)) { + return MSGBUF_TYPE_MAPME; - default: - break; - } -} + } else if (*packet == REQUEST_LIGHT) { + return MSGBUF_TYPE_COMMAND; -static void _keepalive_cb(int fd, PARCEventType what, void *user_data) { - parcAssertTrue(what & PARCEventType_Timeout, "Got unexpected tick_cb: %d", - what); - // function is just a keepalive for hicn-light, does not do anything + } else { + return MSGBUF_TYPE_UNDEFINED; + } } -#ifdef WITH_MAPME -FIB *forwarder_getFib(Forwarder *forwarder) { - return messageProcessor_getFib(forwarder->processor); +/** + * @brief Finalize (i.e. close fd and free internal data structures) + * the current connection ("SELF") when the command is received. + * The connection cannot be removed inside the command handling + * because it is needed to return the ack back. + */ +static void _forwarder_finalize_connection_if_self(connection_t *conn, + msgbuf_t *msgbuf) { + uint8_t *packet = msgbuf_get_packet(msgbuf); + msg_connection_remove_t *msg = (msg_connection_remove_t *)packet; + cmd_connection_remove_t *control = &msg->payload; + + if (strcmp(control->symbolic_or_connid, "SELF") == 0) + connection_finalize(conn); } -void forwarder_onConnectionEvent(Forwarder *forwarder, const Connection *conn, connection_event_t event) { -//#ifdef WITH_POLICY -// messageProcessor_onConnectionEvent(forwarder->processor, conn, event); -//#else - mapme_onConnectionEvent(forwarder->mapme, conn, event); -//#endif /* WITH_POLICY */ -} +ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener, + off_t msgbuf_id, address_pair_t *pair, Ticks now) { + assert(forwarder); + /* listener can be NULL */ + assert(msgbuf_id_is_valid(msgbuf_id)); + assert(pair); + + msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + assert(msgbuf); + + uint8_t *packet = msgbuf_get_packet(msgbuf); + size_t size = msgbuf_get_len(msgbuf); + + /* Connection lookup */ + const connection_table_t *table = + forwarder_get_connection_table(listener->forwarder); + connection_t *connection = connection_table_get_by_pair(table, pair); + unsigned conn_id = connection + ? connection_table_get_connection_id(table, connection) + : CONNECTION_ID_UNDEFINED; + + assert((conn_id != CONNECTION_ID_UNDEFINED) || listener); + + msgbuf_type_t type = get_type_from_packet(msgbuf_get_packet(msgbuf)); + + msgbuf->type = type; + msgbuf->connection_id = conn_id; + msgbuf->recv_ts = now; + + switch (type) { + case MSGBUF_TYPE_INTEREST: + if (!connection_id_is_valid(msgbuf->connection_id)) { + char *conn_name = connection_table_get_random_name(table); + unsigned connection_id = + listener_create_connection(listener, conn_name, pair); + msgbuf->connection_id = connection_id; + connection = connection_table_get_by_id(table, connection_id); + free(conn_name); + } + msgbuf->path_label = 0; // not used for interest packets + name_create_from_interest(packet, msgbuf_get_name(msgbuf)); + forwarder_apply_wldr(forwarder, msgbuf, connection); + forwarder_process_interest(forwarder, msgbuf_id); + + pkt_cache_log(forwarder->pkt_cache); + return size; + + case MSGBUF_TYPE_DATA: + if (!connection_id_is_valid(msgbuf->connection_id)) + return forwarder_drop(forwarder, msgbuf_id); + msgbuf_init_pathlabel(msgbuf); + name_create_from_data(packet, msgbuf_get_name(msgbuf)); + forwarder_apply_wldr(forwarder, msgbuf, connection); + forwarder_process_data(forwarder, msgbuf_id); + + pkt_cache_log(forwarder->pkt_cache); + return size; + + case MSGBUF_TYPE_WLDR_NOTIFICATION: + if (!connection_id_is_valid(msgbuf->connection_id)) + return forwarder_drop(forwarder, msgbuf_id); + connection_wldr_handle_notification(connection, msgbuf); + return size; + + case MSGBUF_TYPE_MAPME: + // XXX what about acks ? + if (!connection_id_is_valid(msgbuf->connection_id)) { + char *conn_name = connection_table_get_random_name(table); + msgbuf->connection_id = + listener_create_connection(listener, conn_name, pair); + free(conn_name); + } + mapme_process(forwarder->mapme, msgbuf); + return size; + + case MSGBUF_TYPE_COMMAND: + // Create the connection to send the ack back + if (!connection_id_is_valid(msgbuf->connection_id)) { + char *conn_name = connection_table_get_random_name(table); + unsigned connection_id = + listener_create_connection(listener, conn_name, pair); + msgbuf->connection_id = connection_id; + connection = connection_table_get_by_id(table, connection_id); + free(conn_name); + } + + msg_header_t *msg = (msg_header_t *)packet; + msgbuf->command.type = msg->header.command_id; + if (!command_type_is_valid(msgbuf->command.type)) { + ERROR("Invalid command"); + return 0; + } + + size = command_process_msgbuf(forwarder, msgbuf); + if (msgbuf->command.type == COMMAND_TYPE_CONNECTION_REMOVE) + _forwarder_finalize_connection_if_self(connection, msgbuf); + return size; -void forwarder_ProcessMapMe(Forwarder *forwarder, const uint8_t *msgBuffer, - unsigned conn_id) { - mapme_Process(forwarder->mapme, msgBuffer, conn_id); + default: + ERROR("Invalid msgbuf type"); + forwarder_drop(forwarder, msgbuf_id); + return 0; + } } -MapMe * -forwarder_getMapmeInstance(const Forwarder *forwarder) { - return forwarder->mapme; +void forwarder_log(forwarder_t *forwarder) { + DEBUG( + "Forwarder: received = %u (interest = %u, data = %u), dropped = %u " + "(interest = %u, data = %u, other = %u), forwarded = { interests = %u, " + "data = %u }, dropped = { connection_not_found = %u, send_failure = %u, " + "no_route_in_fib = %u }, interest processing = { aggregated = %u, " + "retransmitted = %u, satisfied_from_cs = %u, expired_interests = %u, " + "expired_data = %u }, data processing = { " + "no_reverse_path = %u }\n", + forwarder->stats.countReceived, forwarder->stats.countInterestsReceived, + forwarder->stats.countObjectsReceived, forwarder->stats.countDropped, + forwarder->stats.countInterestsDropped, + forwarder->stats.countObjectsDropped, forwarder->stats.countOtherDropped, + forwarder->stats.countInterestForwarded, + forwarder->stats.countObjectsForwarded, + forwarder->stats.countDroppedConnectionNotFound, + forwarder->stats.countSendFailures, forwarder->stats.countDroppedNoRoute, + forwarder->stats.countInterestsAggregated, + forwarder->stats.countInterestsRetransmitted, + forwarder->stats.countInterestsSatisfiedFromStore, + forwarder->stats.countInterestsExpired, forwarder->stats.countDataExpired, + forwarder->stats.countDroppedNoReversePath); } - -#endif /* WITH_MAPME */ |