diff options
Diffstat (limited to 'hicn-light/src/hicn/core/forwarder.c')
-rw-r--r-- | hicn-light/src/hicn/core/forwarder.c | 476 |
1 files changed, 355 insertions, 121 deletions
diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c index 37502f3ac..74be6431a 100644 --- a/hicn-light/src/hicn/core/forwarder.c +++ b/hicn-light/src/hicn/core/forwarder.c @@ -69,45 +69,9 @@ #endif /* WITH_POLICY_STATS */ #include <hicn/core/wldr.h> +#include <hicn/core/interest_manifest.h> #include <hicn/util/log.h> -typedef struct { - // Packets processed - uint32_t countReceived; // Interest and data only - uint32_t countInterestsReceived; - uint32_t countObjectsReceived; - - // Packets Dropped - uint32_t countDropped; - uint32_t countInterestsDropped; - uint32_t countObjectsDropped; - uint32_t countOtherDropped; - - // Forwarding - uint32_t countInterestForwarded; - uint32_t countObjectsForwarded; - - // Errors while forwarding - uint32_t countDroppedConnectionNotFound; - uint32_t countSendFailures; - uint32_t countDroppedNoRoute; - - // Interest processing - uint32_t countInterestsAggregated; - uint32_t countInterestsRetransmitted; - uint32_t countInterestsSatisfiedFromStore; - uint32_t countInterestsExpired; - uint32_t countDataExpired; - - // Data processing - uint32_t countDroppedNoReversePath; - - // TODO(eloparco): Currently not used - // uint32_t countDroppedNoHopLimit; - // uint32_t countDroppedZeroHopLimitFromRemote; - // uint32_t countDroppedZeroHopLimitToRemote; -} forwarder_stats_t; - struct forwarder_s { // uint16_t server_port; @@ -138,13 +102,12 @@ struct forwarder_s { * The message forwarder has to decide whether to queue incoming packets for * batching, or trigger the transmission on the connection */ - unsigned pending_conn[MAX_MSG]; - size_t num_pending_conn; - - // msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by - // 1 */ + unsigned *pending_conn; subscription_table_t *subscriptions; + + // Used to store the msgbufs that need to be released + off_t *acquired_msgbuf_ids; }; /** @@ -223,8 +186,12 @@ forwarder_t *forwarder_create(configuration_t *configuration) { #endif /* WITH_POLICY_STATS */ memset(&forwarder->stats, 0, sizeof(forwarder_stats_t)); + vector_init(forwarder->pending_conn, MAX_MSG, 0); + vector_init(forwarder->acquired_msgbuf_ids, MAX_MSG, 0); - forwarder->num_pending_conn = 0; + char *n_suffixes_per_split_str = getenv("N_SUFFIXES_PER_SPIT"); + if (n_suffixes_per_split_str) + N_SUFFIXES_PER_SPIT = atoi(n_suffixes_per_split_str); return forwarder; @@ -267,6 +234,8 @@ void forwarder_free(forwarder_t *forwarder) { listener_table_free(forwarder->listener_table); subscription_table_free(forwarder->subscriptions); configuration_free(forwarder->config); + vector_free(forwarder->pending_conn); + vector_free(forwarder->acquired_msgbuf_ids); free(forwarder); } @@ -295,6 +264,11 @@ listener_table_t *forwarder_get_listener_table(forwarder_t *forwarder) { return forwarder->listener_table; } +pkt_cache_t *forwarder_get_pkt_cache(const forwarder_t *forwarder) { + assert(forwarder); + return forwarder->pkt_cache; +} + void forwarder_cs_set_store(forwarder_t *forwarder, bool val) { assert(forwarder); forwarder->store_in_cs = val; @@ -393,7 +367,7 @@ static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder, const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); - const connection_t *conn = connection_table_get_by_id(table, conn_id); + connection_t *conn = connection_table_get_by_id(table, conn_id); if (!conn) { forwarder->stats.countDroppedConnectionNotFound++; @@ -429,12 +403,8 @@ static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder, #endif /* ... and mark the connection as pending if this is not yet the case */ - unsigned i; - for (i = 0; i < forwarder->num_pending_conn; i++) { - if (forwarder->pending_conn[i] == conn_id) break; - } - if (i == forwarder->num_pending_conn) // Not found - forwarder->pending_conn[forwarder->num_pending_conn++] = conn_id; + if (!vector_contains(forwarder->pending_conn, conn_id)) + vector_push(forwarder->pending_conn, conn_id); if (!success) { forwarder->stats.countSendFailures++; @@ -457,7 +427,8 @@ static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder, break; } - TRACE("forward msgbuf %p to interface %u", msgbuf, conn_id); + TRACE("forward msgbuf %p (size=%u) to interface %u", msgbuf, + msgbuf_get_len(msgbuf), conn_id); return msgbuf_get_len(msgbuf); } @@ -496,8 +467,7 @@ static unsigned forwarder_forward_to_nexthops(forwarder_t *forwarder, static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id, pkt_cache_verdict_t verdict, pkt_cache_entry_t *entry) { - assert(forwarder); - assert(msgbuf_id_is_valid(msgbuf_id)); + assert(forwarder && entry && msgbuf_id_is_valid(msgbuf_id)); const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); @@ -515,15 +485,8 @@ static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id, return false; } - // if this is the first time that we sent this interest the pit entry would be - // NULL. in that case we add the interest to the pit - if (entry == NULL) { - entry = pkt_cache_add_to_pit(forwarder->pkt_cache, msgbuf); - } pit_entry_t *pit_entry = &entry->u.pit_entry; - if (!pit_entry) { - return false; - } + if (!pit_entry) return false; pit_entry_set_fib_entry(pit_entry, fib_entry); @@ -547,21 +510,19 @@ static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id, #endif /* ! BYPASS_FIB */ -ssize_t _forwarder_forward_upon_interest(forwarder_t *forwarder, - msgbuf_pool_t *msgbuf_pool, - off_t data_msgbuf_id, - off_t interest_msgbuf_id, - pkt_cache_entry_t *entry, - pkt_cache_verdict_t verdict) { +int _forwarder_forward_upon_interest( + forwarder_t *forwarder, msgbuf_pool_t *msgbuf_pool, off_t data_msgbuf_id, + off_t interest_msgbuf_id, pkt_cache_entry_t *entry, + pkt_cache_verdict_t verdict, bool is_aggregated) { msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id); + // - Aggregation can be perfomed, do not forward if (verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST) { - // the packet shuold not be forwarder forwarder_drop(forwarder, interest_msgbuf_id); return msgbuf_get_len(msgbuf); } - // - Forward reply if a data packet matching the interest was found + // - Data packet matching the interest was found, forward reply if (verdict == PKT_CACHE_VERDICT_FORWARD_DATA) { assert(forwarder->serve_from_cs == true); @@ -569,19 +530,26 @@ ssize_t _forwarder_forward_upon_interest(forwarder_t *forwarder, msgbuf_t *data_msgbuf = msgbuf_pool_at(msgbuf_pool, data_msgbuf_id); msgbuf_reset_pathlabel(data_msgbuf); - forwarder_forward_via_connection(forwarder, data_msgbuf_id, msgbuf_get_connection_id(interest_msgbuf)); + return msgbuf_get_len(msgbuf); + } + + // - For aggregated interest, the interest forwarding is done in + // `_forwarder_forward_aggregated_interest()` + if (is_aggregated) return msgbuf_get_len(msgbuf); - // - Try to forward the interest - } else if (!forwarder_forward_via_fib(forwarder, interest_msgbuf_id, verdict, - entry)) { + // - Try to forward the interest + int rc = + forwarder_forward_via_fib(forwarder, interest_msgbuf_id, verdict, entry); + if (!rc) { + // - Not able to forward, drop the packet forwarder->stats.countDroppedNoRoute++; INFO("Message %lu did not match FIB, no route (count %u)", interest_msgbuf_id, forwarder->stats.countDroppedNoRoute); - // - Drop the packet (no forwarding) forwarder_drop(forwarder, interest_msgbuf_id); + return -1; } return msgbuf_get_len(msgbuf); @@ -590,19 +558,12 @@ ssize_t _forwarder_forward_upon_interest(forwarder_t *forwarder, static void _forwarder_update_interest_stats(forwarder_t *forwarder, pkt_cache_verdict_t verdict, msgbuf_t *msgbuf, - pkt_cache_entry_t *entry) { - long expiration = -1; - if (entry == NULL) - expiration = ticks_now() + msgbuf_get_interest_lifetime(msgbuf); - else if (entry->has_expire_ts) - expiration = entry->expire_ts; - + bool has_expire_ts, + uint64_t expire_ts) { + long expiration = has_expire_ts ? expire_ts : -1; switch (verdict) { case PKT_CACHE_VERDICT_FORWARD_INTEREST: - DEBUG( - "Message will be added to PIT (expiration=%ld), " - "if nexthops are available", - expiration); + DEBUG("Message added to PIT (expiration=%ld)", expiration); break; case PKT_CACHE_VERDICT_AGGREGATE_INTEREST: @@ -632,7 +593,7 @@ static void _forwarder_update_interest_stats(forwarder_t *forwarder, break; case PKT_CACHE_VERDICT_ERROR: - ERROR("Inivalid packet cache content"); + ERROR("Invalid packet cache content"); break; default: @@ -640,6 +601,234 @@ static void _forwarder_update_interest_stats(forwarder_t *forwarder, } } +static interest_manifest_header_t *_forwarder_get_interest_manifest( + msgbuf_t *msgbuf) { + uint8_t *packet = msgbuf_get_packet(msgbuf); + hicn_header_t *header = (hicn_header_t *)packet; + hicn_type_t type = hicn_header_to_type(header); + + hicn_payload_type_t payload_type; + int rc = hicn_ops_vft[type.l1]->get_payload_type(type, &header->protocol, + &payload_type); + _ASSERT(rc == HICN_LIB_ERROR_NONE); + if (payload_type != HPT_MANIFEST) return NULL; + + size_t header_length, payload_length; + rc = hicn_ops_vft[type.l1]->get_header_length(type, &header->protocol, + &header_length); + assert(rc == HICN_LIB_ERROR_NONE); + + rc = hicn_ops_vft[type.l1]->get_payload_length(type, &header->protocol, + &payload_length); + assert(rc == HICN_LIB_ERROR_NONE); + + u8 *payload = (u8 *)header + header_length; + interest_manifest_header_t *int_manifest_header = + (interest_manifest_header_t *)payload; + if (!interest_manifest_is_valid(int_manifest_header, payload_length)) + return NULL; + + return int_manifest_header; +} + +// Manifest is split using splitting strategy, then every +// sub-manifest is sent using the forwarding strategy defined for the prefix +int _forwarder_forward_aggregated_interest( + forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header, + int n_suffixes_to_fwd, msgbuf_t *msgbuf, off_t msgbuf_id, + pkt_cache_entry_t **entries) { + assert(msgbuf_id_is_valid(msgbuf_id) && + msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST); + + fib_entry_t *fib_entry = fib_match_message(forwarder->fib, msgbuf); + if (!fib_entry) return -1; + + int n_suffixes_per_split = N_SUFFIXES_PER_SPIT; + switch (disaggregation_strategy) { + case INT_MANIFEST_SPLIT_NONE: + n_suffixes_per_split = int_manifest_header->n_suffixes + 1; + + case INT_MANIFEST_SPLIT_MAX_N_SUFFIXES: { + // Generate sub-manifests: same as original manifest, + // but different suffix in the header and different bitmap + + int total_len = 0; + // Suffixes in manifest plus the one in the header + int total_suffixes = int_manifest_header->n_suffixes + 1; + + // Save copy of original bitmap to use as a reference + // to generate bitmaps for sub-manifests + u32 original_bitmap[BITMAP_SIZE] = {0}; + memcpy(&original_bitmap, int_manifest_header->request_bitmap, + BITMAP_SIZE * sizeof(u32)); + + int suffix_index = 0; // Position of suffix in initial manifest + while (suffix_index < total_suffixes) { + // If more than one sub-manifest, + // clone original interest manifest and update suffix + if (suffix_index > 0) { + msgbuf_t *clone; + off_t clone_id = + msgbuf_pool_clone(forwarder->msgbuf_pool, &clone, msgbuf_id); + msgbuf_pool_acquire(clone); + forwarder_acquired_msgbuf_ids_push(forwarder, clone_id); + + msgbuf_id = clone_id; + msgbuf = clone; + } + + u32 curr_bitmap[BITMAP_SIZE] = {0}; + int first_suffix_index_in_submanifest = suffix_index; + suffix_index = interest_manifest_update_bitmap( + original_bitmap, curr_bitmap, suffix_index, total_suffixes, + n_suffixes_per_split); + int first_suffix_index_in_next_submanifest = suffix_index; + + // Update manifest bitmap in current msgbuf + interest_manifest_header_t *manifest = + _forwarder_get_interest_manifest(msgbuf); + assert(manifest != NULL); + memcpy(manifest->request_bitmap, curr_bitmap, + BITMAP_SIZE * sizeof(u32)); + WITH_TRACE({ + bitmap_print(manifest->request_bitmap, BITMAP_SIZE); + printf("\n"); + }); + + // Update PIT entries for suffixes in current sub-manifest + nexthops_t *nexthops = + fib_entry_get_nexthops_from_strategy(fib_entry, msgbuf, false); + if (nexthops_get_curlen(nexthops) == 0) return -1; + + for (int i = first_suffix_index_in_submanifest; + i < first_suffix_index_in_next_submanifest; i++) { + if (!is_bit_set(manifest->request_bitmap, i)) continue; + + pit_entry_t *pit_entry = &(entries[i]->u.pit_entry); + if (!pit_entry) return -1; + + pit_entry_set_fib_entry(pit_entry, fib_entry); + unsigned nexthop; + nexthops_foreach(nexthops, nexthop, + { pit_entry_egress_add(pit_entry, nexthop); }); + } + + if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <= + 0) { + ERROR("Message %p returned an empty next hop set", msgbuf); + continue; + } + + total_len += msgbuf_get_len(msgbuf); + } + + return total_len; + } + + default: + return -1; + } +} + +static ssize_t forwarder_process_single_interest(forwarder_t *forwarder, + msgbuf_pool_t *msgbuf_pool, + msgbuf_t *msgbuf, + off_t msgbuf_id) { + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; + off_t data_msgbuf_id = INVALID_MSGBUF_ID; + pkt_cache_entry_t *entry = NULL; + + // Update packet cache + pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, &verdict, + &data_msgbuf_id, &entry, msgbuf_get_name(msgbuf), + forwarder->serve_from_cs); + _forwarder_update_interest_stats(forwarder, verdict, msgbuf, + entry->has_expire_ts, entry->expire_ts); + + int rc = _forwarder_forward_upon_interest( + forwarder, msgbuf_pool, data_msgbuf_id, msgbuf_id, entry, verdict, false); + + // No route when trying to forward interest, remove from PIT + if (rc == -1) + pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry, + msgbuf_get_name(msgbuf)); + + return msgbuf_get_len(msgbuf); +} + +static ssize_t forwarder_process_aggregated_interest( + forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header, + msgbuf_pool_t *msgbuf_pool, msgbuf_t *msgbuf, off_t msgbuf_id) { + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; + off_t data_msgbuf_id = INVALID_MSGBUF_ID; + pkt_cache_entry_t *entry = NULL; + // Save PIT entries to avoid re-doing pkt cache lookup in + // `_forwarder_forward_aggregated_interest()` + pkt_cache_entry_t *entries[BITMAP_SIZE * WORD_WIDTH]; + + int pos = 0; // Position of current suffix in manifest + int n_suffixes_to_fwd = 0; + u32 *suffix = (u32 *)(int_manifest_header + 1); + u32 seq = name_GetSegment(msgbuf_get_name(msgbuf)); + + Name name_copy = EMPTY_NAME; + name_Copy(msgbuf_get_name(msgbuf), &name_copy); + + // The fist loop iteration handles the suffix in the header, + // the following ones handle the suffiexes in the manifest + while (true) { + if (!is_bit_set(int_manifest_header->request_bitmap, pos)) goto NEXT_SUFFIX; + + // Update packet cache + pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, + &verdict, &data_msgbuf_id, &entry, &name_copy, + forwarder->serve_from_cs); + entries[pos] = entry; + _forwarder_update_interest_stats(forwarder, verdict, msgbuf, + entry->has_expire_ts, entry->expire_ts); + + // Here only data forwarding is performed, interest forwarding is done + // in '_forwarder_forward_aggregated_interest()' + int rc = + _forwarder_forward_upon_interest(forwarder, msgbuf_pool, data_msgbuf_id, + msgbuf_id, entry, verdict, true); + + // No route when trying to forward interest, remove from PIT + if (rc == -1) + pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry, &name_copy); + + // Unset in bitmap if no interest forwarding needed, + // otherwise increase count of suffixes to forward + if (rc == -1 || verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST || + verdict == PKT_CACHE_VERDICT_FORWARD_DATA) { + unset_bit(int_manifest_header->request_bitmap, pos); + } else { + n_suffixes_to_fwd++; + } + + NEXT_SUFFIX: + if (pos++ >= int_manifest_header->n_suffixes) break; + + // Use next segment in manifest + seq = *suffix; + suffix++; + name_SetSegment(&name_copy, seq); + + WITH_DEBUG({ + char *nameString = name_ToString(&name_copy); + DEBUG("Next in manifest: %s", nameString); + free(nameString); + }); + } + + // Return if nothing in the manifest to forward + if (n_suffixes_to_fwd == 0) return msgbuf_get_len(msgbuf); + + return _forwarder_forward_aggregated_interest(forwarder, int_manifest_header, + n_suffixes_to_fwd, msgbuf, + msgbuf_id, entries); +} + /** * @function forwarder_process_interest * @abstract Receive an interest from the network @@ -657,29 +846,39 @@ static ssize_t forwarder_process_interest(forwarder_t *forwarder, msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder); msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id); + const connection_table_t *table = forwarder_get_connection_table(forwarder); + connection_t *conn = + connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST); - forwarder->stats.countReceived++; + u32 n_suffixes = 0; + interest_manifest_header_t *int_manifest_header = + _forwarder_get_interest_manifest(msgbuf); + if (int_manifest_header) n_suffixes = int_manifest_header->n_suffixes; + + // Update stats forwarder->stats.countInterestsReceived++; + conn->stats.interests.rx_pkts++; + conn->stats.interests.rx_bytes += msgbuf_get_len(msgbuf); WITH_DEBUG({ char *nameString = name_ToString(msgbuf_get_name(msgbuf)); - DEBUG("INTEREST (%s) msgbuf_id=%lu ingress=%u length=%u", nameString, - msgbuf_id, msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf)); + DEBUG("INTEREST (%s) suffixes=%u msgbuf_id=%lu ingress=%u length=%u", + nameString, n_suffixes, msgbuf_id, msgbuf_get_connection_id(msgbuf), + msgbuf_get_len(msgbuf)); free(nameString); - }) - - pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; - off_t data_msgbuf_id = INVALID_MSGBUF_ID; - pkt_cache_entry_t *entry = NULL; - pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, &verdict, - &data_msgbuf_id, &entry, forwarder->serve_from_cs); + }); - _forwarder_update_interest_stats(forwarder, verdict, msgbuf, entry); + // Cache suffixes for current prefix to (possibly) avoid double lookups + pkt_cache_save_suffixes_for_prefix( + forwarder->pkt_cache, name_GetContentName(msgbuf_get_name(msgbuf))); - return _forwarder_forward_upon_interest( - forwarder, msgbuf_pool, data_msgbuf_id, msgbuf_id, entry, verdict); + if (!int_manifest_header) + return forwarder_process_single_interest(forwarder, msgbuf_pool, msgbuf, + msgbuf_id); + return forwarder_process_aggregated_interest(forwarder, int_manifest_header, + msgbuf_pool, msgbuf, msgbuf_id); } static void _forwarder_log_on_data(forwarder_t *forwarder, @@ -700,7 +899,7 @@ static void _forwarder_log_on_data(forwarder_t *forwarder, DEBUG("Message not stored in CS"); break; case PKT_CACHE_VERDICT_ERROR: - ERROR("Inivalid packet cache content"); + ERROR("Invalid packet cache content"); break; default: break; @@ -726,24 +925,31 @@ static ssize_t forwarder_process_data(forwarder_t *forwarder, off_t msgbuf_id) { DEBUG("DATA (%s) msgbuf_id=%lu ingress=%u length=%u", nameString, msgbuf_id, msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf)); free(nameString); - )} - - forwarder->stats.countReceived++; - forwarder->stats.countObjectsReceived++; + }); const connection_table_t *table = forwarder_get_connection_table(forwarder); - const connection_t *conn = + connection_t *conn = connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf)); + // Update stats + forwarder->stats.countObjectsReceived++; + conn->stats.data.rx_pkts++; + conn->stats.data.rx_bytes += msgbuf_get_len(msgbuf); + + // Cache suffixes for current prefix to (possibly) avoid double lookups + pkt_cache_save_suffixes_for_prefix( + forwarder->pkt_cache, name_GetContentName(msgbuf_get_name(msgbuf))); + pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR; bool wrong_egress; - nexthops_t *ingressSetUnion = - pkt_cache_on_data(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, - forwarder->store_in_cs, connection_is_local(conn), &wrong_egress, &verdict); + nexthops_t *ingressSetUnion = pkt_cache_on_data( + forwarder->pkt_cache, msgbuf_pool, msgbuf_id, forwarder->store_in_cs, + connection_is_local(conn), &wrong_egress, &verdict); _forwarder_log_on_data(forwarder, verdict); - if (wrong_egress) { // Interest sent via a connection but received from another + if (wrong_egress) { // Interest sent via a connection but received from + // another WARN("Data coming from unexpected connection, discarded"); } else if (!ingressSetUnion) { // No match in the PIT forwarder->stats.countDroppedNoReversePath++; @@ -776,15 +982,15 @@ void forwarder_flush_connections(forwarder_t *forwarder) { // DEBUG("[forwarder_flush_connections]"); const connection_table_t *table = forwarder_get_connection_table(forwarder); - for (unsigned i = 0; i < forwarder->num_pending_conn; i++) { + for (unsigned i = 0; i < vector_len(forwarder->pending_conn); i++) { unsigned conn_id = forwarder->pending_conn[i]; - const connection_t *conn = connection_table_at(table, conn_id); + connection_t *conn = connection_table_at(table, conn_id); if (!connection_flush(conn)) { WARN("Could not flush connection queue"); // XXX keep track of non flushed connections... } } - forwarder->num_pending_conn = 0; + vector_reset(forwarder->pending_conn); // DEBUG("[forwarder_flush_connections] done"); } @@ -957,6 +1163,23 @@ cs_t *forwarder_get_cs(const forwarder_t *forwarder) { return pkt_cache_get_cs(forwarder->pkt_cache); } +// IMPORTANT: Use this function ONLY for read-only operations since a realloc +// would otherwise modify the returned copy but not the original msgbuf ids +// vector in the forwarder. This constraint cannot be enforced by returning a +// (const off_t *) because the vector_t macros still cast to (void **). +off_t *forwarder_get_acquired_msgbuf_ids(const forwarder_t *forwarder) { + return forwarder->acquired_msgbuf_ids; +} + +void forwarder_acquired_msgbuf_ids_reset(const forwarder_t *forwarder) { + vector_reset(forwarder->acquired_msgbuf_ids); +} + +void forwarder_acquired_msgbuf_ids_push(const forwarder_t *forwarder, + off_t msgbuf_id) { + vector_push(forwarder->acquired_msgbuf_ids, msgbuf_id); +} + // ======================================================= fib_t *forwarder_get_fib(forwarder_t *forwarder) { return forwarder->fib; } @@ -1052,14 +1275,15 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener, const connection_table_t *table = forwarder_get_connection_table(listener->forwarder); connection_t *connection = connection_table_get_by_pair(table, pair); - unsigned conn_id = connection - ? connection_table_get_connection_id(table, connection) - : CONNECTION_ID_UNDEFINED; + unsigned conn_id = connection ? (unsigned)connection_table_get_connection_id( + table, connection) + : CONNECTION_ID_UNDEFINED; assert((conn_id != CONNECTION_ID_UNDEFINED) || listener); msgbuf_type_t type = get_type_from_packet(msgbuf_get_packet(msgbuf)); + forwarder->stats.countReceived++; msgbuf->type = type; msgbuf->connection_id = conn_id; msgbuf->recv_ts = now; @@ -1067,12 +1291,14 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener, switch (type) { case MSGBUF_TYPE_INTEREST: if (!connection_id_is_valid(msgbuf->connection_id)) { - char *conn_name = connection_table_get_random_name(table); + char conn_name[SYMBOLIC_NAME_LEN]; + int rc = connection_table_get_random_name(table, conn_name); + if (rc < 0) return 0; + unsigned connection_id = listener_create_connection(listener, conn_name, pair); msgbuf->connection_id = connection_id; connection = connection_table_get_by_id(table, connection_id); - free(conn_name); } msgbuf->path_label = 0; // not used for interest packets name_create_from_interest(packet, msgbuf_get_name(msgbuf)); @@ -1102,10 +1328,12 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener, case MSGBUF_TYPE_MAPME: // XXX what about acks ? if (!connection_id_is_valid(msgbuf->connection_id)) { - char *conn_name = connection_table_get_random_name(table); + char conn_name[SYMBOLIC_NAME_LEN]; + int rc = connection_table_get_random_name(table, conn_name); + if (rc < 0) return 0; + msgbuf->connection_id = listener_create_connection(listener, conn_name, pair); - free(conn_name); } mapme_process(forwarder->mapme, msgbuf); return size; @@ -1113,12 +1341,14 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener, case MSGBUF_TYPE_COMMAND: // Create the connection to send the ack back if (!connection_id_is_valid(msgbuf->connection_id)) { - char *conn_name = connection_table_get_random_name(table); + char conn_name[SYMBOLIC_NAME_LEN]; + int rc = connection_table_get_random_name(table, conn_name); + if (rc < 0) return 0; + unsigned connection_id = listener_create_connection(listener, conn_name, pair); msgbuf->connection_id = connection_id; connection = connection_table_get_by_id(table, connection_id); - free(conn_name); } msg_header_t *msg = (msg_header_t *)packet; @@ -1163,3 +1393,7 @@ void forwarder_log(forwarder_t *forwarder) { forwarder->stats.countInterestsExpired, forwarder->stats.countDataExpired, forwarder->stats.countDroppedNoReversePath); } + +forwarder_stats_t forwarder_get_stats(forwarder_t *forwarder) { + return forwarder->stats; +}
\ No newline at end of file |