aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/core/forwarder.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/core/forwarder.c')
-rw-r--r--hicn-light/src/hicn/core/forwarder.c476
1 files changed, 355 insertions, 121 deletions
diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c
index 37502f3ac..74be6431a 100644
--- a/hicn-light/src/hicn/core/forwarder.c
+++ b/hicn-light/src/hicn/core/forwarder.c
@@ -69,45 +69,9 @@
#endif /* WITH_POLICY_STATS */
#include <hicn/core/wldr.h>
+#include <hicn/core/interest_manifest.h>
#include <hicn/util/log.h>
-typedef struct {
- // Packets processed
- uint32_t countReceived; // Interest and data only
- uint32_t countInterestsReceived;
- uint32_t countObjectsReceived;
-
- // Packets Dropped
- uint32_t countDropped;
- uint32_t countInterestsDropped;
- uint32_t countObjectsDropped;
- uint32_t countOtherDropped;
-
- // Forwarding
- uint32_t countInterestForwarded;
- uint32_t countObjectsForwarded;
-
- // Errors while forwarding
- uint32_t countDroppedConnectionNotFound;
- uint32_t countSendFailures;
- uint32_t countDroppedNoRoute;
-
- // Interest processing
- uint32_t countInterestsAggregated;
- uint32_t countInterestsRetransmitted;
- uint32_t countInterestsSatisfiedFromStore;
- uint32_t countInterestsExpired;
- uint32_t countDataExpired;
-
- // Data processing
- uint32_t countDroppedNoReversePath;
-
- // TODO(eloparco): Currently not used
- // uint32_t countDroppedNoHopLimit;
- // uint32_t countDroppedZeroHopLimitFromRemote;
- // uint32_t countDroppedZeroHopLimitToRemote;
-} forwarder_stats_t;
-
struct forwarder_s {
// uint16_t server_port;
@@ -138,13 +102,12 @@ struct forwarder_s {
* The message forwarder has to decide whether to queue incoming packets for
* batching, or trigger the transmission on the connection
*/
- unsigned pending_conn[MAX_MSG];
- size_t num_pending_conn;
-
- // msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by
- // 1 */
+ unsigned *pending_conn;
subscription_table_t *subscriptions;
+
+ // Used to store the msgbufs that need to be released
+ off_t *acquired_msgbuf_ids;
};
/**
@@ -223,8 +186,12 @@ forwarder_t *forwarder_create(configuration_t *configuration) {
#endif /* WITH_POLICY_STATS */
memset(&forwarder->stats, 0, sizeof(forwarder_stats_t));
+ vector_init(forwarder->pending_conn, MAX_MSG, 0);
+ vector_init(forwarder->acquired_msgbuf_ids, MAX_MSG, 0);
- forwarder->num_pending_conn = 0;
+ char *n_suffixes_per_split_str = getenv("N_SUFFIXES_PER_SPIT");
+ if (n_suffixes_per_split_str)
+ N_SUFFIXES_PER_SPIT = atoi(n_suffixes_per_split_str);
return forwarder;
@@ -267,6 +234,8 @@ void forwarder_free(forwarder_t *forwarder) {
listener_table_free(forwarder->listener_table);
subscription_table_free(forwarder->subscriptions);
configuration_free(forwarder->config);
+ vector_free(forwarder->pending_conn);
+ vector_free(forwarder->acquired_msgbuf_ids);
free(forwarder);
}
@@ -295,6 +264,11 @@ listener_table_t *forwarder_get_listener_table(forwarder_t *forwarder) {
return forwarder->listener_table;
}
+pkt_cache_t *forwarder_get_pkt_cache(const forwarder_t *forwarder) {
+ assert(forwarder);
+ return forwarder->pkt_cache;
+}
+
void forwarder_cs_set_store(forwarder_t *forwarder, bool val) {
assert(forwarder);
forwarder->store_in_cs = val;
@@ -393,7 +367,7 @@ static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder,
const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
- const connection_t *conn = connection_table_get_by_id(table, conn_id);
+ connection_t *conn = connection_table_get_by_id(table, conn_id);
if (!conn) {
forwarder->stats.countDroppedConnectionNotFound++;
@@ -429,12 +403,8 @@ static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder,
#endif
/* ... and mark the connection as pending if this is not yet the case */
- unsigned i;
- for (i = 0; i < forwarder->num_pending_conn; i++) {
- if (forwarder->pending_conn[i] == conn_id) break;
- }
- if (i == forwarder->num_pending_conn) // Not found
- forwarder->pending_conn[forwarder->num_pending_conn++] = conn_id;
+ if (!vector_contains(forwarder->pending_conn, conn_id))
+ vector_push(forwarder->pending_conn, conn_id);
if (!success) {
forwarder->stats.countSendFailures++;
@@ -457,7 +427,8 @@ static ssize_t forwarder_forward_via_connection(forwarder_t *forwarder,
break;
}
- TRACE("forward msgbuf %p to interface %u", msgbuf, conn_id);
+ TRACE("forward msgbuf %p (size=%u) to interface %u", msgbuf,
+ msgbuf_get_len(msgbuf), conn_id);
return msgbuf_get_len(msgbuf);
}
@@ -496,8 +467,7 @@ static unsigned forwarder_forward_to_nexthops(forwarder_t *forwarder,
static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id,
pkt_cache_verdict_t verdict,
pkt_cache_entry_t *entry) {
- assert(forwarder);
- assert(msgbuf_id_is_valid(msgbuf_id));
+ assert(forwarder && entry && msgbuf_id_is_valid(msgbuf_id));
const msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
@@ -515,15 +485,8 @@ static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id,
return false;
}
- // if this is the first time that we sent this interest the pit entry would be
- // NULL. in that case we add the interest to the pit
- if (entry == NULL) {
- entry = pkt_cache_add_to_pit(forwarder->pkt_cache, msgbuf);
- }
pit_entry_t *pit_entry = &entry->u.pit_entry;
- if (!pit_entry) {
- return false;
- }
+ if (!pit_entry) return false;
pit_entry_set_fib_entry(pit_entry, fib_entry);
@@ -547,21 +510,19 @@ static bool forwarder_forward_via_fib(forwarder_t *forwarder, off_t msgbuf_id,
#endif /* ! BYPASS_FIB */
-ssize_t _forwarder_forward_upon_interest(forwarder_t *forwarder,
- msgbuf_pool_t *msgbuf_pool,
- off_t data_msgbuf_id,
- off_t interest_msgbuf_id,
- pkt_cache_entry_t *entry,
- pkt_cache_verdict_t verdict) {
+int _forwarder_forward_upon_interest(
+ forwarder_t *forwarder, msgbuf_pool_t *msgbuf_pool, off_t data_msgbuf_id,
+ off_t interest_msgbuf_id, pkt_cache_entry_t *entry,
+ pkt_cache_verdict_t verdict, bool is_aggregated) {
msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, interest_msgbuf_id);
+ // - Aggregation can be perfomed, do not forward
if (verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST) {
- // the packet shuold not be forwarder
forwarder_drop(forwarder, interest_msgbuf_id);
return msgbuf_get_len(msgbuf);
}
- // - Forward reply if a data packet matching the interest was found
+ // - Data packet matching the interest was found, forward reply
if (verdict == PKT_CACHE_VERDICT_FORWARD_DATA) {
assert(forwarder->serve_from_cs == true);
@@ -569,19 +530,26 @@ ssize_t _forwarder_forward_upon_interest(forwarder_t *forwarder,
msgbuf_t *data_msgbuf = msgbuf_pool_at(msgbuf_pool, data_msgbuf_id);
msgbuf_reset_pathlabel(data_msgbuf);
-
forwarder_forward_via_connection(forwarder, data_msgbuf_id,
msgbuf_get_connection_id(interest_msgbuf));
+ return msgbuf_get_len(msgbuf);
+ }
+
+ // - For aggregated interest, the interest forwarding is done in
+ // `_forwarder_forward_aggregated_interest()`
+ if (is_aggregated) return msgbuf_get_len(msgbuf);
- // - Try to forward the interest
- } else if (!forwarder_forward_via_fib(forwarder, interest_msgbuf_id, verdict,
- entry)) {
+ // - Try to forward the interest
+ int rc =
+ forwarder_forward_via_fib(forwarder, interest_msgbuf_id, verdict, entry);
+ if (!rc) {
+ // - Not able to forward, drop the packet
forwarder->stats.countDroppedNoRoute++;
INFO("Message %lu did not match FIB, no route (count %u)",
interest_msgbuf_id, forwarder->stats.countDroppedNoRoute);
- // - Drop the packet (no forwarding)
forwarder_drop(forwarder, interest_msgbuf_id);
+ return -1;
}
return msgbuf_get_len(msgbuf);
@@ -590,19 +558,12 @@ ssize_t _forwarder_forward_upon_interest(forwarder_t *forwarder,
static void _forwarder_update_interest_stats(forwarder_t *forwarder,
pkt_cache_verdict_t verdict,
msgbuf_t *msgbuf,
- pkt_cache_entry_t *entry) {
- long expiration = -1;
- if (entry == NULL)
- expiration = ticks_now() + msgbuf_get_interest_lifetime(msgbuf);
- else if (entry->has_expire_ts)
- expiration = entry->expire_ts;
-
+ bool has_expire_ts,
+ uint64_t expire_ts) {
+ long expiration = has_expire_ts ? expire_ts : -1;
switch (verdict) {
case PKT_CACHE_VERDICT_FORWARD_INTEREST:
- DEBUG(
- "Message will be added to PIT (expiration=%ld), "
- "if nexthops are available",
- expiration);
+ DEBUG("Message added to PIT (expiration=%ld)", expiration);
break;
case PKT_CACHE_VERDICT_AGGREGATE_INTEREST:
@@ -632,7 +593,7 @@ static void _forwarder_update_interest_stats(forwarder_t *forwarder,
break;
case PKT_CACHE_VERDICT_ERROR:
- ERROR("Inivalid packet cache content");
+ ERROR("Invalid packet cache content");
break;
default:
@@ -640,6 +601,234 @@ static void _forwarder_update_interest_stats(forwarder_t *forwarder,
}
}
+static interest_manifest_header_t *_forwarder_get_interest_manifest(
+ msgbuf_t *msgbuf) {
+ uint8_t *packet = msgbuf_get_packet(msgbuf);
+ hicn_header_t *header = (hicn_header_t *)packet;
+ hicn_type_t type = hicn_header_to_type(header);
+
+ hicn_payload_type_t payload_type;
+ int rc = hicn_ops_vft[type.l1]->get_payload_type(type, &header->protocol,
+ &payload_type);
+ _ASSERT(rc == HICN_LIB_ERROR_NONE);
+ if (payload_type != HPT_MANIFEST) return NULL;
+
+ size_t header_length, payload_length;
+ rc = hicn_ops_vft[type.l1]->get_header_length(type, &header->protocol,
+ &header_length);
+ assert(rc == HICN_LIB_ERROR_NONE);
+
+ rc = hicn_ops_vft[type.l1]->get_payload_length(type, &header->protocol,
+ &payload_length);
+ assert(rc == HICN_LIB_ERROR_NONE);
+
+ u8 *payload = (u8 *)header + header_length;
+ interest_manifest_header_t *int_manifest_header =
+ (interest_manifest_header_t *)payload;
+ if (!interest_manifest_is_valid(int_manifest_header, payload_length))
+ return NULL;
+
+ return int_manifest_header;
+}
+
+// Manifest is split using splitting strategy, then every
+// sub-manifest is sent using the forwarding strategy defined for the prefix
+int _forwarder_forward_aggregated_interest(
+ forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header,
+ int n_suffixes_to_fwd, msgbuf_t *msgbuf, off_t msgbuf_id,
+ pkt_cache_entry_t **entries) {
+ assert(msgbuf_id_is_valid(msgbuf_id) &&
+ msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST);
+
+ fib_entry_t *fib_entry = fib_match_message(forwarder->fib, msgbuf);
+ if (!fib_entry) return -1;
+
+ int n_suffixes_per_split = N_SUFFIXES_PER_SPIT;
+ switch (disaggregation_strategy) {
+ case INT_MANIFEST_SPLIT_NONE:
+ n_suffixes_per_split = int_manifest_header->n_suffixes + 1;
+
+ case INT_MANIFEST_SPLIT_MAX_N_SUFFIXES: {
+ // Generate sub-manifests: same as original manifest,
+ // but different suffix in the header and different bitmap
+
+ int total_len = 0;
+ // Suffixes in manifest plus the one in the header
+ int total_suffixes = int_manifest_header->n_suffixes + 1;
+
+ // Save copy of original bitmap to use as a reference
+ // to generate bitmaps for sub-manifests
+ u32 original_bitmap[BITMAP_SIZE] = {0};
+ memcpy(&original_bitmap, int_manifest_header->request_bitmap,
+ BITMAP_SIZE * sizeof(u32));
+
+ int suffix_index = 0; // Position of suffix in initial manifest
+ while (suffix_index < total_suffixes) {
+ // If more than one sub-manifest,
+ // clone original interest manifest and update suffix
+ if (suffix_index > 0) {
+ msgbuf_t *clone;
+ off_t clone_id =
+ msgbuf_pool_clone(forwarder->msgbuf_pool, &clone, msgbuf_id);
+ msgbuf_pool_acquire(clone);
+ forwarder_acquired_msgbuf_ids_push(forwarder, clone_id);
+
+ msgbuf_id = clone_id;
+ msgbuf = clone;
+ }
+
+ u32 curr_bitmap[BITMAP_SIZE] = {0};
+ int first_suffix_index_in_submanifest = suffix_index;
+ suffix_index = interest_manifest_update_bitmap(
+ original_bitmap, curr_bitmap, suffix_index, total_suffixes,
+ n_suffixes_per_split);
+ int first_suffix_index_in_next_submanifest = suffix_index;
+
+ // Update manifest bitmap in current msgbuf
+ interest_manifest_header_t *manifest =
+ _forwarder_get_interest_manifest(msgbuf);
+ assert(manifest != NULL);
+ memcpy(manifest->request_bitmap, curr_bitmap,
+ BITMAP_SIZE * sizeof(u32));
+ WITH_TRACE({
+ bitmap_print(manifest->request_bitmap, BITMAP_SIZE);
+ printf("\n");
+ });
+
+ // Update PIT entries for suffixes in current sub-manifest
+ nexthops_t *nexthops =
+ fib_entry_get_nexthops_from_strategy(fib_entry, msgbuf, false);
+ if (nexthops_get_curlen(nexthops) == 0) return -1;
+
+ for (int i = first_suffix_index_in_submanifest;
+ i < first_suffix_index_in_next_submanifest; i++) {
+ if (!is_bit_set(manifest->request_bitmap, i)) continue;
+
+ pit_entry_t *pit_entry = &(entries[i]->u.pit_entry);
+ if (!pit_entry) return -1;
+
+ pit_entry_set_fib_entry(pit_entry, fib_entry);
+ unsigned nexthop;
+ nexthops_foreach(nexthops, nexthop,
+ { pit_entry_egress_add(pit_entry, nexthop); });
+ }
+
+ if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <=
+ 0) {
+ ERROR("Message %p returned an empty next hop set", msgbuf);
+ continue;
+ }
+
+ total_len += msgbuf_get_len(msgbuf);
+ }
+
+ return total_len;
+ }
+
+ default:
+ return -1;
+ }
+}
+
+static ssize_t forwarder_process_single_interest(forwarder_t *forwarder,
+ msgbuf_pool_t *msgbuf_pool,
+ msgbuf_t *msgbuf,
+ off_t msgbuf_id) {
+ pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR;
+ off_t data_msgbuf_id = INVALID_MSGBUF_ID;
+ pkt_cache_entry_t *entry = NULL;
+
+ // Update packet cache
+ pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, &verdict,
+ &data_msgbuf_id, &entry, msgbuf_get_name(msgbuf),
+ forwarder->serve_from_cs);
+ _forwarder_update_interest_stats(forwarder, verdict, msgbuf,
+ entry->has_expire_ts, entry->expire_ts);
+
+ int rc = _forwarder_forward_upon_interest(
+ forwarder, msgbuf_pool, data_msgbuf_id, msgbuf_id, entry, verdict, false);
+
+ // No route when trying to forward interest, remove from PIT
+ if (rc == -1)
+ pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry,
+ msgbuf_get_name(msgbuf));
+
+ return msgbuf_get_len(msgbuf);
+}
+
+static ssize_t forwarder_process_aggregated_interest(
+ forwarder_t *forwarder, interest_manifest_header_t *int_manifest_header,
+ msgbuf_pool_t *msgbuf_pool, msgbuf_t *msgbuf, off_t msgbuf_id) {
+ pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR;
+ off_t data_msgbuf_id = INVALID_MSGBUF_ID;
+ pkt_cache_entry_t *entry = NULL;
+ // Save PIT entries to avoid re-doing pkt cache lookup in
+ // `_forwarder_forward_aggregated_interest()`
+ pkt_cache_entry_t *entries[BITMAP_SIZE * WORD_WIDTH];
+
+ int pos = 0; // Position of current suffix in manifest
+ int n_suffixes_to_fwd = 0;
+ u32 *suffix = (u32 *)(int_manifest_header + 1);
+ u32 seq = name_GetSegment(msgbuf_get_name(msgbuf));
+
+ Name name_copy = EMPTY_NAME;
+ name_Copy(msgbuf_get_name(msgbuf), &name_copy);
+
+ // The fist loop iteration handles the suffix in the header,
+ // the following ones handle the suffiexes in the manifest
+ while (true) {
+ if (!is_bit_set(int_manifest_header->request_bitmap, pos)) goto NEXT_SUFFIX;
+
+ // Update packet cache
+ pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id,
+ &verdict, &data_msgbuf_id, &entry, &name_copy,
+ forwarder->serve_from_cs);
+ entries[pos] = entry;
+ _forwarder_update_interest_stats(forwarder, verdict, msgbuf,
+ entry->has_expire_ts, entry->expire_ts);
+
+ // Here only data forwarding is performed, interest forwarding is done
+ // in '_forwarder_forward_aggregated_interest()'
+ int rc =
+ _forwarder_forward_upon_interest(forwarder, msgbuf_pool, data_msgbuf_id,
+ msgbuf_id, entry, verdict, true);
+
+ // No route when trying to forward interest, remove from PIT
+ if (rc == -1)
+ pkt_cache_pit_remove_entry(forwarder->pkt_cache, entry, &name_copy);
+
+ // Unset in bitmap if no interest forwarding needed,
+ // otherwise increase count of suffixes to forward
+ if (rc == -1 || verdict == PKT_CACHE_VERDICT_AGGREGATE_INTEREST ||
+ verdict == PKT_CACHE_VERDICT_FORWARD_DATA) {
+ unset_bit(int_manifest_header->request_bitmap, pos);
+ } else {
+ n_suffixes_to_fwd++;
+ }
+
+ NEXT_SUFFIX:
+ if (pos++ >= int_manifest_header->n_suffixes) break;
+
+ // Use next segment in manifest
+ seq = *suffix;
+ suffix++;
+ name_SetSegment(&name_copy, seq);
+
+ WITH_DEBUG({
+ char *nameString = name_ToString(&name_copy);
+ DEBUG("Next in manifest: %s", nameString);
+ free(nameString);
+ });
+ }
+
+ // Return if nothing in the manifest to forward
+ if (n_suffixes_to_fwd == 0) return msgbuf_get_len(msgbuf);
+
+ return _forwarder_forward_aggregated_interest(forwarder, int_manifest_header,
+ n_suffixes_to_fwd, msgbuf,
+ msgbuf_id, entries);
+}
+
/**
* @function forwarder_process_interest
* @abstract Receive an interest from the network
@@ -657,29 +846,39 @@ static ssize_t forwarder_process_interest(forwarder_t *forwarder,
msgbuf_pool_t *msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
msgbuf_t *msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+ const connection_table_t *table = forwarder_get_connection_table(forwarder);
+ connection_t *conn =
+ connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf));
assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST);
- forwarder->stats.countReceived++;
+ u32 n_suffixes = 0;
+ interest_manifest_header_t *int_manifest_header =
+ _forwarder_get_interest_manifest(msgbuf);
+ if (int_manifest_header) n_suffixes = int_manifest_header->n_suffixes;
+
+ // Update stats
forwarder->stats.countInterestsReceived++;
+ conn->stats.interests.rx_pkts++;
+ conn->stats.interests.rx_bytes += msgbuf_get_len(msgbuf);
WITH_DEBUG({
char *nameString = name_ToString(msgbuf_get_name(msgbuf));
- DEBUG("INTEREST (%s) msgbuf_id=%lu ingress=%u length=%u", nameString,
- msgbuf_id, msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf));
+ DEBUG("INTEREST (%s) suffixes=%u msgbuf_id=%lu ingress=%u length=%u",
+ nameString, n_suffixes, msgbuf_id, msgbuf_get_connection_id(msgbuf),
+ msgbuf_get_len(msgbuf));
free(nameString);
- })
-
- pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR;
- off_t data_msgbuf_id = INVALID_MSGBUF_ID;
- pkt_cache_entry_t *entry = NULL;
- pkt_cache_on_interest(forwarder->pkt_cache, msgbuf_pool, msgbuf_id, &verdict,
- &data_msgbuf_id, &entry, forwarder->serve_from_cs);
+ });
- _forwarder_update_interest_stats(forwarder, verdict, msgbuf, entry);
+ // Cache suffixes for current prefix to (possibly) avoid double lookups
+ pkt_cache_save_suffixes_for_prefix(
+ forwarder->pkt_cache, name_GetContentName(msgbuf_get_name(msgbuf)));
- return _forwarder_forward_upon_interest(
- forwarder, msgbuf_pool, data_msgbuf_id, msgbuf_id, entry, verdict);
+ if (!int_manifest_header)
+ return forwarder_process_single_interest(forwarder, msgbuf_pool, msgbuf,
+ msgbuf_id);
+ return forwarder_process_aggregated_interest(forwarder, int_manifest_header,
+ msgbuf_pool, msgbuf, msgbuf_id);
}
static void _forwarder_log_on_data(forwarder_t *forwarder,
@@ -700,7 +899,7 @@ static void _forwarder_log_on_data(forwarder_t *forwarder,
DEBUG("Message not stored in CS");
break;
case PKT_CACHE_VERDICT_ERROR:
- ERROR("Inivalid packet cache content");
+ ERROR("Invalid packet cache content");
break;
default:
break;
@@ -726,24 +925,31 @@ static ssize_t forwarder_process_data(forwarder_t *forwarder, off_t msgbuf_id) {
DEBUG("DATA (%s) msgbuf_id=%lu ingress=%u length=%u", nameString, msgbuf_id,
msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf));
free(nameString);
- )}
-
- forwarder->stats.countReceived++;
- forwarder->stats.countObjectsReceived++;
+ });
const connection_table_t *table = forwarder_get_connection_table(forwarder);
- const connection_t *conn =
+ connection_t *conn =
connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf));
+ // Update stats
+ forwarder->stats.countObjectsReceived++;
+ conn->stats.data.rx_pkts++;
+ conn->stats.data.rx_bytes += msgbuf_get_len(msgbuf);
+
+ // Cache suffixes for current prefix to (possibly) avoid double lookups
+ pkt_cache_save_suffixes_for_prefix(
+ forwarder->pkt_cache, name_GetContentName(msgbuf_get_name(msgbuf)));
+
pkt_cache_verdict_t verdict = PKT_CACHE_VERDICT_ERROR;
bool wrong_egress;
- nexthops_t *ingressSetUnion =
- pkt_cache_on_data(forwarder->pkt_cache, msgbuf_pool, msgbuf_id,
- forwarder->store_in_cs, connection_is_local(conn), &wrong_egress, &verdict);
+ nexthops_t *ingressSetUnion = pkt_cache_on_data(
+ forwarder->pkt_cache, msgbuf_pool, msgbuf_id, forwarder->store_in_cs,
+ connection_is_local(conn), &wrong_egress, &verdict);
_forwarder_log_on_data(forwarder, verdict);
- if (wrong_egress) { // Interest sent via a connection but received from another
+ if (wrong_egress) { // Interest sent via a connection but received from
+ // another
WARN("Data coming from unexpected connection, discarded");
} else if (!ingressSetUnion) { // No match in the PIT
forwarder->stats.countDroppedNoReversePath++;
@@ -776,15 +982,15 @@ void forwarder_flush_connections(forwarder_t *forwarder) {
// DEBUG("[forwarder_flush_connections]");
const connection_table_t *table = forwarder_get_connection_table(forwarder);
- for (unsigned i = 0; i < forwarder->num_pending_conn; i++) {
+ for (unsigned i = 0; i < vector_len(forwarder->pending_conn); i++) {
unsigned conn_id = forwarder->pending_conn[i];
- const connection_t *conn = connection_table_at(table, conn_id);
+ connection_t *conn = connection_table_at(table, conn_id);
if (!connection_flush(conn)) {
WARN("Could not flush connection queue");
// XXX keep track of non flushed connections...
}
}
- forwarder->num_pending_conn = 0;
+ vector_reset(forwarder->pending_conn);
// DEBUG("[forwarder_flush_connections] done");
}
@@ -957,6 +1163,23 @@ cs_t *forwarder_get_cs(const forwarder_t *forwarder) {
return pkt_cache_get_cs(forwarder->pkt_cache);
}
+// IMPORTANT: Use this function ONLY for read-only operations since a realloc
+// would otherwise modify the returned copy but not the original msgbuf ids
+// vector in the forwarder. This constraint cannot be enforced by returning a
+// (const off_t *) because the vector_t macros still cast to (void **).
+off_t *forwarder_get_acquired_msgbuf_ids(const forwarder_t *forwarder) {
+ return forwarder->acquired_msgbuf_ids;
+}
+
+void forwarder_acquired_msgbuf_ids_reset(const forwarder_t *forwarder) {
+ vector_reset(forwarder->acquired_msgbuf_ids);
+}
+
+void forwarder_acquired_msgbuf_ids_push(const forwarder_t *forwarder,
+ off_t msgbuf_id) {
+ vector_push(forwarder->acquired_msgbuf_ids, msgbuf_id);
+}
+
// =======================================================
fib_t *forwarder_get_fib(forwarder_t *forwarder) { return forwarder->fib; }
@@ -1052,14 +1275,15 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener,
const connection_table_t *table =
forwarder_get_connection_table(listener->forwarder);
connection_t *connection = connection_table_get_by_pair(table, pair);
- unsigned conn_id = connection
- ? connection_table_get_connection_id(table, connection)
- : CONNECTION_ID_UNDEFINED;
+ unsigned conn_id = connection ? (unsigned)connection_table_get_connection_id(
+ table, connection)
+ : CONNECTION_ID_UNDEFINED;
assert((conn_id != CONNECTION_ID_UNDEFINED) || listener);
msgbuf_type_t type = get_type_from_packet(msgbuf_get_packet(msgbuf));
+ forwarder->stats.countReceived++;
msgbuf->type = type;
msgbuf->connection_id = conn_id;
msgbuf->recv_ts = now;
@@ -1067,12 +1291,14 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener,
switch (type) {
case MSGBUF_TYPE_INTEREST:
if (!connection_id_is_valid(msgbuf->connection_id)) {
- char *conn_name = connection_table_get_random_name(table);
+ char conn_name[SYMBOLIC_NAME_LEN];
+ int rc = connection_table_get_random_name(table, conn_name);
+ if (rc < 0) return 0;
+
unsigned connection_id =
listener_create_connection(listener, conn_name, pair);
msgbuf->connection_id = connection_id;
connection = connection_table_get_by_id(table, connection_id);
- free(conn_name);
}
msgbuf->path_label = 0; // not used for interest packets
name_create_from_interest(packet, msgbuf_get_name(msgbuf));
@@ -1102,10 +1328,12 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener,
case MSGBUF_TYPE_MAPME:
// XXX what about acks ?
if (!connection_id_is_valid(msgbuf->connection_id)) {
- char *conn_name = connection_table_get_random_name(table);
+ char conn_name[SYMBOLIC_NAME_LEN];
+ int rc = connection_table_get_random_name(table, conn_name);
+ if (rc < 0) return 0;
+
msgbuf->connection_id =
listener_create_connection(listener, conn_name, pair);
- free(conn_name);
}
mapme_process(forwarder->mapme, msgbuf);
return size;
@@ -1113,12 +1341,14 @@ ssize_t forwarder_receive(forwarder_t *forwarder, listener_t *listener,
case MSGBUF_TYPE_COMMAND:
// Create the connection to send the ack back
if (!connection_id_is_valid(msgbuf->connection_id)) {
- char *conn_name = connection_table_get_random_name(table);
+ char conn_name[SYMBOLIC_NAME_LEN];
+ int rc = connection_table_get_random_name(table, conn_name);
+ if (rc < 0) return 0;
+
unsigned connection_id =
listener_create_connection(listener, conn_name, pair);
msgbuf->connection_id = connection_id;
connection = connection_table_get_by_id(table, connection_id);
- free(conn_name);
}
msg_header_t *msg = (msg_header_t *)packet;
@@ -1163,3 +1393,7 @@ void forwarder_log(forwarder_t *forwarder) {
forwarder->stats.countInterestsExpired, forwarder->stats.countDataExpired,
forwarder->stats.countDroppedNoReversePath);
}
+
+forwarder_stats_t forwarder_get_stats(forwarder_t *forwarder) {
+ return forwarder->stats;
+} \ No newline at end of file