From 32dccec98e4c7d7e4ce902e19ba8d1b29b823758 Mon Sep 17 00:00:00 2001 From: Jordan Augé Date: Wed, 23 Sep 2020 17:50:52 +0200 Subject: [HICN-570] Message buffer (incl. CS and PIT changes) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I4c508e4b04dee3acbfc3da1d26e1770cb826f22b Signed-off-by: Jordan Augé --- hicn-light/src/hicn/core/forwarder.c | 572 ++++++++++++++++++----------------- 1 file changed, 287 insertions(+), 285 deletions(-) (limited to 'hicn-light/src/hicn/core/forwarder.c') diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c index 9c3df906d..543fc99e4 100644 --- a/hicn-light/src/hicn/core/forwarder.c +++ b/hicn-light/src/hicn/core/forwarder.c @@ -31,7 +31,7 @@ #include #include #include -#include +//#include #include #include #include @@ -41,18 +41,20 @@ #define __STDC_FORMAT_MACROS #include -#include -#include -#include -#include -#include -#include -#include +#include "connection_table.h" +#include "content_store.h" +#include "fib.h" +#include "forwarder.h" +#include "listener_table.h" #ifdef WITH_MAPME -#include +#include "mapme.h" #endif /* WITH_MAPME */ -#include -#include +#include "msgbuf.h" +#include "msgbuf_pool.h" +#include "pit.h" +#include "../config/configuration.h" +#include "../config/configuration_file.h" +#include "../io/base.h" // MAX_MSG #ifdef WITH_PREFIX_STATS #include @@ -61,10 +63,8 @@ #include #include -#define DEFAULT_PIT_SIZE 65535 - typedef struct { - uint32_t countReceived; + uint32_t countReceived; // Interest & Data only uint32_t countInterestsReceived; uint32_t countObjectsReceived; @@ -110,15 +110,16 @@ struct forwarder_s { pit_t * pit; - content_store_t * content_store; + cs_t * cs; fib_t * fib; + msgbuf_pool_t * msgbuf_pool; #ifdef WITH_MAPME mapme_t * mapme; #endif /* WITH_MAPME */ - bool store_in_content_store; - bool serve_from_content_store; + bool store_in_cs; + bool serve_from_cs; forwarder_stats_t stats; #ifdef WITH_PREFIX_STATS @@ -132,7 +133,7 @@ struct forwarder_s { unsigned pending_conn[MAX_MSG]; size_t num_pending_conn; - msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by 1 */ + //msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by 1 */ }; @@ -172,27 +173,6 @@ forwarder_seed(forwarder_t * forwarder) { #endif } -int -init_batch_buffers(batch_buffer_t * bb) -{ - /* Setup recvmmsg data structures. */ - for (unsigned i = 0; i < MAX_MSG; i++) { - char *buf = &bb->buffers[i][0]; - struct iovec *iovec = &bb->iovecs[i]; - struct mmsghdr *msg = &bb->msghdr[i]; - - msg->msg_hdr.msg_iov = iovec; - msg->msg_hdr.msg_iovlen = 1; - - msg->msg_hdr.msg_name = &bb->addrs[i]; - msg->msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - - iovec->iov_base = &buf[0]; - iovec->iov_len = MTU; - } - return 0; -} - forwarder_t * forwarder_create() { @@ -218,22 +198,26 @@ forwarder_create() if (!forwarder->fib) goto ERR_FIB; - forwarder->pit = pit_create(DEFAULT_PIT_SIZE); + forwarder->msgbuf_pool = msgbuf_pool_create(); + if (!forwarder->msgbuf_pool) + goto ERR_PACKET_POOL; + + forwarder->pit = pit_create(); if (!forwarder->pit) goto ERR_PIT; size_t objectStoreSize = - configuration_content_store_get_size(forwarder_get_configuration(forwarder)); - forwarder->content_store = _content_store_create(CONTENT_STORE_TYPE_LRU, + configuration_cs_get_size(forwarder_get_configuration(forwarder)); + forwarder->cs = _cs_create(CS_TYPE_LRU, objectStoreSize, 0); - if (!forwarder->content_store) - goto ERR_CONTENT_STORE; + if (!forwarder->cs) + goto ERR_CS; - // the two flags for the content_store are set to true by default. If the content_store + // the two flags for the cs are set to true by default. If the cs // is active it always work as expected unless the use modifies this // values using controller - forwarder->store_in_content_store = true; - forwarder->serve_from_content_store = true; + forwarder->store_in_cs = true; + forwarder->serve_from_cs = true; #if 0 forwarder->signal_term = dispatcher_CreateSignalEvent( @@ -293,10 +277,12 @@ ERR_MAPME: dispatcher_Destroy(&(forwarder->dispatcher)); #endif - content_store_free(forwarder->content_store); -ERR_CONTENT_STORE: + cs_free(forwarder->cs); +ERR_CS: pit_free(forwarder->pit); ERR_PIT: + msgbuf_pool_free(forwarder->msgbuf_pool); +ERR_PACKET_POOL: fib_free(forwarder->fib); ERR_FIB: connection_table_free(forwarder->connection_table); @@ -335,8 +321,9 @@ forwarder_free(forwarder_t * forwarder) dispatcher_Destroy(&(forwarder->dispatcher)); #endif - content_store_free(forwarder->content_store); + cs_free(forwarder->cs); pit_free(forwarder->pit); + msgbuf_pool_free(forwarder->msgbuf_pool); fib_free(forwarder->fib); connection_table_free(forwarder->connection_table); listener_table_free(forwarder->listener_table); @@ -394,63 +381,56 @@ forwarder_get_listener_table(forwarder_t * forwarder) } void -forwarder_content_store_set_store(forwarder_t * forwarder, bool val) +forwarder_cs_set_store(forwarder_t * forwarder, bool val) { assert(forwarder); - forwarder->store_in_content_store = val; + forwarder->store_in_cs = val; } bool -forwarder_content_store_get_store(forwarder_t * forwarder) +forwarder_cs_get_store(forwarder_t * forwarder) { assert(forwarder); - return forwarder->store_in_content_store; + return forwarder->store_in_cs; } void -forwarder_content_store_set_serve(forwarder_t * forwarder, bool val) +forwarder_cs_set_serve(forwarder_t * forwarder, bool val) { assert(forwarder); - forwarder->serve_from_content_store = val; + forwarder->serve_from_cs = val; } bool -forwarder_content_store_get_serve(forwarder_t * forwarder) +forwarder_cs_get_serve(forwarder_t * forwarder) { assert(forwarder); - return forwarder->serve_from_content_store; + return forwarder->serve_from_cs; } void -forwarder_content_store_set_size(forwarder_t * forwarder, size_t size) +forwarder_cs_set_size(forwarder_t * forwarder, size_t size) { assert(forwarder); - content_store_free(forwarder->content_store); + cs_free(forwarder->cs); // XXX TODO #if 0 - ContentStoreConfig content_storeConfig = {.objectCapacity = + ContentStoreConfig csConfig = {.objectCapacity = maximumContentStoreSize}; - forwarder->content_store = - content_storeLRU_Create(&content_storeConfig, forwarder->logger); + forwarder->cs = + csLRU_Create(&csConfig, forwarder->logger); #endif } void -forwarder_content_store_clear(forwarder_t * forwarder) +forwarder_cs_clear(forwarder_t * forwarder) { assert(forwarder); - content_store_clear(forwarder->content_store); -} - -void -forwarder_receive_command(forwarder_t * forwarder, command_type_t command_type, - uint8_t * packet, unsigned connection_id) -{ - configuration_receive_command(forwarder->config, command_type, packet, connection_id); + cs_clear(forwarder->cs); } /** @@ -465,17 +445,20 @@ forwarder_receive_command(forwarder_t * forwarder, command_type_t command_type, * */ static -void -forwarder_drop(forwarder_t * forwarder, msgbuf_t *message) +ssize_t +forwarder_drop(forwarder_t * forwarder, off_t msgbuf_id) { forwarder->stats.countDropped++; - switch (msgbuf_get_type(message)) { - case MESSAGE_TYPE_INTEREST: + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + switch (msgbuf_get_type(msgbuf)) { + case MSGBUF_TYPE_INTEREST: forwarder->stats.countInterestsDropped++; break; - case MESSAGE_TYPE_DATA: + case MSGBUF_TYPE_DATA: forwarder->stats.countObjectsDropped++; break; @@ -484,6 +467,7 @@ forwarder_drop(forwarder_t * forwarder, msgbuf_t *message) break; } + return msgbuf_get_len(msgbuf); // dont destroy message here, its done at end of receive } @@ -493,23 +477,26 @@ forwarder_drop(forwarder_t * forwarder, msgbuf_t *message) * */ static -void -forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf, +ssize_t +forwarder_forward_via_connection(forwarder_t * forwarder, off_t msgbuf_id, unsigned conn_id) { connection_table_t * table = forwarder_get_connection_table(forwarder); + + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + const connection_t * conn = connection_table_get_by_id(table, conn_id); if (!conn) { forwarder->stats.countDroppedConnectionNotFound++; - DEBUG("forward msgbuf %p to interface %u not found (count %u)", - msgbuf, conn_id, forwarder->stats.countDroppedConnectionNotFound); - forwarder_drop(forwarder, msgbuf); - return; + DEBUG("forward msgbuf %lu to interface %u not found (count %u)", + msgbuf_id, conn_id, forwarder->stats.countDroppedConnectionNotFound); + return forwarder_drop(forwarder, msgbuf_id); } /* Always queue the packet... */ - bool success = connection_send(conn, msgbuf, true); + bool success = connection_send(conn, msgbuf_id, true); /* ... and mark the connection as pending if this is not yet the case */ unsigned i; @@ -523,18 +510,17 @@ forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf, if (!success) { forwarder->stats.countSendFailures++; - DEBUG("forward msgbuf %p to interface %u send failure (count %u)", msgbuf, - conn_id, forwarder->stats.countSendFailures); - forwarder_drop(forwarder, msgbuf); - return; + DEBUG("forward msgbuf %llu to interface %u send failure (count %u)", + msgbuf_id, conn_id, forwarder->stats.countSendFailures); + return forwarder_drop(forwarder, msgbuf_id); } switch (msgbuf_get_type(msgbuf)) { - case MESSAGE_TYPE_INTEREST: + case MSGBUF_TYPE_INTEREST: forwarder->stats.countInterestForwarded++; break; - case MESSAGE_TYPE_DATA: + case MSGBUF_TYPE_DATA: forwarder->stats.countObjectsForwarded++; break; @@ -546,6 +532,7 @@ forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf, conn_id, forwarder->stats.countInterestForwarded, forwarder->stats.countObjectsForwarded); + return (msgbuf_get_len(msgbuf)); } /** @@ -559,14 +546,16 @@ forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf, static unsigned forwarder_forward_to_nexthops(forwarder_t * forwarder, - msgbuf_t *msgbuf, const nexthops_t * nexthops) + off_t msgbuf_id, const nexthops_t * nexthops) { unsigned forwardedCopies = 0; + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); unsigned ingressId = msgbuf_get_connection_id(msgbuf); uint32_t old_path_label = 0; - if (msgbuf_get_type(msgbuf) == MESSAGE_TYPE_DATA) + if (msgbuf_get_type(msgbuf) == MSGBUF_TYPE_DATA) old_path_label = msgbuf_get_pathlabel(msgbuf); unsigned nexthop; @@ -575,13 +564,13 @@ forwarder_forward_to_nexthops(forwarder_t * forwarder, continue; forwardedCopies++; - forwarder_forward_via_connection(forwarder, msgbuf, nexthop); + forwarder_forward_via_connection(forwarder, msgbuf_id, nexthop); // everytime we send out a message we need to restore the original path // label of the message this is important because we keep a single copy // of the message (single pointer) and we modify the path label at each // send. - if (msgbuf_get_type(msgbuf) == MESSAGE_TYPE_DATA) + if (msgbuf_get_type(msgbuf) == MSGBUF_TYPE_DATA) msgbuf_set_pathlabel(msgbuf, old_path_label); }); @@ -591,12 +580,15 @@ forwarder_forward_to_nexthops(forwarder_t * forwarder, static bool -forwarder_forward_via_fib(forwarder_t * forwarder, msgbuf_t *msgbuf, +forwarder_forward_via_fib(forwarder_t * forwarder, off_t msgbuf_id, pit_verdict_t verdict) { assert(forwarder); - assert(msgbuf); - assert(msgbuf_get_type(msgbuf) == MESSAGE_TYPE_INTEREST); + assert(msgbuf_id_is_valid(msgbuf_id)); + + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST); fib_entry_t *fib_entry = fib_match_message(forwarder->fib, msgbuf); if (!fib_entry) @@ -650,7 +642,7 @@ forwarder_forward_via_fib(forwarder_t * forwarder, msgbuf_t *msgbuf, entry_Release(&entry); #endif - if (forwarder_forward_to_nexthops(forwarder, msgbuf, nexthops) <= 0) { + if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <= 0) { DEBUG("Message %p returned an emtpy next hop set", msgbuf); return false; } @@ -661,43 +653,49 @@ forwarder_forward_via_fib(forwarder_t * forwarder, msgbuf_t *msgbuf, static bool -_satisfy_from_content_store(forwarder_t * forwarder, msgbuf_t *interest_msgbuf) +_satisfy_from_cs(forwarder_t * forwarder, off_t msgbuf_id) { assert(forwarder); - assert(msgbuf_get_type(interest_msgbuf) == MESSAGE_TYPE_INTEREST); + assert(msgbuf_id_is_valid(msgbuf_id)); + + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST); - if (msgbuf_get_interest_lifetime(interest_msgbuf) == 0) + if (msgbuf_get_lifetime(msgbuf) == 0) return false; - if (!forwarder->serve_from_content_store) + if (!forwarder->serve_from_cs) return false; // See if there's a match in the store. - msgbuf_t * data_msgbuf = content_store_match(forwarder->content_store, - interest_msgbuf, ticks_now()); + off_t data_msgbuf_id = cs_match(forwarder_get_cs(forwarder), msgbuf_id, + ticks_now()); - if (!data_msgbuf) + if (msgbuf_id_is_valid(data_msgbuf_id)) return false; // Remove it from the PIT. nexthops is allocated, so need to destroy - nexthops_t * nexthops = pit_on_data(forwarder->pit, data_msgbuf); + nexthops_t * nexthops = pit_on_data(forwarder->pit, data_msgbuf_id); assert(nexthops); // Illegal state: got a null nexthops for an interest we just inserted // send message in reply, then done forwarder->stats.countInterestsSatisfiedFromStore++; - DEBUG("Message %p satisfied from content store (satisfied count %u)", - interest_msgbuf, forwarder->stats.countInterestsSatisfiedFromStore); + DEBUG("Message %lu satisfied from content store (satisfied count %u)", + msgbuf_id, forwarder->stats.countInterestsSatisfiedFromStore); + msgbuf_t * data_msgbuf = msgbuf_pool_at(msgbuf_pool, data_msgbuf_id); msgbuf_reset_pathlabel(data_msgbuf); - forwarder_forward_to_nexthops(forwarder, data_msgbuf, nexthops); + forwarder_forward_to_nexthops(forwarder, data_msgbuf_id, nexthops); return true; } /** - * @function forwarder_receive_interest + * @function forwarder_process_interest * @abstract Receive an interest from the network * @discussion * (1) if interest in the PIT, aggregate in PIT @@ -707,22 +705,34 @@ _satisfy_from_content_store(forwarder_t * forwarder, msgbuf_t *interest_msgbuf) * */ static -void -forwarder_receive_interest(forwarder_t * forwarder, msgbuf_t * msgbuf) +ssize_t +forwarder_process_interest(forwarder_t * forwarder, off_t msgbuf_id) { assert(forwarder); - assert(msgbuf); - assert(msgbuf_get_type(msgbuf) == MESSAGE_TYPE_INTEREST); + assert(msgbuf_id_is_valid(msgbuf_id)); + + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST); + + forwarder->stats.countReceived++; forwarder->stats.countInterestsReceived++; + char *nameString = name_ToString(msgbuf_get_name(msgbuf)); + DEBUG( "Message %p ingress %3u length %5u received name %s", msgbuf, + msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf), nameString); + free(nameString); + + // (1) Try to aggregate in PIT - pit_verdict_t verdict = pit_on_interest(forwarder->pit, msgbuf); + pit_verdict_t verdict = pit_on_interest(forwarder->pit, msgbuf_id); switch(verdict) { case PIT_VERDICT_AGGREGATE: forwarder->stats.countInterestsAggregated++; DEBUG("Message %p aggregated in PIT (aggregated count %u)", msgbuf, forwarder->stats.countInterestsAggregated); - return; + return msgbuf_get_len(msgbuf); case PIT_VERDICT_FORWARD: case PIT_VERDICT_RETRANSMIT: @@ -735,30 +745,30 @@ forwarder_receive_interest(forwarder_t * forwarder, msgbuf_t * msgbuf) // interest, we need to remove the PIT entry. // (2) Try to satisfy from content store - if (_satisfy_from_content_store(forwarder, msgbuf)) { + if (_satisfy_from_cs(forwarder, msgbuf_id)) { // done // If we found a content object in the CS, - // messageProcess_Satisfy_from_content_store already cleared the PIT state - return; + // messageProcess_Satisfy_from_cs already cleared the PIT state + return msgbuf_get_len(msgbuf); } // (3) Try to forward it - if (forwarder_forward_via_fib(forwarder, msgbuf, verdict)) { + if (forwarder_forward_via_fib(forwarder, msgbuf_id, verdict)) { // done - return; + return msgbuf_get_len(msgbuf); } // Remove the PIT entry? forwarder->stats.countDroppedNoRoute++; - DEBUG("Message %p did not match FIB, no route (count %u)", - msgbuf, forwarder->stats.countDroppedNoRoute); + DEBUG("Message %lu did not match FIB, no route (count %u)", + msgbuf_id, forwarder->stats.countDroppedNoRoute); - forwarder_drop(forwarder, msgbuf); + return forwarder_drop(forwarder, msgbuf_id); } /** - * @function forwarder_receive_data + * @function forwarder_process_data * @abstract Process an in-bound content object * @discussion * (1) If it does not match anything in the PIT, drop it @@ -768,19 +778,27 @@ forwarder_receive_interest(forwarder_t * forwarder, msgbuf_t * msgbuf) * @param <#param1#> */ static -void -forwarder_receive_data(forwarder_t * forwarder, - msgbuf_t *msgbuf) +ssize_t +forwarder_process_data(forwarder_t * forwarder, off_t msgbuf_id) { + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + + char *nameString = name_ToString(msgbuf_get_name(msgbuf)); + DEBUG( "Message %lu ingress %3u length %5u received name %s", msgbuf_id, + msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf), nameString); + free(nameString); + + forwarder->stats.countReceived++; forwarder->stats.countObjectsReceived++; - nexthops_t * ingressSetUnion = pit_on_data(forwarder->pit, msgbuf); + nexthops_t * ingressSetUnion = pit_on_data(forwarder->pit, msgbuf_id); if (!ingressSetUnion) { // (1) If it does not match anything in the PIT, drop it forwarder->stats.countDroppedNoReversePath++; - DEBUG("Message %p did not match PIT, no reverse path (count %u)", - msgbuf, forwarder->stats.countDroppedNoReversePath); + DEBUG("Message %lu did not match PIT, no reverse path (count %u)", + msgbuf_id, forwarder->stats.countDroppedNoReversePath); // MOVE PROBE HOOK ELSEWHERE // XXX relationship with forwarding strategy... insert hooks @@ -803,95 +821,68 @@ forwarder_receive_data(forwarder_t * forwarder, const connection_table_t * table = forwarder_get_connection_table(forwarder); const connection_t * conn = connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); - if (forwarder->store_in_content_store && connection_is_local(conn)) { - content_store_add(forwarder->content_store, msgbuf, ticks_now()); + if (forwarder->store_in_cs && connection_is_local(conn)) { + cs_add(forwarder->cs, msgbuf_id, ticks_now()); DEBUG("Message %p store in CS anyway", msgbuf); } - forwarder_drop(forwarder, msgbuf); + return forwarder_drop(forwarder, msgbuf_id); } else { // (2) Add to Content Store. Store may remove expired content, if necessary, // depending on store policy. - if (forwarder->store_in_content_store) { - content_store_add(forwarder->content_store, msgbuf, ticks_now()); + if (forwarder->store_in_cs) { + cs_add(forwarder->cs, msgbuf_id, ticks_now()); } // (3) Reverse path forward via PIT entries - forwarder_forward_to_nexthops(forwarder, msgbuf, ingressSetUnion); + return forwarder_forward_to_nexthops(forwarder, msgbuf_id, ingressSetUnion); } } - -/** - * A NULL msgbuf is used to indicate the end of a batch - */ void -forwarder_receive(forwarder_t * forwarder, msgbuf_t * msgbuf) +forwarder_flush_connections(forwarder_t * forwarder) { - assert(forwarder); - - /* Send batch ? */ - if (!msgbuf) { - const connection_table_t * table = forwarder_get_connection_table(forwarder); - for (unsigned i = 0; i < forwarder->num_pending_conn; i++) { - const connection_t * conn = connection_table_at(table, forwarder->pending_conn[i]); - // flush - connection_send(conn, NULL, false); + const connection_table_t * table = forwarder_get_connection_table(forwarder); + + for (unsigned i = 0; i < forwarder->num_pending_conn; i++) { + unsigned conn_id = forwarder->pending_conn[i]; + const connection_t * conn = connection_table_at(table, conn_id); + if (!connection_flush(conn)) { + WARN("Could not flush connection queue"); + // XXX keep track of non flushed connections... } - forwarder->num_pending_conn = 0; } + forwarder->num_pending_conn = 0; +} +// XXX move to wldr file, worst case in connection. +void +forwarder_apply_wldr(const forwarder_t * forwarder, const msgbuf_t * msgbuf, connection_t * connection) +{ // this are the checks needed to implement WLDR. We set wldr only on the STAs - // and we let the AP to react according to choise of the client. + // and we let the AP to react according to choice of the client. // if the STA enables wldr using the set command, the AP enable wldr as well // otherwise, if the STA disable it the AP remove wldr // WLDR should be enabled only on the STAs using the command line // TODO // disable WLDR command line on the AP - connection_table_t * table = forwarder_get_connection_table(forwarder); - connection_t * conn = connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); - if (!conn) - return; - if (msgbuf_has_wldr(msgbuf)) { - if (connection_has_wldr(conn)) { + if (connection_has_wldr(connection)) { // case 1: WLDR is enabled - connection_wldr_detect_losses(conn, msgbuf); - } else if (!connection_has_wldr(conn) && - connection_wldr_autostart_is_allowed(conn)) { + connection_wldr_detect_losses(connection, msgbuf); + } else if (!connection_has_wldr(connection) && + connection_wldr_autostart_is_allowed(connection)) { // case 2: We are on an AP. We enable WLDR - connection_wldr_enable(conn, true); - connection_wldr_detect_losses(conn, msgbuf); + connection_wldr_enable(connection, true); + connection_wldr_detect_losses(connection, msgbuf); } // case 3: Ignore WLDR } else { - if (connection_has_wldr(conn) && connection_wldr_autostart_is_allowed(conn)) { + if (connection_has_wldr(connection) && connection_wldr_autostart_is_allowed(connection)) { // case 1: STA do not use WLDR, we disable it - connection_wldr_enable(conn, false); + connection_wldr_enable(connection, false); } } - - forwarder->stats.countReceived++; - - char *nameString = name_ToString(msgbuf_get_name(msgbuf)); - DEBUG( "Message %p ingress %3u length %5u received name %s", msgbuf, - msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf), nameString); - free(nameString); - - switch (msgbuf_get_type(msgbuf)) { - case MESSAGE_TYPE_INTEREST: - forwarder_receive_interest(forwarder, msgbuf); - break; - - case MESSAGE_TYPE_DATA: - forwarder_receive_data(forwarder, msgbuf); - break; - - default: - forwarder_drop(forwarder, msgbuf); - break; - } - } bool @@ -1006,7 +997,7 @@ forwarder_set_strategy(forwarder_t * forwarder, Name * name_prefix, { assert(forwarder); assert(name_prefix); - // assert(strategy_type_is_valid(strategy_type)); + assert(STRATEGY_TYPE_VALID(strategy_type)); /* strategy_options might be NULL */ fib_entry_t * entry = fib_contains(forwarder->fib, name_prefix); @@ -1016,12 +1007,12 @@ forwarder_set_strategy(forwarder_t * forwarder, Name * name_prefix, fib_entry_set_strategy(entry, strategy_type, strategy_options); } -content_store_t * -forwarder_get_content_store(const forwarder_t * forwarder) +cs_t * +forwarder_get_cs(const forwarder_t * forwarder) { assert(forwarder); - return forwarder->content_store; + return forwarder->cs; } // ======================================================= @@ -1055,10 +1046,17 @@ static void _signal_cb(int sig, PARCEventType events, void *user_data) { #endif fib_t * -forwarder_get_fib(forwarder_t * forwarder) { +forwarder_get_fib(forwarder_t * forwarder) +{ return forwarder->fib; } +msgbuf_pool_t * +forwarder_get_msgbuf_pool(const forwarder_t * forwarder) +{ + return forwarder->msgbuf_pool; +} + #ifdef WITH_MAPME void forwarder_on_connection_event(const forwarder_t * forwarder, @@ -1082,116 +1080,120 @@ forwarder_get_prefix_stats_mgr(const forwarder_t * forwarder) } #endif /* WITH_PREFIX_STATS */ -static -void -process_interest(forwarder_t * forwarder, listener_t * listener, - unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) -{ - if (!connection_id_is_valid(conn_id)) { - conn_id = listener_create_connection(listener, pair); - } - - assert(messageHandler_GetTotalPacketLength(packet) == size); - - msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_INTEREST, conn_id, ticks_now()); - forwarder_receive(listener->forwarder, &forwarder->msgbuf); -} +/** + * @brief Process a packet by creating the corresponding message buffer and + * dispatching it to the forwarder for further processing. + * @param[in] forwarder Forwarder instance. + * + */ +// XXX ??? XXX = process for listener as we are resolving connection id +// -static -void -process_data(forwarder_t * forwarder, listener_t * listener, - unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) +msgbuf_type_t get_type_from_packet(uint8_t * packet) { - if (!connection_id_is_valid(conn_id)) { - INFO("Ignoring data packet associated to no connection"); - return; - } + if (messageHandler_IsTCP(packet)) { + if (messageHandler_IsData(packet)) { + return MSGBUF_TYPE_DATA; + } else if (messageHandler_IsInterest(packet)) { + return MSGBUF_TYPE_INTEREST; + } else { + return MSGBUF_TYPE_UNDEFINED; + } - assert(messageHandler_GetTotalPacketLength(packet) == size); + } else if (messageHandler_IsWldrNotification(packet)) { + return MSGBUF_TYPE_WLDR_NOTIFICATION; - msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_DATA, conn_id, ticks_now()); - forwarder_receive(listener->forwarder, &forwarder->msgbuf); + } else if (mapme_match_packet(packet)) { + return MSGBUF_TYPE_MAPME; -} + } else if (*packet == REQUEST_LIGHT) { + return MSGBUF_TYPE_COMMAND; -static -void -process_wldr_notification(forwarder_t * forwarder, listener_t * listener, - unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) -{ - if (!connection_id_is_valid(conn_id)) { - INFO("Ignoring WLDR notification not associated to a connection"); - return; + } else { + return MSGBUF_TYPE_UNDEFINED; } - - assert(messageHandler_GetTotalPacketLength(packet) == size); - - connection_table_t * table = forwarder_get_connection_table(forwarder); - connection_t * connection = connection_table_at(table, conn_id); - - msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_WLDR_NOTIFICATION, conn_id, ticks_now()); - connection_wldr_handle_notification(connection, &forwarder->msgbuf); - } -static -void -process_mapme(forwarder_t * forwarder, listener_t * listener, - unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) +ssize_t +forwarder_receive(forwarder_t * forwarder, listener_t * listener, + off_t msgbuf_id, address_pair_t * pair, Ticks now) { - if (!connection_id_is_valid(conn_id)) - conn_id = listener_create_connection(listener, pair); - mapme_process(forwarder->mapme, packet, conn_id); -} + assert(forwarder); + /* listener can be NULL */ + assert(msgbuf_id_is_valid(msgbuf_id)); + assert(pair); -static -void -process_command(const forwarder_t * forwarder, listener_t * listener, - unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair) -{ - if (!connection_id_is_valid(conn_id)) - conn_id = listener_create_connection(listener, pair); + const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); + msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); - command_type_t command_type= *(packet + 1); - if (command_type >= COMMAND_TYPE_N) { - ERROR("Invalid command"); - return; - } - forwarder_receive_command(listener->forwarder, command_type, packet, conn_id); + assert(msgbuf); -} + uint8_t * packet = msgbuf_get_packet(msgbuf); + size_t size = msgbuf_get_len(msgbuf); + assert(messageHandler_GetTotalPacketLength(packet) == size); // XXX confirm ? -// = process for listener as we are resolving connection id -// XXX this would typically be inside the forwarder -void -process_packet(forwarder_t * forwarder, listener_t * listener, uint8_t * packet, size_t size, address_pair_t * pair) -{ /* Connection lookup */ const connection_table_t * table = forwarder_get_connection_table(listener->forwarder); - const connection_t * conn = connection_table_get_by_pair(table, pair); - unsigned conn_id = conn ? connection_table_get_connection_id(table, conn): CONNECTION_ID_UNDEFINED; + connection_t * connection = connection_table_get_by_pair(table, pair); + unsigned conn_id = connection + ? connection_table_get_connection_id(table, connection) + : CONNECTION_ID_UNDEFINED; assert((conn_id != CONNECTION_ID_UNDEFINED) || listener); - // Actually hooks should be defined for each packet type to avoid this - // spaghetti code - if (messageHandler_IsTCP(packet)) { - if (messageHandler_IsData(packet)) { - process_data(forwarder, listener, conn_id, packet, size, pair); - } else if (messageHandler_IsInterest(packet)) { - process_interest(forwarder, listener, conn_id, packet, size, pair); - } else { - INFO("Unknown TCP packet received"); - forwarder_drop(forwarder, NULL); - } - } else if (messageHandler_IsWldrNotification(packet)) { - process_wldr_notification(forwarder, listener, conn_id, packet, size, pair); - } else if (mapme_match_packet(packet)) { - process_mapme(forwarder, listener, conn_id, packet, size, pair); - } else if (*packet == REQUEST_LIGHT) { - process_command(forwarder, listener, conn_id, packet, size, pair); - } else { - INFO("Unknown packet received"); - forwarder_drop(forwarder, NULL); + msgbuf_type_t type = get_type_from_packet(msgbuf_get_packet(msgbuf)); + + msgbuf->type = type; + msgbuf->connection_id = conn_id; + msgbuf->recv_ts = now; + msgbuf->refs = 1; + + switch(type) { + case MSGBUF_TYPE_INTEREST: + if (!connection_id_is_valid(msgbuf->connection_id)) + msgbuf->connection_id = listener_create_connection(listener, pair); + msgbuf->id.name = name_create_from_interest(packet); + forwarder_apply_wldr(forwarder, msgbuf, connection); + forwarder_process_interest(forwarder, msgbuf_id); + break; + + case MSGBUF_TYPE_DATA: + if (!connection_id_is_valid(msgbuf->connection_id)) + return forwarder_drop(forwarder, msgbuf_id); + msgbuf->id.name = name_create_from_data(packet); + forwarder_apply_wldr(forwarder, msgbuf, connection); + forwarder_process_data(forwarder, msgbuf_id); + break; + + case MSGBUF_TYPE_WLDR_NOTIFICATION: + if (!connection_id_is_valid(msgbuf->connection_id)) + return forwarder_drop(forwarder, msgbuf_id); + connection_wldr_handle_notification(connection, msgbuf); + return msgbuf_get_len(msgbuf); + + case MSGBUF_TYPE_MAPME: + // XXX what about acks ? + if (!connection_id_is_valid(msgbuf->connection_id)) + msgbuf->connection_id = listener_create_connection(listener, pair); + mapme_process(forwarder->mapme, msgbuf); + return msgbuf_get_len(msgbuf); + + case MSGBUF_TYPE_COMMAND: + // XXX before it used to create the connection + if (!connection_id_is_valid(msgbuf->connection_id)) + return forwarder_drop(forwarder, msgbuf_id); + msgbuf->command.type = *(packet + 1); // XXX use header + if (msgbuf->command.type >= COMMAND_TYPE_N) { + ERROR("Invalid command"); + return -msgbuf_get_len(msgbuf); + } + return configuration_receive_command(forwarder->config, msgbuf); + + case MSGBUF_TYPE_UNDEFINED: + case MSGBUF_TYPE_N: + // XXX Unexpected... shall we abort ? + return forwarder_drop(forwarder, msgbuf_id); } + + return size; } -- cgit 1.2.3-korg