aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/core/forwarder.c
diff options
context:
space:
mode:
Diffstat (limited to 'hicn-light/src/hicn/core/forwarder.c')
-rw-r--r--hicn-light/src/hicn/core/forwarder.c572
1 files changed, 287 insertions, 285 deletions
diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c
index 9c3df906d..543fc99e4 100644
--- a/hicn-light/src/hicn/core/forwarder.c
+++ b/hicn-light/src/hicn/core/forwarder.c
@@ -31,7 +31,7 @@
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
-#include <hicn/hicn-light/config.h>
+//#include <hicn/hicn-light/config.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
@@ -41,18 +41,20 @@
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
-#include <hicn/core/connection_table.h>
-#include <hicn/core/listener_table.h>
-#include <hicn/core/pit.h>
-#include <hicn/core/fib.h>
-#include <hicn/core/content_store.h>
-#include <hicn/core/forwarder.h>
-#include <hicn/core/messagePacketType.h>
+#include "connection_table.h"
+#include "content_store.h"
+#include "fib.h"
+#include "forwarder.h"
+#include "listener_table.h"
#ifdef WITH_MAPME
-#include <hicn/core/mapme.h>
+#include "mapme.h"
#endif /* WITH_MAPME */
-#include <hicn/config/configuration.h>
-#include <hicn/config/configuration_file.h>
+#include "msgbuf.h"
+#include "msgbuf_pool.h"
+#include "pit.h"
+#include "../config/configuration.h"
+#include "../config/configuration_file.h"
+#include "../io/base.h" // MAX_MSG
#ifdef WITH_PREFIX_STATS
#include <hicn/core/prefix_stats.h>
@@ -61,10 +63,8 @@
#include <hicn/core/wldr.h>
#include <hicn/util/log.h>
-#define DEFAULT_PIT_SIZE 65535
-
typedef struct {
- uint32_t countReceived;
+ uint32_t countReceived; // Interest & Data only
uint32_t countInterestsReceived;
uint32_t countObjectsReceived;
@@ -110,15 +110,16 @@ struct forwarder_s {
pit_t * pit;
- content_store_t * content_store;
+ cs_t * cs;
fib_t * fib;
+ msgbuf_pool_t * msgbuf_pool;
#ifdef WITH_MAPME
mapme_t * mapme;
#endif /* WITH_MAPME */
- bool store_in_content_store;
- bool serve_from_content_store;
+ bool store_in_cs;
+ bool serve_from_cs;
forwarder_stats_t stats;
#ifdef WITH_PREFIX_STATS
@@ -132,7 +133,7 @@ struct forwarder_s {
unsigned pending_conn[MAX_MSG];
size_t num_pending_conn;
- msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by 1 */
+ //msgbuf_t msgbuf; /* Storage for msgbuf, which are currently processed 1 by 1 */
};
@@ -172,27 +173,6 @@ forwarder_seed(forwarder_t * forwarder) {
#endif
}
-int
-init_batch_buffers(batch_buffer_t * bb)
-{
- /* Setup recvmmsg data structures. */
- for (unsigned i = 0; i < MAX_MSG; i++) {
- char *buf = &bb->buffers[i][0];
- struct iovec *iovec = &bb->iovecs[i];
- struct mmsghdr *msg = &bb->msghdr[i];
-
- msg->msg_hdr.msg_iov = iovec;
- msg->msg_hdr.msg_iovlen = 1;
-
- msg->msg_hdr.msg_name = &bb->addrs[i];
- msg->msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
-
- iovec->iov_base = &buf[0];
- iovec->iov_len = MTU;
- }
- return 0;
-}
-
forwarder_t *
forwarder_create()
{
@@ -218,22 +198,26 @@ forwarder_create()
if (!forwarder->fib)
goto ERR_FIB;
- forwarder->pit = pit_create(DEFAULT_PIT_SIZE);
+ forwarder->msgbuf_pool = msgbuf_pool_create();
+ if (!forwarder->msgbuf_pool)
+ goto ERR_PACKET_POOL;
+
+ forwarder->pit = pit_create();
if (!forwarder->pit)
goto ERR_PIT;
size_t objectStoreSize =
- configuration_content_store_get_size(forwarder_get_configuration(forwarder));
- forwarder->content_store = _content_store_create(CONTENT_STORE_TYPE_LRU,
+ configuration_cs_get_size(forwarder_get_configuration(forwarder));
+ forwarder->cs = _cs_create(CS_TYPE_LRU,
objectStoreSize, 0);
- if (!forwarder->content_store)
- goto ERR_CONTENT_STORE;
+ if (!forwarder->cs)
+ goto ERR_CS;
- // the two flags for the content_store are set to true by default. If the content_store
+ // the two flags for the cs are set to true by default. If the cs
// is active it always work as expected unless the use modifies this
// values using controller
- forwarder->store_in_content_store = true;
- forwarder->serve_from_content_store = true;
+ forwarder->store_in_cs = true;
+ forwarder->serve_from_cs = true;
#if 0
forwarder->signal_term = dispatcher_CreateSignalEvent(
@@ -293,10 +277,12 @@ ERR_MAPME:
dispatcher_Destroy(&(forwarder->dispatcher));
#endif
- content_store_free(forwarder->content_store);
-ERR_CONTENT_STORE:
+ cs_free(forwarder->cs);
+ERR_CS:
pit_free(forwarder->pit);
ERR_PIT:
+ msgbuf_pool_free(forwarder->msgbuf_pool);
+ERR_PACKET_POOL:
fib_free(forwarder->fib);
ERR_FIB:
connection_table_free(forwarder->connection_table);
@@ -335,8 +321,9 @@ forwarder_free(forwarder_t * forwarder)
dispatcher_Destroy(&(forwarder->dispatcher));
#endif
- content_store_free(forwarder->content_store);
+ cs_free(forwarder->cs);
pit_free(forwarder->pit);
+ msgbuf_pool_free(forwarder->msgbuf_pool);
fib_free(forwarder->fib);
connection_table_free(forwarder->connection_table);
listener_table_free(forwarder->listener_table);
@@ -394,63 +381,56 @@ forwarder_get_listener_table(forwarder_t * forwarder)
}
void
-forwarder_content_store_set_store(forwarder_t * forwarder, bool val)
+forwarder_cs_set_store(forwarder_t * forwarder, bool val)
{
assert(forwarder);
- forwarder->store_in_content_store = val;
+ forwarder->store_in_cs = val;
}
bool
-forwarder_content_store_get_store(forwarder_t * forwarder)
+forwarder_cs_get_store(forwarder_t * forwarder)
{
assert(forwarder);
- return forwarder->store_in_content_store;
+ return forwarder->store_in_cs;
}
void
-forwarder_content_store_set_serve(forwarder_t * forwarder, bool val)
+forwarder_cs_set_serve(forwarder_t * forwarder, bool val)
{
assert(forwarder);
- forwarder->serve_from_content_store = val;
+ forwarder->serve_from_cs = val;
}
bool
-forwarder_content_store_get_serve(forwarder_t * forwarder)
+forwarder_cs_get_serve(forwarder_t * forwarder)
{
assert(forwarder);
- return forwarder->serve_from_content_store;
+ return forwarder->serve_from_cs;
}
void
-forwarder_content_store_set_size(forwarder_t * forwarder, size_t size)
+forwarder_cs_set_size(forwarder_t * forwarder, size_t size)
{
assert(forwarder);
- content_store_free(forwarder->content_store);
+ cs_free(forwarder->cs);
// XXX TODO
#if 0
- ContentStoreConfig content_storeConfig = {.objectCapacity =
+ ContentStoreConfig csConfig = {.objectCapacity =
maximumContentStoreSize};
- forwarder->content_store =
- content_storeLRU_Create(&content_storeConfig, forwarder->logger);
+ forwarder->cs =
+ csLRU_Create(&csConfig, forwarder->logger);
#endif
}
void
-forwarder_content_store_clear(forwarder_t * forwarder)
+forwarder_cs_clear(forwarder_t * forwarder)
{
assert(forwarder);
- content_store_clear(forwarder->content_store);
-}
-
-void
-forwarder_receive_command(forwarder_t * forwarder, command_type_t command_type,
- uint8_t * packet, unsigned connection_id)
-{
- configuration_receive_command(forwarder->config, command_type, packet, connection_id);
+ cs_clear(forwarder->cs);
}
/**
@@ -465,17 +445,20 @@ forwarder_receive_command(forwarder_t * forwarder, command_type_t command_type,
*
*/
static
-void
-forwarder_drop(forwarder_t * forwarder, msgbuf_t *message)
+ssize_t
+forwarder_drop(forwarder_t * forwarder, off_t msgbuf_id)
{
forwarder->stats.countDropped++;
- switch (msgbuf_get_type(message)) {
- case MESSAGE_TYPE_INTEREST:
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
+ switch (msgbuf_get_type(msgbuf)) {
+ case MSGBUF_TYPE_INTEREST:
forwarder->stats.countInterestsDropped++;
break;
- case MESSAGE_TYPE_DATA:
+ case MSGBUF_TYPE_DATA:
forwarder->stats.countObjectsDropped++;
break;
@@ -484,6 +467,7 @@ forwarder_drop(forwarder_t * forwarder, msgbuf_t *message)
break;
}
+ return msgbuf_get_len(msgbuf);
// dont destroy message here, its done at end of receive
}
@@ -493,23 +477,26 @@ forwarder_drop(forwarder_t * forwarder, msgbuf_t *message)
*
*/
static
-void
-forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf,
+ssize_t
+forwarder_forward_via_connection(forwarder_t * forwarder, off_t msgbuf_id,
unsigned conn_id)
{
connection_table_t * table = forwarder_get_connection_table(forwarder);
+
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
const connection_t * conn = connection_table_get_by_id(table, conn_id);
if (!conn) {
forwarder->stats.countDroppedConnectionNotFound++;
- DEBUG("forward msgbuf %p to interface %u not found (count %u)",
- msgbuf, conn_id, forwarder->stats.countDroppedConnectionNotFound);
- forwarder_drop(forwarder, msgbuf);
- return;
+ DEBUG("forward msgbuf %lu to interface %u not found (count %u)",
+ msgbuf_id, conn_id, forwarder->stats.countDroppedConnectionNotFound);
+ return forwarder_drop(forwarder, msgbuf_id);
}
/* Always queue the packet... */
- bool success = connection_send(conn, msgbuf, true);
+ bool success = connection_send(conn, msgbuf_id, true);
/* ... and mark the connection as pending if this is not yet the case */
unsigned i;
@@ -523,18 +510,17 @@ forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf,
if (!success) {
forwarder->stats.countSendFailures++;
- DEBUG("forward msgbuf %p to interface %u send failure (count %u)", msgbuf,
- conn_id, forwarder->stats.countSendFailures);
- forwarder_drop(forwarder, msgbuf);
- return;
+ DEBUG("forward msgbuf %llu to interface %u send failure (count %u)",
+ msgbuf_id, conn_id, forwarder->stats.countSendFailures);
+ return forwarder_drop(forwarder, msgbuf_id);
}
switch (msgbuf_get_type(msgbuf)) {
- case MESSAGE_TYPE_INTEREST:
+ case MSGBUF_TYPE_INTEREST:
forwarder->stats.countInterestForwarded++;
break;
- case MESSAGE_TYPE_DATA:
+ case MSGBUF_TYPE_DATA:
forwarder->stats.countObjectsForwarded++;
break;
@@ -546,6 +532,7 @@ forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf,
conn_id, forwarder->stats.countInterestForwarded,
forwarder->stats.countObjectsForwarded);
+ return (msgbuf_get_len(msgbuf));
}
/**
@@ -559,14 +546,16 @@ forwarder_forward_via_connection(forwarder_t * forwarder, msgbuf_t * msgbuf,
static
unsigned
forwarder_forward_to_nexthops(forwarder_t * forwarder,
- msgbuf_t *msgbuf, const nexthops_t * nexthops)
+ off_t msgbuf_id, const nexthops_t * nexthops)
{
unsigned forwardedCopies = 0;
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
unsigned ingressId = msgbuf_get_connection_id(msgbuf);
uint32_t old_path_label = 0;
- if (msgbuf_get_type(msgbuf) == MESSAGE_TYPE_DATA)
+ if (msgbuf_get_type(msgbuf) == MSGBUF_TYPE_DATA)
old_path_label = msgbuf_get_pathlabel(msgbuf);
unsigned nexthop;
@@ -575,13 +564,13 @@ forwarder_forward_to_nexthops(forwarder_t * forwarder,
continue;
forwardedCopies++;
- forwarder_forward_via_connection(forwarder, msgbuf, nexthop);
+ forwarder_forward_via_connection(forwarder, msgbuf_id, nexthop);
// everytime we send out a message we need to restore the original path
// label of the message this is important because we keep a single copy
// of the message (single pointer) and we modify the path label at each
// send.
- if (msgbuf_get_type(msgbuf) == MESSAGE_TYPE_DATA)
+ if (msgbuf_get_type(msgbuf) == MSGBUF_TYPE_DATA)
msgbuf_set_pathlabel(msgbuf, old_path_label);
});
@@ -591,12 +580,15 @@ forwarder_forward_to_nexthops(forwarder_t * forwarder,
static
bool
-forwarder_forward_via_fib(forwarder_t * forwarder, msgbuf_t *msgbuf,
+forwarder_forward_via_fib(forwarder_t * forwarder, off_t msgbuf_id,
pit_verdict_t verdict)
{
assert(forwarder);
- assert(msgbuf);
- assert(msgbuf_get_type(msgbuf) == MESSAGE_TYPE_INTEREST);
+ assert(msgbuf_id_is_valid(msgbuf_id));
+
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+ assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST);
fib_entry_t *fib_entry = fib_match_message(forwarder->fib, msgbuf);
if (!fib_entry)
@@ -650,7 +642,7 @@ forwarder_forward_via_fib(forwarder_t * forwarder, msgbuf_t *msgbuf,
entry_Release(&entry);
#endif
- if (forwarder_forward_to_nexthops(forwarder, msgbuf, nexthops) <= 0) {
+ if (forwarder_forward_to_nexthops(forwarder, msgbuf_id, nexthops) <= 0) {
DEBUG("Message %p returned an emtpy next hop set", msgbuf);
return false;
}
@@ -661,43 +653,49 @@ forwarder_forward_via_fib(forwarder_t * forwarder, msgbuf_t *msgbuf,
static
bool
-_satisfy_from_content_store(forwarder_t * forwarder, msgbuf_t *interest_msgbuf)
+_satisfy_from_cs(forwarder_t * forwarder, off_t msgbuf_id)
{
assert(forwarder);
- assert(msgbuf_get_type(interest_msgbuf) == MESSAGE_TYPE_INTEREST);
+ assert(msgbuf_id_is_valid(msgbuf_id));
+
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
+ assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST);
- if (msgbuf_get_interest_lifetime(interest_msgbuf) == 0)
+ if (msgbuf_get_lifetime(msgbuf) == 0)
return false;
- if (!forwarder->serve_from_content_store)
+ if (!forwarder->serve_from_cs)
return false;
// See if there's a match in the store.
- msgbuf_t * data_msgbuf = content_store_match(forwarder->content_store,
- interest_msgbuf, ticks_now());
+ off_t data_msgbuf_id = cs_match(forwarder_get_cs(forwarder), msgbuf_id,
+ ticks_now());
- if (!data_msgbuf)
+ if (msgbuf_id_is_valid(data_msgbuf_id))
return false;
// Remove it from the PIT. nexthops is allocated, so need to destroy
- nexthops_t * nexthops = pit_on_data(forwarder->pit, data_msgbuf);
+ nexthops_t * nexthops = pit_on_data(forwarder->pit, data_msgbuf_id);
assert(nexthops); // Illegal state: got a null nexthops for an interest we just inserted
// send message in reply, then done
forwarder->stats.countInterestsSatisfiedFromStore++;
- DEBUG("Message %p satisfied from content store (satisfied count %u)",
- interest_msgbuf, forwarder->stats.countInterestsSatisfiedFromStore);
+ DEBUG("Message %lu satisfied from content store (satisfied count %u)",
+ msgbuf_id, forwarder->stats.countInterestsSatisfiedFromStore);
+ msgbuf_t * data_msgbuf = msgbuf_pool_at(msgbuf_pool, data_msgbuf_id);
msgbuf_reset_pathlabel(data_msgbuf);
- forwarder_forward_to_nexthops(forwarder, data_msgbuf, nexthops);
+ forwarder_forward_to_nexthops(forwarder, data_msgbuf_id, nexthops);
return true;
}
/**
- * @function forwarder_receive_interest
+ * @function forwarder_process_interest
* @abstract Receive an interest from the network
* @discussion
* (1) if interest in the PIT, aggregate in PIT
@@ -707,22 +705,34 @@ _satisfy_from_content_store(forwarder_t * forwarder, msgbuf_t *interest_msgbuf)
*
*/
static
-void
-forwarder_receive_interest(forwarder_t * forwarder, msgbuf_t * msgbuf)
+ssize_t
+forwarder_process_interest(forwarder_t * forwarder, off_t msgbuf_id)
{
assert(forwarder);
- assert(msgbuf);
- assert(msgbuf_get_type(msgbuf) == MESSAGE_TYPE_INTEREST);
+ assert(msgbuf_id_is_valid(msgbuf_id));
+
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
+ assert(msgbuf_get_type(msgbuf) == MSGBUF_TYPE_INTEREST);
+
+ forwarder->stats.countReceived++;
forwarder->stats.countInterestsReceived++;
+ char *nameString = name_ToString(msgbuf_get_name(msgbuf));
+ DEBUG( "Message %p ingress %3u length %5u received name %s", msgbuf,
+ msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf), nameString);
+ free(nameString);
+
+
// (1) Try to aggregate in PIT
- pit_verdict_t verdict = pit_on_interest(forwarder->pit, msgbuf);
+ pit_verdict_t verdict = pit_on_interest(forwarder->pit, msgbuf_id);
switch(verdict) {
case PIT_VERDICT_AGGREGATE:
forwarder->stats.countInterestsAggregated++;
DEBUG("Message %p aggregated in PIT (aggregated count %u)",
msgbuf, forwarder->stats.countInterestsAggregated);
- return;
+ return msgbuf_get_len(msgbuf);
case PIT_VERDICT_FORWARD:
case PIT_VERDICT_RETRANSMIT:
@@ -735,30 +745,30 @@ forwarder_receive_interest(forwarder_t * forwarder, msgbuf_t * msgbuf)
// interest, we need to remove the PIT entry.
// (2) Try to satisfy from content store
- if (_satisfy_from_content_store(forwarder, msgbuf)) {
+ if (_satisfy_from_cs(forwarder, msgbuf_id)) {
// done
// If we found a content object in the CS,
- // messageProcess_Satisfy_from_content_store already cleared the PIT state
- return;
+ // messageProcess_Satisfy_from_cs already cleared the PIT state
+ return msgbuf_get_len(msgbuf);
}
// (3) Try to forward it
- if (forwarder_forward_via_fib(forwarder, msgbuf, verdict)) {
+ if (forwarder_forward_via_fib(forwarder, msgbuf_id, verdict)) {
// done
- return;
+ return msgbuf_get_len(msgbuf);
}
// Remove the PIT entry?
forwarder->stats.countDroppedNoRoute++;
- DEBUG("Message %p did not match FIB, no route (count %u)",
- msgbuf, forwarder->stats.countDroppedNoRoute);
+ DEBUG("Message %lu did not match FIB, no route (count %u)",
+ msgbuf_id, forwarder->stats.countDroppedNoRoute);
- forwarder_drop(forwarder, msgbuf);
+ return forwarder_drop(forwarder, msgbuf_id);
}
/**
- * @function forwarder_receive_data
+ * @function forwarder_process_data
* @abstract Process an in-bound content object
* @discussion
* (1) If it does not match anything in the PIT, drop it
@@ -768,19 +778,27 @@ forwarder_receive_interest(forwarder_t * forwarder, msgbuf_t * msgbuf)
* @param <#param1#>
*/
static
-void
-forwarder_receive_data(forwarder_t * forwarder,
- msgbuf_t *msgbuf)
+ssize_t
+forwarder_process_data(forwarder_t * forwarder, off_t msgbuf_id)
{
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ const msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
+
+ char *nameString = name_ToString(msgbuf_get_name(msgbuf));
+ DEBUG( "Message %lu ingress %3u length %5u received name %s", msgbuf_id,
+ msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf), nameString);
+ free(nameString);
+
+ forwarder->stats.countReceived++;
forwarder->stats.countObjectsReceived++;
- nexthops_t * ingressSetUnion = pit_on_data(forwarder->pit, msgbuf);
+ nexthops_t * ingressSetUnion = pit_on_data(forwarder->pit, msgbuf_id);
if (!ingressSetUnion) {
// (1) If it does not match anything in the PIT, drop it
forwarder->stats.countDroppedNoReversePath++;
- DEBUG("Message %p did not match PIT, no reverse path (count %u)",
- msgbuf, forwarder->stats.countDroppedNoReversePath);
+ DEBUG("Message %lu did not match PIT, no reverse path (count %u)",
+ msgbuf_id, forwarder->stats.countDroppedNoReversePath);
// MOVE PROBE HOOK ELSEWHERE
// XXX relationship with forwarding strategy... insert hooks
@@ -803,95 +821,68 @@ forwarder_receive_data(forwarder_t * forwarder,
const connection_table_t * table = forwarder_get_connection_table(forwarder);
const connection_t * conn = connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf));
- if (forwarder->store_in_content_store && connection_is_local(conn)) {
- content_store_add(forwarder->content_store, msgbuf, ticks_now());
+ if (forwarder->store_in_cs && connection_is_local(conn)) {
+ cs_add(forwarder->cs, msgbuf_id, ticks_now());
DEBUG("Message %p store in CS anyway", msgbuf);
}
- forwarder_drop(forwarder, msgbuf);
+ return forwarder_drop(forwarder, msgbuf_id);
} else {
// (2) Add to Content Store. Store may remove expired content, if necessary,
// depending on store policy.
- if (forwarder->store_in_content_store) {
- content_store_add(forwarder->content_store, msgbuf, ticks_now());
+ if (forwarder->store_in_cs) {
+ cs_add(forwarder->cs, msgbuf_id, ticks_now());
}
// (3) Reverse path forward via PIT entries
- forwarder_forward_to_nexthops(forwarder, msgbuf, ingressSetUnion);
+ return forwarder_forward_to_nexthops(forwarder, msgbuf_id, ingressSetUnion);
}
}
-
-/**
- * A NULL msgbuf is used to indicate the end of a batch
- */
void
-forwarder_receive(forwarder_t * forwarder, msgbuf_t * msgbuf)
+forwarder_flush_connections(forwarder_t * forwarder)
{
- assert(forwarder);
-
- /* Send batch ? */
- if (!msgbuf) {
- const connection_table_t * table = forwarder_get_connection_table(forwarder);
- for (unsigned i = 0; i < forwarder->num_pending_conn; i++) {
- const connection_t * conn = connection_table_at(table, forwarder->pending_conn[i]);
- // flush
- connection_send(conn, NULL, false);
+ const connection_table_t * table = forwarder_get_connection_table(forwarder);
+
+ for (unsigned i = 0; i < forwarder->num_pending_conn; i++) {
+ unsigned conn_id = forwarder->pending_conn[i];
+ const connection_t * conn = connection_table_at(table, conn_id);
+ if (!connection_flush(conn)) {
+ WARN("Could not flush connection queue");
+ // XXX keep track of non flushed connections...
}
- forwarder->num_pending_conn = 0;
}
+ forwarder->num_pending_conn = 0;
+}
+// XXX move to wldr file, worst case in connection.
+void
+forwarder_apply_wldr(const forwarder_t * forwarder, const msgbuf_t * msgbuf, connection_t * connection)
+{
// this are the checks needed to implement WLDR. We set wldr only on the STAs
- // and we let the AP to react according to choise of the client.
+ // and we let the AP to react according to choice of the client.
// if the STA enables wldr using the set command, the AP enable wldr as well
// otherwise, if the STA disable it the AP remove wldr
// WLDR should be enabled only on the STAs using the command line
// TODO
// disable WLDR command line on the AP
- connection_table_t * table = forwarder_get_connection_table(forwarder);
- connection_t * conn = connection_table_get_by_id(table, msgbuf_get_connection_id(msgbuf));
- if (!conn)
- return;
-
if (msgbuf_has_wldr(msgbuf)) {
- if (connection_has_wldr(conn)) {
+ if (connection_has_wldr(connection)) {
// case 1: WLDR is enabled
- connection_wldr_detect_losses(conn, msgbuf);
- } else if (!connection_has_wldr(conn) &&
- connection_wldr_autostart_is_allowed(conn)) {
+ connection_wldr_detect_losses(connection, msgbuf);
+ } else if (!connection_has_wldr(connection) &&
+ connection_wldr_autostart_is_allowed(connection)) {
// case 2: We are on an AP. We enable WLDR
- connection_wldr_enable(conn, true);
- connection_wldr_detect_losses(conn, msgbuf);
+ connection_wldr_enable(connection, true);
+ connection_wldr_detect_losses(connection, msgbuf);
}
// case 3: Ignore WLDR
} else {
- if (connection_has_wldr(conn) && connection_wldr_autostart_is_allowed(conn)) {
+ if (connection_has_wldr(connection) && connection_wldr_autostart_is_allowed(connection)) {
// case 1: STA do not use WLDR, we disable it
- connection_wldr_enable(conn, false);
+ connection_wldr_enable(connection, false);
}
}
-
- forwarder->stats.countReceived++;
-
- char *nameString = name_ToString(msgbuf_get_name(msgbuf));
- DEBUG( "Message %p ingress %3u length %5u received name %s", msgbuf,
- msgbuf_get_connection_id(msgbuf), msgbuf_get_len(msgbuf), nameString);
- free(nameString);
-
- switch (msgbuf_get_type(msgbuf)) {
- case MESSAGE_TYPE_INTEREST:
- forwarder_receive_interest(forwarder, msgbuf);
- break;
-
- case MESSAGE_TYPE_DATA:
- forwarder_receive_data(forwarder, msgbuf);
- break;
-
- default:
- forwarder_drop(forwarder, msgbuf);
- break;
- }
-
}
bool
@@ -1006,7 +997,7 @@ forwarder_set_strategy(forwarder_t * forwarder, Name * name_prefix,
{
assert(forwarder);
assert(name_prefix);
- // assert(strategy_type_is_valid(strategy_type));
+ assert(STRATEGY_TYPE_VALID(strategy_type));
/* strategy_options might be NULL */
fib_entry_t * entry = fib_contains(forwarder->fib, name_prefix);
@@ -1016,12 +1007,12 @@ forwarder_set_strategy(forwarder_t * forwarder, Name * name_prefix,
fib_entry_set_strategy(entry, strategy_type, strategy_options);
}
-content_store_t *
-forwarder_get_content_store(const forwarder_t * forwarder)
+cs_t *
+forwarder_get_cs(const forwarder_t * forwarder)
{
assert(forwarder);
- return forwarder->content_store;
+ return forwarder->cs;
}
// =======================================================
@@ -1055,10 +1046,17 @@ static void _signal_cb(int sig, PARCEventType events, void *user_data) {
#endif
fib_t *
-forwarder_get_fib(forwarder_t * forwarder) {
+forwarder_get_fib(forwarder_t * forwarder)
+{
return forwarder->fib;
}
+msgbuf_pool_t *
+forwarder_get_msgbuf_pool(const forwarder_t * forwarder)
+{
+ return forwarder->msgbuf_pool;
+}
+
#ifdef WITH_MAPME
void
forwarder_on_connection_event(const forwarder_t * forwarder,
@@ -1082,116 +1080,120 @@ forwarder_get_prefix_stats_mgr(const forwarder_t * forwarder)
}
#endif /* WITH_PREFIX_STATS */
-static
-void
-process_interest(forwarder_t * forwarder, listener_t * listener,
- unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair)
-{
- if (!connection_id_is_valid(conn_id)) {
- conn_id = listener_create_connection(listener, pair);
- }
-
- assert(messageHandler_GetTotalPacketLength(packet) == size);
-
- msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_INTEREST, conn_id, ticks_now());
- forwarder_receive(listener->forwarder, &forwarder->msgbuf);
-}
+/**
+ * @brief Process a packet by creating the corresponding message buffer and
+ * dispatching it to the forwarder for further processing.
+ * @param[in] forwarder Forwarder instance.
+ *
+ */
+// XXX ??? XXX = process for listener as we are resolving connection id
+//
-static
-void
-process_data(forwarder_t * forwarder, listener_t * listener,
- unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair)
+msgbuf_type_t get_type_from_packet(uint8_t * packet)
{
- if (!connection_id_is_valid(conn_id)) {
- INFO("Ignoring data packet associated to no connection");
- return;
- }
+ if (messageHandler_IsTCP(packet)) {
+ if (messageHandler_IsData(packet)) {
+ return MSGBUF_TYPE_DATA;
+ } else if (messageHandler_IsInterest(packet)) {
+ return MSGBUF_TYPE_INTEREST;
+ } else {
+ return MSGBUF_TYPE_UNDEFINED;
+ }
- assert(messageHandler_GetTotalPacketLength(packet) == size);
+ } else if (messageHandler_IsWldrNotification(packet)) {
+ return MSGBUF_TYPE_WLDR_NOTIFICATION;
- msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_DATA, conn_id, ticks_now());
- forwarder_receive(listener->forwarder, &forwarder->msgbuf);
+ } else if (mapme_match_packet(packet)) {
+ return MSGBUF_TYPE_MAPME;
-}
+ } else if (*packet == REQUEST_LIGHT) {
+ return MSGBUF_TYPE_COMMAND;
-static
-void
-process_wldr_notification(forwarder_t * forwarder, listener_t * listener,
- unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair)
-{
- if (!connection_id_is_valid(conn_id)) {
- INFO("Ignoring WLDR notification not associated to a connection");
- return;
+ } else {
+ return MSGBUF_TYPE_UNDEFINED;
}
-
- assert(messageHandler_GetTotalPacketLength(packet) == size);
-
- connection_table_t * table = forwarder_get_connection_table(forwarder);
- connection_t * connection = connection_table_at(table, conn_id);
-
- msgbuf_from_packet(&forwarder->msgbuf, packet, size, MESSAGE_TYPE_WLDR_NOTIFICATION, conn_id, ticks_now());
- connection_wldr_handle_notification(connection, &forwarder->msgbuf);
-
}
-static
-void
-process_mapme(forwarder_t * forwarder, listener_t * listener,
- unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair)
+ssize_t
+forwarder_receive(forwarder_t * forwarder, listener_t * listener,
+ off_t msgbuf_id, address_pair_t * pair, Ticks now)
{
- if (!connection_id_is_valid(conn_id))
- conn_id = listener_create_connection(listener, pair);
- mapme_process(forwarder->mapme, packet, conn_id);
-}
+ assert(forwarder);
+ /* listener can be NULL */
+ assert(msgbuf_id_is_valid(msgbuf_id));
+ assert(pair);
-static
-void
-process_command(const forwarder_t * forwarder, listener_t * listener,
- unsigned conn_id, uint8_t * packet, size_t size, const address_pair_t * pair)
-{
- if (!connection_id_is_valid(conn_id))
- conn_id = listener_create_connection(listener, pair);
+ const msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+ msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, msgbuf_id);
- command_type_t command_type= *(packet + 1);
- if (command_type >= COMMAND_TYPE_N) {
- ERROR("Invalid command");
- return;
- }
- forwarder_receive_command(listener->forwarder, command_type, packet, conn_id);
+ assert(msgbuf);
-}
+ uint8_t * packet = msgbuf_get_packet(msgbuf);
+ size_t size = msgbuf_get_len(msgbuf);
+ assert(messageHandler_GetTotalPacketLength(packet) == size); // XXX confirm ?
-// = process for listener as we are resolving connection id
-// XXX this would typically be inside the forwarder
-void
-process_packet(forwarder_t * forwarder, listener_t * listener, uint8_t * packet, size_t size, address_pair_t * pair)
-{
/* Connection lookup */
const connection_table_t * table = forwarder_get_connection_table(listener->forwarder);
- const connection_t * conn = connection_table_get_by_pair(table, pair);
- unsigned conn_id = conn ? connection_table_get_connection_id(table, conn): CONNECTION_ID_UNDEFINED;
+ connection_t * connection = connection_table_get_by_pair(table, pair);
+ unsigned conn_id = connection
+ ? connection_table_get_connection_id(table, connection)
+ : CONNECTION_ID_UNDEFINED;
assert((conn_id != CONNECTION_ID_UNDEFINED) || listener);
- // Actually hooks should be defined for each packet type to avoid this
- // spaghetti code
- if (messageHandler_IsTCP(packet)) {
- if (messageHandler_IsData(packet)) {
- process_data(forwarder, listener, conn_id, packet, size, pair);
- } else if (messageHandler_IsInterest(packet)) {
- process_interest(forwarder, listener, conn_id, packet, size, pair);
- } else {
- INFO("Unknown TCP packet received");
- forwarder_drop(forwarder, NULL);
- }
- } else if (messageHandler_IsWldrNotification(packet)) {
- process_wldr_notification(forwarder, listener, conn_id, packet, size, pair);
- } else if (mapme_match_packet(packet)) {
- process_mapme(forwarder, listener, conn_id, packet, size, pair);
- } else if (*packet == REQUEST_LIGHT) {
- process_command(forwarder, listener, conn_id, packet, size, pair);
- } else {
- INFO("Unknown packet received");
- forwarder_drop(forwarder, NULL);
+ msgbuf_type_t type = get_type_from_packet(msgbuf_get_packet(msgbuf));
+
+ msgbuf->type = type;
+ msgbuf->connection_id = conn_id;
+ msgbuf->recv_ts = now;
+ msgbuf->refs = 1;
+
+ switch(type) {
+ case MSGBUF_TYPE_INTEREST:
+ if (!connection_id_is_valid(msgbuf->connection_id))
+ msgbuf->connection_id = listener_create_connection(listener, pair);
+ msgbuf->id.name = name_create_from_interest(packet);
+ forwarder_apply_wldr(forwarder, msgbuf, connection);
+ forwarder_process_interest(forwarder, msgbuf_id);
+ break;
+
+ case MSGBUF_TYPE_DATA:
+ if (!connection_id_is_valid(msgbuf->connection_id))
+ return forwarder_drop(forwarder, msgbuf_id);
+ msgbuf->id.name = name_create_from_data(packet);
+ forwarder_apply_wldr(forwarder, msgbuf, connection);
+ forwarder_process_data(forwarder, msgbuf_id);
+ break;
+
+ case MSGBUF_TYPE_WLDR_NOTIFICATION:
+ if (!connection_id_is_valid(msgbuf->connection_id))
+ return forwarder_drop(forwarder, msgbuf_id);
+ connection_wldr_handle_notification(connection, msgbuf);
+ return msgbuf_get_len(msgbuf);
+
+ case MSGBUF_TYPE_MAPME:
+ // XXX what about acks ?
+ if (!connection_id_is_valid(msgbuf->connection_id))
+ msgbuf->connection_id = listener_create_connection(listener, pair);
+ mapme_process(forwarder->mapme, msgbuf);
+ return msgbuf_get_len(msgbuf);
+
+ case MSGBUF_TYPE_COMMAND:
+ // XXX before it used to create the connection
+ if (!connection_id_is_valid(msgbuf->connection_id))
+ return forwarder_drop(forwarder, msgbuf_id);
+ msgbuf->command.type = *(packet + 1); // XXX use header
+ if (msgbuf->command.type >= COMMAND_TYPE_N) {
+ ERROR("Invalid command");
+ return -msgbuf_get_len(msgbuf);
+ }
+ return configuration_receive_command(forwarder->config, msgbuf);
+
+ case MSGBUF_TYPE_UNDEFINED:
+ case MSGBUF_TYPE_N:
+ // XXX Unexpected... shall we abort ?
+ return forwarder_drop(forwarder, msgbuf_id);
}
+
+ return size;
}