From 15ad172a847fa667c57a4594ef4158405db9a984 Mon Sep 17 00:00:00 2001 From: Angelo Mantellini Date: Tue, 31 Mar 2020 17:50:43 +0200 Subject: [HICN-554] hicn-light refactoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I36f2d393741d4502ce14d3791158e43e3e9cd4cf Signed-off-by: Jordan Augé --- hicn-light/src/hicn/core/forwarder.c | 1370 ++++++++++++++++++++++++---------- 1 file changed, 987 insertions(+), 383 deletions(-) (limited to 'hicn-light/src/hicn/core/forwarder.c') diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c index f7b0af2c2..f8e99198f 100644 --- a/hicn-light/src/hicn/core/forwarder.c +++ b/hicn-light/src/hicn/core/forwarder.c @@ -41,553 +41,1157 @@ #define __STDC_FORMAT_MACROS #include -#include -#include -#include -#include - -#include -#include -#include +#include +#include +#include +#include +#include #include #include #ifdef WITH_MAPME #include #endif /* WITH_MAPME */ #include -#include -#include -#include +#include + +#ifdef WITH_PREFIX_STATS +#include +#endif /* WITH_PREFIX_STATS */ #include +#include -#include +#define DEFAULT_PIT_SIZE 65535 -// the router's clock frequency (we now use the monotonic clock) -#define HZ 1000 +typedef struct { + uint32_t countReceived; + uint32_t countInterestsReceived; + uint32_t countObjectsReceived; -// these will all be a little off because its all integer division -#define MSEC_PER_TICK (1000 / HZ) -#define USEC_PER_TICK (1000000 / HZ) -#define NSEC_PER_TICK ((1000000000ULL) / HZ) -#define MSEC_TO_TICKS(msec) \ - ((msec < FC_MSEC_PER_TICK) ? 1 : msec / FC_MSEC_PER_TICK) -#define NSEC_TO_TICKS(nsec) ((nsec < NSEC_PER_TICK) ? 1 : nsec / NSEC_PER_TICK) + uint32_t countInterestsAggregated; -struct forwarder { - Dispatcher *dispatcher; + uint32_t countDropped; + uint32_t countInterestsDropped; + uint32_t countDroppedNoRoute; + uint32_t countDroppedNoReversePath; - uint16_t server_port; + uint32_t countDroppedConnectionNotFound; + uint32_t countObjectsDropped; + uint32_t countOtherDropped; - PARCEventSignal *signal_int; - PARCEventSignal *signal_term; -#ifndef _WIN32 - PARCEventSignal *signal_usr1; -#endif - PARCEventTimer *keepalive_event; + uint32_t countSendFailures; + uint32_t countInterestForwarded; + uint32_t countObjectsForwarded; + uint32_t countInterestsSatisfiedFromStore; - // will skew the virtual clock forward. In normal operaiton, it is 0. - Ticks clockOffset; + uint32_t countDroppedNoHopLimit; + uint32_t countDroppedZeroHopLimitFromRemote; + uint32_t countDroppedZeroHopLimitToRemote; +} forwarder_stats_t; - unsigned nextConnectionid; - Messenger *messenger; - ConnectionManager *connectionManager; - ConnectionTable *connectionTable; - ListenerSet *listenerSet; - Configuration *config; +struct forwarder_s { +// uint16_t server_port; - // we'll eventually want to setup a threadpool of these - MessageProcessor *processor; +// XXX TODO signal handling +#if 0 + PARCEventSignal *signal_int; + PARCEventSignal *signal_term; +#ifndef _WIN32 + PARCEventSignal *signal_usr1; +#endif +#endif - Logger *logger; + // used by seed48 and nrand48 + unsigned short seed[3]; - PARCClock *clock; + connection_table_t * connection_table; + listener_table_t * listener_table; + configuration_t *config; -#if !defined(__APPLE__) - hicn_socket_helper_t - *hicnSocketHelper; // state required to manage hicn connections -#endif - // used by seed48 and nrand48 - unsigned short seed[3]; + + pit_t * pit; + content_store_t * content_store; + fib_t * fib; #ifdef WITH_MAPME - MapMe *mapme; + mapme_t * mapme; #endif /* WITH_MAPME */ + + bool store_in_content_store; + bool serve_from_content_store; + + forwarder_stats_t stats; +#ifdef WITH_PREFIX_STATS + prefix_stats_mgr_t prefix_stats_mgr; +#endif /* WITH_PREFIX_STATS */ + + /* + * The message forwarder has to decide whether to queue incoming packets for + * batching, or trigger the transmission on the connection + */ + unsigned pending_conn[MAX_MSG]; + size_t num_pending_conn; + + msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by 1 */ + }; +#if 0 // signal traps through the event scheduler static void _signal_cb(int, PARCEventType, void *); - -// A no-op keepalive to prevent Libevent from exiting the dispatch loop -static void _keepalive_cb(int, PARCEventType, void *); +#endif /** * Reseed our pseudo-random number generator. */ -static void forwarder_Seed(Forwarder *forwarder) { +static +void +forwarder_seed(forwarder_t * forwarder) { #ifndef _WIN32 - int fd; - ssize_t res; - - res = -1; - fd = open("/dev/urandom", O_RDONLY); - if (fd != -1) { - res = read(fd, forwarder->seed, sizeof(forwarder->seed)); - close(fd); - } - if (res != sizeof(forwarder->seed)) { + int fd; + ssize_t res; + + res = -1; + fd = open("/dev/urandom", O_RDONLY); + if (fd != -1) { + res = read(fd, forwarder->seed, sizeof(forwarder->seed)); + close(fd); + } + if (res != sizeof(forwarder->seed)) { + forwarder->seed[1] = (unsigned short)getpid(); /* better than no entropy */ + forwarder->seed[2] = (unsigned short)time(NULL); + } + /* + * The call to seed48 is needed by cygwin, and should be harmless + * on other platforms. + */ + seed48(forwarder->seed); +#else forwarder->seed[1] = (unsigned short)getpid(); /* better than no entropy */ forwarder->seed[2] = (unsigned short)time(NULL); - } - /* - * The call to seed48 is needed by cygwin, and should be harmless - * on other platforms. - */ - seed48(forwarder->seed); -#else - forwarder->seed[1] = (unsigned short)getpid(); /* better than no entropy */ - forwarder->seed[2] = (unsigned short)time(NULL); #endif } -Logger *forwarder_GetLogger(const Forwarder *forwarder) { - return forwarder->logger; -} - -// ============================================================================ -// Setup and destroy section +int +init_batch_buffers(batch_buffer_t * bb) +{ + /* Setup recvmmsg data structures. */ + for (unsigned i = 0; i < MAX_MSG; i++) { + char *buf = &bb->buffers[i][0]; + struct iovec *iovec = &bb->iovecs[i]; + struct mmsghdr *msg = &bb->msghdr[i]; -Forwarder *forwarder_Create(Logger *logger) { - Forwarder *forwarder = parcMemory_AllocateAndClear(sizeof(Forwarder)); - parcAssertNotNull(forwarder, "parcMemory_AllocateAndClear(%zu) returned NULL", - sizeof(Forwarder)); - memset(forwarder, 0, sizeof(Forwarder)); - forwarder_Seed(forwarder); + msg->msg_hdr.msg_iov = iovec; + msg->msg_hdr.msg_iovlen = 1; - forwarder->clock = parcClock_Monotonic(); - forwarder->clockOffset = 0; + msg->msg_hdr.msg_name = &bb->addrs[i]; + msg->msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - if (logger) { - forwarder->logger = logger_Acquire(logger); - logger_SetClock(forwarder->logger, forwarder->clock); - } else { - PARCLogReporter *reporter = parcLogReporterTextStdout_Create(); - forwarder->logger = logger_Create(reporter, forwarder->clock); - parcLogReporter_Release(&reporter); + iovec->iov_base = &buf[0]; + iovec->iov_len = MTU; } + return 0; +} - forwarder->nextConnectionid = 1; - forwarder->dispatcher = dispatcher_Create(forwarder->logger); - forwarder->messenger = messenger_Create(forwarder->dispatcher); - forwarder->connectionManager = connectionManager_Create(forwarder); - forwarder->connectionTable = connectionTable_Create(); - forwarder->listenerSet = listenerSet_Create(); - forwarder->config = configuration_Create(forwarder); - forwarder->processor = messageProcessor_Create(forwarder); - - forwarder->signal_term = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGTERM); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_term); - - forwarder->signal_int = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGINT); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_int); +forwarder_t * +forwarder_create() +{ + forwarder_t * forwarder = malloc(sizeof(forwarder_t)); + if (!forwarder) + goto ERR_MALLOC; + + forwarder_seed(forwarder); + + forwarder->config = configuration_create(forwarder); + if (!forwarder->config) + goto ERR_CONFIG; + + forwarder->listener_table = listener_table_create(); + if (!forwarder->listener_table) + goto ERR_LISTENER_TABLE; + + forwarder->connection_table = connection_table_create(); + if (!forwarder->connection_table) + goto ERR_CONNECTION_TABLE; + + forwarder->fib = fib_create(forwarder); + if (!forwarder->fib) + goto ERR_FIB; + + forwarder->pit = pit_create(DEFAULT_PIT_SIZE); + if (!forwarder->pit) + goto ERR_PIT; + + size_t objectStoreSize = + configuration_content_store_get_size(forwarder_get_configuration(forwarder)); + forwarder->content_store = content_store_create(CONTENT_STORE_TYPE_LRU, + objectStoreSize); + if (!forwarder->content_store) + goto ERR_CONTENT_STORE; + + // the two flags for the content_store are set to true by default. If the content_store + // is active it always work as expected unless the use modifies this + // values using controller + forwarder->store_in_content_store = true; + forwarder->serve_from_content_store = true; + +#if 0 + forwarder->signal_term = dispatcher_CreateSignalEvent( + forwarder->dispatcher, _signal_cb, forwarder, SIGTERM); + dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_term); + + forwarder->signal_int = dispatcher_CreateSignalEvent( + forwarder->dispatcher, _signal_cb, forwarder, SIGINT); + dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_int); #ifndef _WIN32 - forwarder->signal_usr1 = dispatcher_CreateSignalEvent( - forwarder->dispatcher, _signal_cb, forwarder, SIGPIPE); - dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_usr1); + forwarder->signal_usr1 = dispatcher_CreateSignalEvent( + forwarder->dispatcher, _signal_cb, forwarder, SIGPIPE); + dispatcher_StartSignalEvent(forwarder->dispatcher, forwarder->signal_usr1); +#endif #endif - -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - forwarder->hicnSocketHelper = hicn_create(); - if (!forwarder->hicnSocketHelper) - goto ERR_SOCKET; -#endif /* __APPLE__ */ #ifdef WITH_MAPME - if (!(mapme_create(&forwarder->mapme, forwarder))) - goto ERR_MAPME; + forwarder->mapme = mapme_create(forwarder); + if (!forwarder->mapme) + goto ERR_MAPME; #endif /* WITH_MAPME */ - /* ignore child */ + /* ignore child */ #ifndef _WIN32 - signal(SIGCHLD, SIG_IGN); + signal(SIGCHLD, SIG_IGN); - /* ignore tty signals */ - signal(SIGTSTP, SIG_IGN); - signal(SIGTTOU, SIG_IGN); - signal(SIGTTIN, SIG_IGN); + /* ignore tty signals */ + signal(SIGTSTP, SIG_IGN); + signal(SIGTTOU, SIG_IGN); + signal(SIGTTIN, SIG_IGN); #endif - // We no longer use this for ticks, but we need to have at least one event - // schedule to keep Libevent happy. - - struct timeval wtnow_timeout; - timerclear(&wtnow_timeout); +#ifdef WITH_PREFIX_STATS + if (prefix_stats_mgr_initialize(&forwarder->prefix_stats_mgr, forwarder) < 0) + goto ERR_MGR; +#endif /* WITH_PREFIX_STATS */ - wtnow_timeout.tv_sec = 0; - wtnow_timeout.tv_usec = 50000; // 20 Hz keepalive - - PARCEventScheduler *base = - dispatcher_GetEventScheduler(forwarder->dispatcher); - forwarder->keepalive_event = parcEventTimer_Create( - base, PARCEventType_Persist, _keepalive_cb, (void *)forwarder); - parcEventTimer_Start(forwarder->keepalive_event, &wtnow_timeout); - - return forwarder; + return forwarder; +ERR_MGR: #ifdef WITH_MAPME ERR_MAPME: #endif /* WITH_MAPME */ -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - hicn_free(forwarder->hicnSocketHelper); -ERR_SOCKET: -#endif - listenerSet_Destroy(&(forwarder->listenerSet)); - connectionManager_Destroy(&(forwarder->connectionManager)); - connectionTable_Destroy(&(forwarder->connectionTable)); - messageProcessor_Destroy(&(forwarder->processor)); - configuration_Destroy(&(forwarder->config)); - messenger_Destroy(&(forwarder->messenger)); - - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_int)); - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_term)); + +#if 0 + dispatcher_DestroySignalEvent(forwarder->dispatcher, + &(forwarder->signal_int)); + dispatcher_DestroySignalEvent(forwarder->dispatcher, + &(forwarder->signal_term)); #ifndef _WIN32 - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_usr1)); + dispatcher_DestroySignalEvent(forwarder->dispatcher, + &(forwarder->signal_usr1)); #endif - parcClock_Release(&forwarder->clock); - logger_Release(&forwarder->logger); - - // do the dispatcher last - dispatcher_Destroy(&(forwarder->dispatcher)); + // do the dispatcher last + dispatcher_Destroy(&(forwarder->dispatcher)); +#endif - parcMemory_Deallocate((void **)&forwarder); - return NULL; + content_store_free(forwarder->content_store); +ERR_CONTENT_STORE: + pit_free(forwarder->pit); +ERR_PIT: + fib_free(forwarder->fib); +ERR_FIB: + connection_table_free(forwarder->connection_table); +ERR_CONNECTION_TABLE: + listener_table_free(forwarder->listener_table); +ERR_LISTENER_TABLE: + configuration_free(forwarder->config); +ERR_CONFIG: + free(forwarder); +ERR_MALLOC: + return NULL; } -void forwarder_Destroy(Forwarder **ptr) { - parcAssertNotNull(ptr, "Parameter must be non-null double pointer"); - parcAssertNotNull(*ptr, "Parameter must dereference to non-null pointer"); - Forwarder *forwarder = *ptr; -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(_WIN32) && \ - defined(PUNTING) - hicn_free(forwarder->hicnSocketHelper); -#endif - parcEventTimer_Destroy(&(forwarder->keepalive_event)); - - listenerSet_Destroy(&(forwarder->listenerSet)); - connectionManager_Destroy(&(forwarder->connectionManager)); - connectionTable_Destroy(&(forwarder->connectionTable)); - messageProcessor_Destroy(&(forwarder->processor)); - configuration_Destroy(&(forwarder->config)); +void +forwarder_free(forwarder_t * forwarder) +{ + assert(forwarder); - // the messenger is used by many of the other pieces, so destroy it last - messenger_Destroy(&(forwarder->messenger)); + prefix_stats_mgr_finalize(&forwarder->prefix_stats_mgr); #ifdef WITH_MAPME - mapme_free(forwarder->mapme); + mapme_free(forwarder->mapme); #endif /* WITH_MAPME */ - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_int)); - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_term)); +#if 0 + dispatcher_DestroySignalEvent(forwarder->dispatcher, + &(forwarder->signal_int)); + dispatcher_DestroySignalEvent(forwarder->dispatcher, + &(forwarder->signal_term)); #ifndef _WIN32 - dispatcher_DestroySignalEvent(forwarder->dispatcher, - &(forwarder->signal_usr1)); + dispatcher_DestroySignalEvent(forwarder->dispatcher, + &(forwarder->signal_usr1)); #endif - parcClock_Release(&forwarder->clock); - logger_Release(&forwarder->logger); - - // do the dispatcher last - dispatcher_Destroy(&(forwarder->dispatcher)); + // do the dispatcher last + dispatcher_Destroy(&(forwarder->dispatcher)); +#endif - parcMemory_Deallocate((void **)&forwarder); - *ptr = NULL; + content_store_free(forwarder->content_store); + pit_free(forwarder->pit); + fib_free(forwarder->fib); + connection_table_free(forwarder->connection_table); + listener_table_free(forwarder->listener_table); + configuration_free(forwarder->config); + free(forwarder); } -void forwarder_SetupAllListeners(Forwarder *forwarder, uint16_t port, - const char *localPath) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); +void +forwarder_setup_all_listeners(forwarder_t * forwarder, uint16_t port, + const char * local_path) +{ + assert(forwarder); + assert(local_path); - configurationListeners_SetupAll(forwarder->config, port, localPath); + listener_setup_all(forwarder, port, local_path); } -void forwarder_SetupLocalListeners(Forwarder *forwarder, uint16_t port) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - configurationListeners_SetutpLocalIPv4(forwarder->config, port); +void +forwarder_setup_local_listeners(forwarder_t * forwarder, uint16_t port) +{ + assert(forwarder); + listener_setup_local_ipv4(forwarder, port); } -void forwarder_SetupFromConfigFile(Forwarder *forwarder, const char *filename) { - ConfigurationFile *configFile = configurationFile_Create(forwarder, filename); - if (configFile) { - configurationFile_Process(configFile); - configurationFile_Release(&configFile); - } -} +void +forwarder_read_config(forwarder_t * forwarder, const char * filename) +{ + configuration_file_t *cfg = configuration_file_create(forwarder, filename); + if (!cfg) + return; -Configuration *forwarder_GetConfiguration(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->config; + configuration_file_process(cfg); + configuration_file_free(cfg); } -// ============================================================================ +configuration_t * +forwarder_get_configuration(forwarder_t * forwarder) +{ + assert(forwarder); + return forwarder->config; +} -unsigned forwarder_GetNextConnectionId(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->nextConnectionid++; +connection_table_t * +forwarder_get_connection_table(const forwarder_t * forwarder) +{ + assert(forwarder); + return forwarder->connection_table; } -Messenger *forwarder_GetMessenger(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->messenger; +listener_table_t * +forwarder_get_listener_table(forwarder_t * forwarder) +{ + assert(forwarder); + return forwarder->listener_table; } -Dispatcher *forwarder_GetDispatcher(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->dispatcher; +void +forwarder_content_store_set_store(forwarder_t * forwarder, bool val) +{ + assert(forwarder); + forwarder->store_in_content_store = val; } -#ifdef WITH_POLICY -ConnectionTable *forwarder_GetConnectionTable(const Forwarder *forwarder) { -#else -ConnectionTable *forwarder_GetConnectionTable(Forwarder *forwarder) { -#endif /* WITH_POLICY */ - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->connectionTable; +bool +forwarder_content_store_get_store(forwarder_t * forwarder) +{ + assert(forwarder); + return forwarder->store_in_content_store; } -ListenerSet *forwarder_GetListenerSet(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return forwarder->listenerSet; +void +forwarder_content_store_set_serve(forwarder_t * forwarder, bool val) +{ + assert(forwarder); + forwarder->serve_from_content_store = val; } -void forwarder_SetChacheStoreFlag(Forwarder *forwarder, bool val) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - messageProcessor_SetCacheStoreFlag(forwarder->processor, val); +bool +forwarder_content_store_get_serve(forwarder_t * forwarder) +{ + assert(forwarder); + return forwarder->serve_from_content_store; } -bool forwarder_GetChacheStoreFlag(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return messageProcessor_GetCacheStoreFlag(forwarder->processor); +void +forwarder_content_store_set_size(forwarder_t * forwarder, size_t size) +{ + assert(forwarder); + + content_store_free(forwarder->content_store); + + // XXX TODO +#if 0 + ContentStoreConfig content_storeConfig = {.objectCapacity = + maximumContentStoreSize}; + + forwarder->content_store = + content_storeLRU_Create(&content_storeConfig, forwarder->logger); +#endif } -void forwarder_SetChacheServeFlag(Forwarder *forwarder, bool val) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - messageProcessor_SetCacheServeFlag(forwarder->processor, val); +void +forwarder_content_store_clear(forwarder_t * forwarder) +{ + assert(forwarder); + + content_store_clear(forwarder->content_store); } -bool forwarder_GetChacheServeFlag(Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return messageProcessor_GetCacheServeFlag(forwarder->processor); +void +forwarder_receive_command(forwarder_t * forwarder, command_type_t command_type, + uint8_t * packet, unsigned connection_id) +{ + configuration_receive_command(forwarder->config, command_type, packet, connection_id); } -void forwarder_ReceiveCommand(Forwarder *forwarder, command_id command, - struct iovec *message, unsigned ingressId) { - configuration_ReceiveCommand(forwarder->config, command, message, ingressId); +/** + * @function forwarder_Drop + * @abstract Whenever we "drop" a message, increment countes + * @discussion + * This is a bookkeeping function. It increments the appropriate counters. + * + * The default action for a message is to destroy it in + * forwarder_Receive(), so this function does not need to do + * that. + * + */ +static +void +forwarder_drop(forwarder_t * forwarder, msgbuf_t *message) +{ + forwarder->stats.countDropped++; + + switch (msgbuf_get_type(message)) { + case MESSAGE_TYPE_INTEREST: + forwarder->stats.countInterestsDropped++; + break; + + case MESSAGE_TYPE_DATA: + forwarder->stats.countObjectsDropped++; + break; + + default: + forwarder->stats.countOtherDropped++; + break; + } + + // dont destroy message here, its done at end of receive } -void forwarder_Receive(Forwarder *forwarder, Message *message) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(message, "Parameter message must be non-null"); +/* + * If the hoplimit is equal to 0, then we may only forward it to local + * applications. Otherwise, we may forward it off the system. + * + */ +static +void +forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf, + unsigned conn_id) +{ + connection_table_t * table = forwarder_get_connection_table(forwarder); + const connection_t * conn = connection_table_get_by_id(table, conn_id); + + if (!conn) { + forwarder->stats.countDroppedConnectionNotFound++; + DEBUG("forward msgbuf %p to interface %u not found (count %u)", + msgbuf, conn_id, forwarder->stats.countDroppedConnectionNotFound); + forwarder_drop(forwarder, msgbuf); + return; + } - // this takes ownership of the message, so we're done here + /* Always queue the packet... */ + bool success = connection_send(conn, msgbuf, true); - // this are the checks needed to implement WLDR. We set wldr only on the STAs - // and we let the AP to react according to choise of the client. - // if the STA enables wldr using the set command, the AP enable wldr as well - // otherwise, if the STA disable it the AP remove wldr - // WLDR should be enabled only on the STAs using the command line - // TODO - // disable WLDR command line on the AP - const Connection *conn = connectionTable_FindById( - forwarder->connectionTable, message_GetIngressConnectionId(message)); + /* ... and mark the connection as pending if this is not yet the case */ + unsigned i; + for (i = 0; i < forwarder->num_pending_conn; i++) { + if (forwarder->pending_conn[i] == conn_id) + break; + } + if (i == forwarder->num_pending_conn) + forwarder->pending_conn[forwarder->num_pending_conn++] = conn_id; - if (!conn) { - return; - } + if (!success) { + forwarder->stats.countSendFailures++; - if (message_HasWldr(message)) { - if (connection_HasWldr(conn)) { - // case 1: WLDR is enabled - connection_DetectLosses((Connection *)conn, message); - } else if (!connection_HasWldr(conn) && - connection_WldrAutoStartAllowed(conn)) { - // case 2: We are on an AP. We enable WLDR - connection_EnableWldr((Connection *)conn); - connection_DetectLosses((Connection *)conn, message); + DEBUG("forward msgbuf %p to interface %u send failure (count %u)", msgbuf, + conn_id, forwarder->stats.countSendFailures); + forwarder_drop(forwarder, msgbuf); + return; } - // case 3: Ignore WLDR - } else { - if (connection_HasWldr(conn) && connection_WldrAutoStartAllowed(conn)) { - // case 1: STA do not use WLDR, we disable it - connection_DisableWldr((Connection *)conn); + + switch (msgbuf_get_type(msgbuf)) { + case MESSAGE_TYPE_INTEREST: + forwarder->stats.countInterestForwarded++; + break; + + case MESSAGE_TYPE_DATA: + forwarder->stats.countObjectsForwarded++; + break; + + default: + break; } - } - messageProcessor_Receive(forwarder->processor, message); + DEBUG("forward msgbuf %p to interface %u (int %u, obj %u)", msgbuf, + conn_id, forwarder->stats.countInterestForwarded, + forwarder->stats.countObjectsForwarded); + } -Ticks forwarder_GetTicks(const Forwarder *forwarder) { - parcAssertNotNull(forwarder, "Parameter must be non-null"); - return parcClock_GetTime(forwarder->clock) + forwarder->clockOffset; +/** + * @function forwarder_forward_to_nexthops + * @abstract Try to forward to each nexthop listed in the NumberSet + * @discussion + * Will not forward to the ingress connection. + * + * @return The number of nexthops tried + */ +static +unsigned +forwarder_forward_to_nexthops(forwarder_t * forwarder, + msgbuf_t *msgbuf, const nexthops_t * nexthops) +{ + unsigned forwardedCopies = 0; + + unsigned ingressId = msgbuf_get_connection_id(msgbuf); + uint32_t old_path_label = 0; + + if (msgbuf_get_type(msgbuf) == MESSAGE_TYPE_DATA) + old_path_label = msgbuf_get_pathlabel(msgbuf); + + unsigned nexthop; + nexthops_foreach(nexthops, nexthop, { + if (nexthop == ingressId) + continue; + + forwardedCopies++; + forwarder_forward_via_connection(forwarder, msgbuf, nexthop); + + // everytime we send out a message we need to restore the original path + // label of the message this is important because we keep a single copy + // of the message (single pointer) and we modify the path label at each + // send. + if (msgbuf_get_type(msgbuf) == MESSAGE_TYPE_DATA) + msgbuf_set_pathlabel(msgbuf, old_path_label); + }); + + return forwardedCopies; } -Ticks forwarder_NanosToTicks(uint64_t nanos) { return NSEC_TO_TICKS(nanos); } -uint64_t forwarder_TicksToNanos(Ticks ticks) { - return (1000000000ULL) * ticks / HZ; -} +static +bool +forwarder_forward_via_fib(forwarder_t * forwarder, msgbuf_t *msgbuf, + pit_verdict_t verdict) +{ + assert(forwarder); + assert(msgbuf); + assert(msgbuf_get_type(msgbuf) == MESSAGE_TYPE_INTEREST); + + fib_entry_t *fib_entry = fib_match_message(forwarder->fib, msgbuf); + if (!fib_entry) + return false; + + // XXX TODO PROBE HOOK MIGHT BE HANDLED ELSEWHERE + if (msgbuf_is_probe(msgbuf)) { + connection_table_t * table = forwarder_get_connection_table(forwarder); + const nexthops_t * nexthops = fib_entry_get_nexthops(fib_entry); + + unsigned nexthop; + nexthops_foreach(nexthops, nexthop, { + connection_t * conn = connection_table_at(table, nexthop); + if (!conn) + continue; + if (!connection_is_local(conn)) + continue; + uint8_t * packet = msgbuf_get_packet(msgbuf); + unsigned size = msgbuf_get_len(msgbuf); + connection_t * reply_connection = connection_table_get_by_id(table, + msgbuf_get_connection_id(msgbuf)); + if (messageHandler_IsInterest(packet)) { + messageHandler_CreateProbeReply(packet, HF_INET6_TCP); + connection_send_packet(reply_connection, packet, size); + } + return false; + }); + } + + pit_entry_t * entry = pit_lookup(forwarder->pit, msgbuf); + if (!entry) + return false; + + pit_entry_set_fib_entry(entry, fib_entry); -bool forwarder_AddOrUpdateRoute(Forwarder *forwarder, - add_route_command *control, unsigned ifidx) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(control, "Parameter route must be non-null"); + const nexthops_t * nexthops = fib_entry_get_nexthops_from_strategy(fib_entry, + msgbuf, verdict); - // we only have one message processor - bool res = - messageProcessor_AddOrUpdateRoute(forwarder->processor, control, ifidx); + unsigned nexthop; + nexthops_foreach(nexthops, nexthop, { + pit_entry_egress_add(entry, nexthop); + }); + + // this requires some additional checks. It may happen that some of the output + // faces selected by the forwarding strategy are not usable. So far all the + // forwarding strategy return only valid faces (or an empty list) + +#if 0 + // The function GetPitEntry encreases the ref counter in the pit entry + // we need to decrease it + entry_Release(&entry); +#endif + + if (forwarder_forward_to_nexthops(forwarder, msgbuf, nexthops) <= 0) { + DEBUG("Message %p returned an emtpy next hop set", msgbuf); + return false; + } + return true; - return res; } -bool forwarder_RemoveRoute(Forwarder *forwarder, remove_route_command *control, - unsigned ifidx) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(control, "Parameter route must be non-null"); +static +bool +_satisfy_from_content_store(forwarder_t * forwarder, msgbuf_t *interest_msgbuf) +{ + assert(forwarder); + assert(msgbuf_get_type(msgbuf) == MESSAGE_TYPE_INTEREST); + + if (msgbuf_get_interest_lifetime(interest_msgbuf) == 0) + return false; + + if (!forwarder->serve_from_content_store) + return false; + + // See if there's a match in the store. + msgbuf_t * data_msgbuf = content_store_match(forwarder->content_store, + interest_msgbuf, ticks_now()); + + if (!data_msgbuf) + return false; + + // Remove it from the PIT. nexthops is allocated, so need to destroy + nexthops_t * nexthops = pit_on_data(forwarder->pit, data_msgbuf); + assert(nexthops); // Illegal state: got a null nexthops for an interest we just inserted + + // send message in reply, then done + forwarder->stats.countInterestsSatisfiedFromStore++; + + DEBUG("Message %p satisfied from content store (satisfied count %u)", + interest_msgbuf, forwarder->stats.countInterestsSatisfiedFromStore); - // we only have one message processor - return messageProcessor_RemoveRoute(forwarder->processor, control, ifidx); + msgbuf_reset_pathlabel(data_msgbuf); + + forwarder_forward_to_nexthops(forwarder, data_msgbuf, nexthops); + + return true; } -#ifdef WITH_POLICY +/** + * @function forwarder_receive_interest + * @abstract Receive an interest from the network + * @discussion + * (1) if interest in the PIT, aggregate in PIT + * (2) if interest in the ContentStore, reply + * (3) if in the FIB, forward + * (4) drop + * + */ +static +void +forwarder_receive_interest(forwarder_t * forwarder, msgbuf_t * msgbuf) +{ + assert(forwarder); + assert(msgbuf); + assert(msgbuf_get_type(msgbuf) == MESSAGE_TYPE_INTEREST); + forwarder->stats.countInterestsReceived++; + + // (1) Try to aggregate in PIT + pit_verdict_t verdict = pit_on_interest(forwarder->pit, msgbuf); + switch(verdict) { + case PIT_VERDICT_AGGREGATE: + forwarder->stats.countInterestsAggregated++; + DEBUG("Message %p aggregated in PIT (aggregated count %u)", + msgbuf, forwarder->stats.countInterestsAggregated); + return; + + case PIT_VERDICT_FORWARD: + case PIT_VERDICT_RETRANSMIT: + DEBUG("Message %p not aggregated in PIT (aggregated count %u)", + msgbuf, forwarder->stats.countInterestsAggregated); + break; + } + + // At this point, we just created a PIT entry. If we don't forward the + // interest, we need to remove the PIT entry. -bool forwarder_AddOrUpdatePolicy(Forwarder *forwarder, - add_policy_command *control) { - parcAssertNotNull(forwarder, "Parameter forwarder must be non-null"); - parcAssertNotNull(control, "Parameter control must be non-null"); + // (2) Try to satisfy from content store + if (_satisfy_from_content_store(forwarder, msgbuf)) { + // done + // If we found a content object in the CS, + // messageProcess_Satisfy_from_content_store already cleared the PIT state + return; + } + + // (3) Try to forward it + if (forwarder_forward_via_fib(forwarder, msgbuf, verdict)) { + // done + return; + } - return messageProcessor_AddOrUpdatePolicy(forwarder->processor, control); + // Remove the PIT entry? + forwarder->stats.countDroppedNoRoute++; + + DEBUG("Message %p did not match FIB, no route (count %u)", + msgbuf, forwarder->stats.countDroppedNoRoute); + + forwarder_drop(forwarder, msgbuf); } -bool forwarder_RemovePolicy(Forwarder *forwarder, remove_policy_command *control) { - parcAssertNotNull(forwarder, "Parameter forwarder must be non-null"); - parcAssertNotNull(control, "Parameter control must be non-null"); +/** + * @function forwarder_receive_data + * @abstract Process an in-bound content object + * @discussion + * (1) If it does not match anything in the PIT, drop it + * (2) Add to Content Store + * (3) Reverse path forward via PIT entries + * + * @param <#param1#> + */ +static +void +forwarder_receive_data(forwarder_t * forwarder, + msgbuf_t *msgbuf) +{ + forwarder->stats.countObjectsReceived++; + + nexthops_t * ingressSetUnion = pit_on_data(forwarder->pit, msgbuf); + if (!ingressSetUnion) { + // (1) If it does not match anything in the PIT, drop it + forwarder->stats.countDroppedNoReversePath++; + + DEBUG("Message %p did not match PIT, no reverse path (count %u)", + msgbuf, forwarder->stats.countDroppedNoReversePath); + + // MOVE PROBE HOOK ELSEWHERE + // XXX relationship with forwarding strategy... insert hooks + // if the packet is a probe we need to analyze it + // NOTE : probes are not stored in PIT + if (msgbuf_is_probe(msgbuf)) { + fib_entry_t *entry = fib_match_message(forwarder->fib, msgbuf); + if (entry && fib_entry_strategy_type(entry) == STRATEGY_TYPE_LOW_LATENCY) { + nexthops_t probe_nexthops; + nexthops_add(&probe_nexthops, msgbuf_get_connection_id(msgbuf)); + fib_entry_on_data(entry, &probe_nexthops, msgbuf, 0, ticks_now()); + + // XXX TODO CONFIRM WE DON'T EXIT HERE ? + } + } + + // we store the packets in the content store enven in the case where there + // is no match in the PIT table in this way the applications can push the + // content in the CS of the forwarder. We allow this only for local faces + const connection_table_t * table = forwarder_get_connection_table(forwarder); + const connection_t * conn = connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); + + if (forwarder->store_in_content_store && connection_is_local(conn)) { + content_store_add(forwarder->content_store, msgbuf, ticks_now()); + DEBUG("Message %p store in CS anyway", msgbuf); + } + + forwarder_drop(forwarder, msgbuf); + } else { + // (2) Add to Content Store. Store may remove expired content, if necessary, + // depending on store policy. + if (forwarder->store_in_content_store) { + content_store_add(forwarder->content_store, msgbuf, ticks_now()); + } + // (3) Reverse path forward via PIT entries + forwarder_forward_to_nexthops(forwarder, msgbuf, ingressSetUnion); - return messageProcessor_RemovePolicy(forwarder->processor, control); + } } -#endif /* WITH_POLICY */ -void forwarder_RemoveConnectionIdFromRoutes(Forwarder *forwarder, - unsigned connectionId) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - messageProcessor_RemoveConnectionIdFromRoutes(forwarder->processor, - connectionId); +/** + * A NULL msgbuf is used to indicate the end of a batch + */ +void +forwarder_receive(forwarder_t * forwarder, msgbuf_t * msgbuf) +{ + assert(forwarder); + + /* Send batch ? */ + if (!msgbuf) { + const connection_table_t * table = forwarder_get_connection_table(forwarder); + for (unsigned i = 0; i < forwarder->num_pending_conn; i++) { + const connection_t * conn = connection_table_at(table, forwarder->pending_conn[i]); + // flush + connection_send(conn, NULL, false); + } + forwarder->num_pending_conn = 0; + } + + // this are the checks needed to implement WLDR. We set wldr only on the STAs + // and we let the AP to react according to choise of the client. + // if the STA enables wldr using the set command, the AP enable wldr as well + // otherwise, if the STA disable it the AP remove wldr + // WLDR should be enabled only on the STAs using the command line + // TODO + // disable WLDR command line on the AP + connection_table_t * table = forwarder_get_connection_table(forwarder); + connection_t * conn = connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); + if (!conn) + return; + + if (msgbuf_has_wldr(msgbuf)) { + if (connection_has_wldr(conn)) { + // case 1: WLDR is enabled + connection_wldr_detect_losses(conn, msgbuf); + } else if (!connection_has_wldr(conn) && + connection_wldr_autostart_is_allowed(conn)) { + // case 2: We are on an AP. We enable WLDR + connection_wldr_enable(conn, true); + connection_wldr_detect_losses(conn, msgbuf); + } + // case 3: Ignore WLDR + } else { + if (connection_has_wldr(conn) && connection_wldr_autostart_is_allowed(conn)) { + // case 1: STA do not use WLDR, we disable it + connection_wldr_enable(conn, false); + } + } + + forwarder->stats.countReceived++; + + char *nameString = name_ToString(msgbuf_get_name(msgbuf)); + DEBUG( "Message %p ingress %3u length %5u received name %s", msgbuf, + msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf), nameString); + free(nameString); + + switch (msgbuf_get_type(msgbuf)) { + case MESSAGE_TYPE_INTEREST: + forwarder_receive_interest(forwarder, msgbuf); + break; + + case MESSAGE_TYPE_DATA: + forwarder_receive_data(forwarder, msgbuf); + break; + + default: + forwarder_drop(forwarder, msgbuf); + break; + } + } -void forwarder_SetStrategy(Forwarder *forwarder, Name *prefix, - strategy_type strategy, - unsigned related_prefixes_len, - Name **related_prefixes) { - parcAssertNotNull(forwarder, "Parameter hicn-light must be non-null"); - parcAssertNotNull(prefix, "Parameter prefix must be non-null"); +bool +forwarder_add_or_update_route(forwarder_t * forwarder, ip_prefix_t * prefix, + unsigned ingress_id) +{ + assert(forwarder); + assert(prefix); + + configuration_t *config = forwarder_get_configuration(forwarder); + + char prefix_s[MAXSZ_IP_PREFIX]; + int rc = ip_prefix_snprintf(prefix_s, MAXSZ_IP_PREFIX, prefix); + assert(rc < MAXSZ_IP_PREFIX); + if (rc < 0) + return false; + + // XXX TODO this should store options too + strategy_type_t strategy_type = configuration_get_strategy(config, prefix_s); + + Name * name_prefix = name_CreateFromAddress(prefix->family, + prefix->address, prefix->len); + // XXX TODO error handling + fib_entry_t * entry = fib_contains(forwarder->fib, name_prefix); + if (!entry) { + entry = fib_entry_create(name_prefix, strategy_type, NULL, forwarder); + fib_entry_nexthops_add(entry, ingress_id); + fib_add(forwarder->fib, entry); + } else { + fib_entry_nexthops_add(entry, ingress_id); + } + + name_Release(&name_prefix); - processor_SetStrategy(forwarder->processor, prefix, strategy, - related_prefixes_len, related_prefixes); + return true; } -FibEntryList *forwarder_GetFibEntries(Forwarder *forwarder) { - return messageProcessor_GetFibEntries(forwarder->processor); + +bool +forwarder_remove_route(forwarder_t * forwarder, ip_prefix_t * prefix, + unsigned ingress_id) +{ + assert(forwarder); + assert(prefix); + + Name *name_prefix = name_CreateFromAddress(prefix->family, + prefix->address, prefix->len); + // XXX TODO error handling + fib_remove(forwarder->fib, name_prefix, ingress_id); + name_Release(&name_prefix); + + return true; +} + +#ifdef WITH_POLICY + +bool +forwarder_add_or_update_policy(forwarder_t * forwarder, ip_prefix_t * prefix, + policy_t * policy) +{ + assert(forwarder); + assert(prefix); + assert(policy); + + Name *name_prefix = name_CreateFromAddress(prefix->family, prefix->address, + prefix->len); + // XXX TODO error handling + fib_entry_t *entry = fib_contains(forwarder->fib, name_prefix); + if (!entry) + return false; + fib_entry_set_policy(entry, *policy); + + name_Release(&name_prefix); + + return true; } -void forwarder_SetContentObjectStoreSize(Forwarder *forwarder, - size_t maximumContentStoreSize) { - messageProcessor_SetContentObjectStoreSize(forwarder->processor, - maximumContentStoreSize); +bool +forwarder_remove_policy(forwarder_t * forwarder, ip_prefix_t * prefix) +{ + assert(forwarder); + assert(prefix); + + Name *name_prefix = name_CreateFromAddress(prefix->family, prefix->address, + prefix->len); + // XXX TODO error handling + fib_entry_t * entry = fib_contains(forwarder->fib, name_prefix); + name_Release(&name_prefix); + + if (!entry) + return false; + + fib_entry_set_policy(entry, POLICY_NONE); + + return true; } -void forwarder_ClearCache(Forwarder *forwarder) { - messageProcessor_ClearCache(forwarder->processor); +#endif /* WITH_POLICY */ + +void +forwarder_remove_connection_id_from_routes(forwarder_t * forwarder, + unsigned connection_id) +{ + assert(forwarder); + + fib_remove_connection_id(forwarder->fib, connection_id); } -PARCClock *forwarder_GetClock(const Forwarder *forwarder) { - return forwarder->clock; +void +forwarder_set_strategy(forwarder_t * forwarder, Name * name_prefix, + strategy_type_t strategy_type, strategy_options_t * strategy_options) +{ + assert(forwarder); + assert(name_prefix); + assert(strategy_type_valid(strategy_type)); + /* strategy_options might be NULL */ + + fib_entry_t * entry = fib_contains(forwarder->fib, name_prefix); + if (!entry) + return; + + fib_entry_set_strategy(entry, strategy_type, strategy_options); } -#if !defined(__APPLE__) -hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder) { - return forwarder->hicnSocketHelper; +content_store_t * +forwarder_get_content_store(const forwarder_t * forwarder) +{ + assert(forwarder); + + return forwarder->content_store; } -#endif // ======================================================= +#if 0 static void _signal_cb(int sig, PARCEventType events, void *user_data) { - Forwarder *forwarder = (Forwarder *)user_data; - - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "signal %d events %d", sig, events); - - switch ((int)sig) { - case SIGTERM: - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "Caught an terminate signal; exiting cleanly."); - dispatcher_Stop(forwarder->dispatcher); - break; - - case SIGINT: - logger_Log(forwarder->logger, LoggerFacility_Core, PARCLogLevel_Warning, - __func__, "Caught an interrupt signal; exiting cleanly."); - dispatcher_Stop(forwarder->dispatcher); - break; + forwarder_t * forwarder = (forwarder_t *)user_data; + + WARN("signal %d events %d", sig, events); + + switch ((int)sig) { + case SIGTERM: + WARN("Caught an terminate signal; exiting cleanly."); + dispatcher_Stop(forwarder->dispatcher); + break; + + case SIGINT: + WARN("Caught an interrupt signal; exiting cleanly."); + dispatcher_Stop(forwarder->dispatcher); + break; #ifndef _WIN32 - case SIGUSR1: - // dump stats - break; + case SIGUSR1: + // dump stats + break; #endif - default: - break; - } + default: + break; + } } +#endif -static void _keepalive_cb(int fd, PARCEventType what, void *user_data) { - parcAssertTrue(what & PARCEventType_Timeout, "Got unexpected tick_cb: %d", - what); - // function is just a keepalive for hicn-light, does not do anything +fib_t * +forwarder_get_fib(forwarder_t * forwarder) { + return forwarder->fib; } #ifdef WITH_MAPME -FIB *forwarder_getFib(Forwarder *forwarder) { - return messageProcessor_getFib(forwarder->processor); +void +forwarder_on_connection_event(const forwarder_t * forwarder, + const connection_t * connection, connection_event_t event) +{ + mapme_on_connection_event(forwarder->mapme, connection, event); } -void forwarder_onConnectionEvent(Forwarder *forwarder, const Connection *conn, connection_event_t event) { -//#ifdef WITH_POLICY -// messageProcessor_onConnectionEvent(forwarder->processor, conn, event); -//#else - mapme_onConnectionEvent(forwarder->mapme, conn, event); -//#endif /* WITH_POLICY */ +mapme_t * +forwarder_get_mapme(const forwarder_t * forwarder) { + return forwarder->mapme; } -void forwarder_ProcessMapMe(Forwarder *forwarder, const uint8_t *msgBuffer, - unsigned conn_id) { - mapme_Process(forwarder->mapme, msgBuffer, conn_id); +#endif /* WITH_MAPME */ + +#ifdef WITH_PREFIX_STATS +const prefix_stats_mgr_t * +forwarder_get_prefix_stats_mgr(const forwarder_t * forwarder) +{ + return &forwarder->prefix_stats_mgr; } +#endif /* WITH_PREFIX_STATS */ + +static +void +process_interest(forwarder_t * forwarder, listener_t * listener, + unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) +{ + if (!connection_id_is_valid(conn_id)) { + conn_id = listener_create_connection(listener, pair); + } -MapMe * -forwarder_getMapmeInstance(const Forwarder *forwarder) { - return forwarder->mapme; + assert(messageHandler_GetTotalPacketLength(packet) == size); + + msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_INTEREST, conn_id, ticks_now()); + forwarder_receive(listener->forwarder, &forwarder->msgbuf); } -#endif /* WITH_MAPME */ +static +void +process_data(forwarder_t * forwarder, listener_t * listener, + unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) +{ + if (!connection_id_is_valid(conn_id)) { + INFO("Ignoring data packet associated to no connection"); + return; + } + + assert(messageHandler_GetTotalPacketLength(packet) == size); + + msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_DATA, conn_id, ticks_now()); + forwarder_receive(listener->forwarder, &forwarder->msgbuf); + +} + +static +void +process_wldr_notification(forwarder_t * forwarder, listener_t * listener, + unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) +{ + if (!connection_id_is_valid(conn_id)) { + INFO("Ignoring WLDR notification not associated to a connection"); + return; + } + + assert(messageHandler_GetTotalPacketLength(packet) == size); + + connection_table_t * table = forwarder_get_connection_table(forwarder); + connection_t * connection = connection_table_at(table, conn_id); + + msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_WLDR_NOTIFICATION, conn_id, ticks_now()); + connection_wldr_handle_notification(connection, &forwarder->msgbuf); + +} + +static +void +process_mapme(forwarder_t * forwarder, listener_t * listener, + unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) +{ + if (!connection_id_is_valid(conn_id)) + conn_id = listener_create_connection(listener, pair); + mapme_process(forwarder->mapme, packet, conn_id); +} + +static +void +process_command(const forwarder_t * forwarder, listener_t * listener, + unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) +{ + if (!connection_id_is_valid(conn_id)) + conn_id = listener_create_connection(listener, pair); + + command_type_t command_type= *(packet + 1); + if (command_type >= COMMAND_TYPE_N) { + ERROR("Invalid command"); + return; + } + forwarder_receive_command(listener->forwarder, command_type, packet, conn_id); + +} + +// = process for listener as we are resolving connection id +// XXX this would typically be inside the forwarder +void +process_packet(forwarder_t * forwarder, listener_t * listener, uint8_t * packet, size_t size, address_pair_t * pair) +{ + /* Connection lookup */ + const connection_table_t * table = forwarder_get_connection_table(listener->forwarder); + const connection_t * conn = connection_table_get_by_pair(table, pair); + unsigned conn_id = conn ? connection_table_get_connection_id(table, conn): CONNECTION_ID_UNDEFINED; + + assert((conn_id != CONNECTION_ID_UNDEFINED) || listener); + + // Actually hooks should be defined for each packet type to avoid this + // spaghetti code + if (messageHandler_IsTCP(packet)) { + if (messageHandler_IsData(packet)) { + process_data(forwarder, listener, conn_id, packet, size, pair); + } else if (messageHandler_IsInterest(packet)) { + process_interest(forwarder, listener, conn_id, packet, size, pair); + } else { + INFO("Unknown TCP packet received"); + forwarder_drop(forwarder, NULL); + } + } else if (messageHandler_IsWldrNotification(packet)) { + process_wldr_notification(forwarder, listener, conn_id, packet, size, pair); + } else if (mapme_match_packet(packet)) { + process_mapme(forwarder, listener, conn_id, packet, size, pair); + } else if (*packet == REQUEST_LIGHT) { + process_command(forwarder, listener, conn_id, packet, size, pair); + } else { + INFO("Unknown packet received"); + forwarder_drop(forwarder, NULL); + } +} -- cgit 1.2.3-korg