From dfd7ce27fea04c1a76844e21286c2b1d6653e153 Mon Sep 17 00:00:00 2001 From: Jim Gibson Date: Mon, 20 Feb 2017 11:53:54 -0500 Subject: Initial Commit: VPP cicn VPP plugin Change-Id: If1b965f0a4b7cfacda8f6caf6925072a9007ffb4 Signed-off-by: Jim Gibson --- cicn-plugin/cicn/node.c | 2031 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 2031 insertions(+) create mode 100644 cicn-plugin/cicn/node.c (limited to 'cicn-plugin/cicn/node.c') diff --git a/cicn-plugin/cicn/node.c b/cicn-plugin/cicn/node.c new file mode 100644 index 00000000..922ebf4b --- /dev/null +++ b/cicn-plugin/cicn/node.c @@ -0,0 +1,2031 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * node.c - icn plug-in nodes for vpp + */ + +#include "cicn_rte_mbuf.h" // needed first because vlib.h defs conflict +#include +#include + +#include +#include +#include +#include +#include + +int cicn_buftrc = 0; // make permanent or delete? Set to 1 to enable trace + +#define CICN_IP_TTL_DEFAULT 128 + +/* + * First forwarder worker node starts here + */ + +/* Trace context struct */ +typedef struct +{ + u32 next_index; + u32 sw_if_index; + u8 pkt_type; + u16 msg_type; +} icnfwd_trace_t; + +/* packet trace format function */ +static u8 * +icnfwd_format_trace (u8 * s, va_list * args) +{ + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + icnfwd_trace_t *t = va_arg (*args, icnfwd_trace_t *); + + s = format (s, "ICNFWD: pkt: %d, msg %d, sw_if_index %d, next index %d", + (int) t->pkt_type, (int) t->msg_type, + t->sw_if_index, t->next_index); + return (s); +} + +/* + * Node context data; we think this is per-thread/instance + */ +typedef struct icnfwd_runtime_s +{ + /* TODO -- use this more when we add shards */ + int id; + cicn_pit_cs_t pitcs; +} icnfwd_runtime_t; + +/* Registration struct for a graph node */ +vlib_node_registration_t icnfwd_node; + +/* Next graph nodes, which reference the list in the actual registration + * block below + */ +typedef enum +{ + ICNFWD_NEXT_LOOKUP, + ICNFWD_NEXT_ERROR_DROP, + ICNFWD_N_NEXT, +} icnfwd_next_t; + +/* Stats string values */ +static char *icnfwd_error_strings[] = { +#define _(sym,string) string, + foreach_icnfwd_error +#undef _ +}; + +/* Prototypes */ +static int cicn_trim_cs_lru (vlib_main_t * vm, vlib_node_runtime_t * node, + cicn_pit_cs_t * pit); + +/* + * + */ +static void +update_node_counter (vlib_main_t * vm, u32 node_idx, u32 counter_idx, u64 val) +{ + vlib_node_t *node = vlib_get_node (vm, node_idx); + vlib_error_main_t *em = &(vm->error_main); + u32 base_idx = node->error_heap_index; + + em->counters[base_idx + counter_idx] = val; +} + +/* + * Prepare a packet buffer for the CS. We'll clone this mbuf and use a + * newly-allocated mbuf to hold the header/rewrite info needed to send + * each packet out. + */ +static int +prep_buffer_for_cs (vlib_main_t * vm, vlib_buffer_t * b0) +{ + int ret = EINVAL; + + /* Update the CS mbuf (and vlib buffer) so that it includes only the + * ICN payload + */ + + /* Advance the vlib buffer to the beginning of the ICN payload */ + vlib_buffer_advance (b0, sizeof (ip4_header_t) + sizeof (udp_header_t)); + + cicn_infra_vlib_buffer_cs_prep_finalize (vm, b0); + ret = AOK; + + return (ret); +} + +/* + * Clone a packet being referenced in a CS entry, using another packet + * (received interest packet) as a header to hold content response + * rewrite info and pointer to cloned cs entry buffer. + */ +static int +cicn_clone_cs_buffer (vlib_buffer_t * hdr_b0, const cicn_pcs_entry_t * pcs, + vlib_main_t * vm, vlib_buffer_free_list_t * fl, + unsigned socket_id, cicn_face_db_entry_t * outface) +{ + int ret = EINVAL; + vlib_buffer_t *cs_b0, *clone_b0; + + BUFTRC ("CS-H-SW", GBI (vm, hdr_b0)); + if (PREDICT_FALSE (pcs->u.cs.cs_pkt_buf == 0)) + { + goto done; + } + BUFTRC ("CS-H-CS", pcs->u.cs.cs_pkt_buf); + + cs_b0 = vlib_get_buffer (vm, pcs->u.cs.cs_pkt_buf); + + /* Clone the buf held in the CS */ + clone_b0 = cicn_infra_vlib_buffer_clone (cs_b0, vm, fl, socket_id, outface); + BUFTRC ("CS-H-CL", GBI (vm, clone_b0)); + if (PREDICT_FALSE (clone_b0 == 0)) + { + /* If we can't get a buf, we can't continue */ + goto done; + } + + /* At this point, the base CS buffer is pointing at the ICN payload + * part of the packet, and we'll be using the other buffer + * to hold the egress/tx rewrite info. + */ + hdr_b0->current_data = 0; + hdr_b0->current_length = sizeof (ip4_header_t) + sizeof (udp_header_t); + hdr_b0->flags |= VLIB_BUFFER_NEXT_PRESENT; + if (outface->swif_is_dpdk_driver) + { + ASSERT ((hdr_b0->flags & VNET_BUFFER_RTE_MBUF_VALID) != 0); + } + hdr_b0->total_length_not_including_first_buffer = + vlib_buffer_length_in_chain (vm, cs_b0); + + /* Connect the header particle to the body */ + hdr_b0->next_buffer = vlib_get_buffer_index (vm, clone_b0); + + cicn_infra_vlib_buffer_clone_attach_finalize (hdr_b0, clone_b0, outface); + + /* Looks like success */ + ret = AOK; + +done: + + return (ret); +} + +/* + * ICN forwarder node: handling of Interests and Content Msgs delivered + * based on udp_register_dst_port(). + * - 1 packet at a time + * - ipv4 udp only + */ +static uword +icnfwd_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + u32 n_left_from, *from, *to_next; + icnfwd_next_t next_index; + u32 pkts_processed = 0; + u32 pkts_interest_count = 0, pkts_data_count = 0, pkts_nak_count = 0; + u32 pkts_control_request_count = 0, pkts_control_reply_count = 0; + u32 pkts_from_cache_count = 0; + u32 pkts_nacked_interests_count = 0; + u32 pkts_nak_hoplimit_count = 0, pkts_nak_no_route_count = 0; + u32 pkts_no_pit_count = 0, pit_expired_count = 0, cs_expired_count = 0; + u32 no_bufs_count = 0, pkts_interest_agg = 0, pkts_int_retrans = 0; + u32 pit_int_count, pit_cs_count; + u32 pkts_hello_int_rec = 0, pkts_hello_data_sent = 0; + u32 pkts_hello_data_rec = 0; + int i, ret; + icnfwd_runtime_t *rt; + cicn_prefix_hashinf_t pfxhash; + f64 tnow; + vlib_buffer_free_list_t *fl; + unsigned socket_id = cicn_infra_rte_socket_id (); + cicn_main_t *sm = &cicn_main; + + fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); + + rt = vlib_node_get_runtime_data (vm, icnfwd_node.index); + + /* Alloc the pit/cs for each shard when the icn feature + * is enabled, access by thread in the node context. + */ + if (rt->pitcs.pcs_table == NULL) + { + cicn_pit_create (&rt->pitcs, cicn_infra_shard_pit_size); + cicn_pit_set_lru_max (&rt->pitcs, cicn_infra_shard_cs_size); + } + + /* Maybe update our thread's config generation number, if the global + * number has changed + */ + if (cicn_infra_gshard.cfg_generation != + cicn_infra_shards[vm->cpu_index].cfg_generation) + { + cicn_infra_shards[vm->cpu_index].cfg_generation = + cicn_infra_gshard.cfg_generation; + } + + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + next_index = node->cached_next_index; + + /* Capture time in vpp terms */ + tnow = vlib_time_now (vm); + + while (n_left_from > 0) + { + u32 n_left_to_next; + + vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); + + /* TODO -- just doing 1-at-a-time for now, to simplify things a bit. */ + + /* TODO -- more interesting stats and trace */ + + while (n_left_from > 0 && n_left_to_next > 0) + { + u32 bi0; + vlib_buffer_t *b0; + u32 next0 = ICNFWD_NEXT_LOOKUP; + u32 sw_if_index0; + udp_header_t *udp0; + ip4_header_t *ip0; + u8 *body0; + u32 len0; + u8 pkt_type; + u16 msg_type, bkt_ent_exp_time, sval; + cicn_pkt_hdr_desc_t pkt_hdr_desc0; + u8 *nameptr; + u32 namelen; + u64 hashval; + struct sockaddr_in srcaddr, destaddr; + cicn_face_db_entry_t *inface, *outface; + cicn_face_stats_t *inface_stats, *outface_stats; + cicn_hash_node_t *nodep; + cicn_pcs_entry_t *pitp; + cicn_fib_entry_t *pentry; + uint16_t faceid; + int clone_count; + vlib_buffer_t *hdr_vec[CICN_PARAM_PIT_ENTRY_PHOPS_MAX]; + vlib_buffer_t *clone_vec[CICN_PARAM_PIT_ENTRY_PHOPS_MAX]; + cicn_face_db_entry_t *face_vec[CICN_PARAM_PIT_ENTRY_PHOPS_MAX]; + u64 seq_num; + int trace_p = 0; + + /* Prefetch for next iteration. */ + if (n_left_from > 1) + { + vlib_buffer_t *p2; + + p2 = vlib_get_buffer (vm, from[1]); + + vlib_prefetch_buffer_header (p2, LOAD); + + CLIB_PREFETCH (p2->data, (CLIB_CACHE_LINE_BYTES * 2), STORE); + } + + /* TODO -- we're not dealing with 'chained' buffers yet */ + + /* Dequeue a packet buffer */ + bi0 = from[0]; + BUFTRC ("CICN-SW", bi0); + from += 1; + n_left_from -= 1; + + b0 = vlib_get_buffer (vm, bi0); + + if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) && + (b0->flags & VLIB_BUFFER_IS_TRACED))) + { + trace_p = 1; + } + + /* + * From the udp code, we think we're handed the payload part + * of the packet + */ + ASSERT (b0->current_data >= + (sizeof (ip4_header_t) + sizeof (udp_header_t))); + + /* Capture pointer to the payload */ + body0 = vlib_buffer_get_current (b0); + len0 = b0->current_length; + + /* Walk 'back' to the ip header */ + vlib_buffer_advance (b0, -(sizeof (udp_header_t))); + udp0 = vlib_buffer_get_current (b0); + vlib_buffer_advance (b0, -(sizeof (ip4_header_t))); + ip0 = vlib_buffer_get_current (b0); + + sw_if_index0 = vnet_buffer (b0)->sw_if_index[VLIB_RX]; + + /* + * Do a quick, in-place parse/validate pass, locating + * a couple of key pieces of info about the packet + * TODO -- could we pass this info from the dist node? + */ + ret = cicn_parse_pkt (body0, len0, &pkt_type, &msg_type, + &nameptr, &namelen, &pkt_hdr_desc0); + + if (ret != AOK) + { + /* Just drop on error? */ + pkt_type = 0; + msg_type = 0; + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* Use result to determine next steps: forward, reply from CS, + * drop, nack + */ + + if (pkt_type == CICN_PKT_TYPE_INTEREST) + { + pkts_interest_count++; + } + else if (pkt_type == CICN_PKT_TYPE_CONTENT) + { + pkts_data_count++; + } + else if (pkt_type == CICN_PKT_TYPE_NAK) + { + pkts_nak_count++; + } + else if (pkt_type == CICN_PKT_TYPE_CONTROL_REQUEST) + { + pkts_control_request_count++; + } + else if (pkt_type == CICN_PKT_TYPE_CONTROL_REPLY) + { + pkts_control_reply_count++; + } + + /* Locate ingress face */ + srcaddr.sin_addr.s_addr = ip0->src_address.as_u32; + srcaddr.sin_port = udp0->src_port; + + destaddr.sin_addr.s_addr = ip0->dst_address.as_u32; + destaddr.sin_port = udp0->dst_port; + + /* Search for a match where the _local_ and _remote_ addresses + * correspond to the _dest_ and _src_ addresses from the packet. + */ + ret = cicn_face_entry_find_by_addr (&destaddr /*local */ , + &srcaddr /*remote */ , &inface); + + /* If no matching face, don't do any more */ + if (PREDICT_FALSE (ret != AOK || inface == NULL || + (inface->flags & CICN_FACE_FLAGS_DOWN_HARD))) + { + /* Just drop on error? */ + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + cicn_infra_shard_t *wshard = &cicn_infra_shards[vm->cpu_index]; + inface_stats = &wshard->face_stats[cicn_face_db_index (inface)]; + + /* If content, use PIT info to determine egress face */ + + if (pkt_type == CICN_PKT_TYPE_CONTENT || + pkt_type == CICN_PKT_TYPE_CONTROL_REPLY) + { + + inface_stats->in_datas++; + + if (PREDICT_FALSE (inface->flags & CICN_FACE_FLAG_HELLO_DOWN)) + { + // hello down, only hello messages should be processed + goto hello_reply_rcvd_check; + } + + /* Compute the full name hash for content lookup */ + hashval = cicn_hashtb_hash_name (nameptr, namelen); + + /* + * Search PIT/CS by name hash + */ + /* + * Opportunistic scan of hash row/bucket for expirations. + * Some old code below could be removed with this addition, + * it won't be executed anyway. + * + * The timeout scan and the node lookup could be + * easily integrated. + */ + cicn_pcs_timeout (vm, &rt->pitcs, hashval, + &pit_expired_count, &cs_expired_count); + + ret = + cicn_hashtb_lookup_node (rt->pitcs.pcs_table, nameptr, + namelen, hashval, &nodep); + + if (PREDICT_FALSE (ret != AOK)) + { + goto hello_reply_rcvd_check; //no PIT entry: maybe a hello? + } + + pitp = cicn_pit_get_data (nodep); + + if (PREDICT_FALSE (pitp->shared.entry_type != CICN_PIT_TYPE)) + { + /* Whatever this is, it's not a PIT */ + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* Is the PIT entry expired? */ + if (PREDICT_FALSE (tnow > pitp->shared.expire_time)) + { + /* + * Remove existing entry; treat this as if no PIT entry + */ + cicn_pcs_delete (&rt->pitcs, &pitp, &nodep, vm); + pit_expired_count++; + + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* Content should arrive on face where interest tx happened */ + if (PREDICT_FALSE (pitp->u.pit.pe_txface != inface->faceid)) + { + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* + * Hold the packet buffer in the CS, and + * then use it to satisfy the PIT entry. We allocate unique + * packet mbufs to hold the rewrite info for each reply we'll + * send; the rewrite mbufs all share clones of the reply + * payload. + */ + + /* Prepare the packet for cs and for cloning. */ + BUFTRC ("CS--ADD", bi0); + ret = prep_buffer_for_cs (vm, b0); + if (PREDICT_FALSE (ret != AOK)) + { + + cicn_pcs_delete (&rt->pitcs, &pitp, &nodep, vm); + no_bufs_count++; + + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* For each packet we will send, allocate a new packet + * buffer to hold the rewrite/header info and a clone of + * the ICN payload packet buf. We also capture the tx faceid. + */ + ret = AOK; + for (clone_count = 0, i = 0; i < CICN_PARAM_PIT_ENTRY_PHOPS_MAX; + i++) + { + + if (pitp->u.pit.pe_rxfaces[i] != 0) + { + + ret = + cicn_face_entry_find_by_id (pitp->u.pit.pe_rxfaces[i], + &outface); + if (PREDICT_FALSE + ((ret != AOK) + || (outface->flags & CICN_FACE_FLAGS_DOWN))) + { + /* Can't use this face, skip the entry */ + ret = AOK; + continue; + } + + face_vec[clone_count] = outface; + hdr_vec[clone_count] = + cicn_infra_vlib_buffer_alloc (vm, fl, socket_id, + outface); + if (!cicn_cs_enabled (&rt->pitcs) && clone_count == 0) + { + clone_vec[clone_count] = b0; + } + else + { + clone_vec[clone_count] = + cicn_infra_vlib_buffer_clone (b0, vm, fl, + socket_id, outface); + } + BUFTRC ("CLN-HDR", GBI (vm, hdr_vec[clone_count])); + BUFTRC ("CLN-CLN", GBI (vm, clone_vec[clone_count])); + + if (PREDICT_FALSE ((hdr_vec[clone_count] == NULL) || + (clone_vec[clone_count] == NULL))) + { + + /* Need to check this index in the arrays in + * the error-handling code below. + */ + clone_count++; + + ret = ENOMEM; + break; + } + + clone_count++; + } + } + + /* If error, clean up any buffers we allocated */ + if (PREDICT_FALSE (ret != AOK)) + { + for (i = 0; i < clone_count; i++) + { + BUFTRC ("ERR-FRE", + vlib_get_buffer_index (vm, + clone_vec[clone_count])); + if (hdr_vec[i]) + { + cicn_infra_vlib_buffer_free (hdr_vec[i], vm, + face_vec[i]); + } + if (clone_vec[i]) + { + cicn_infra_vlib_buffer_free (hdr_vec[i], vm, + face_vec[i]); + } + } + + /* Drop */ + cicn_pcs_delete (&rt->pitcs, &pitp, &nodep, vm); + no_bufs_count++; + + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* No valid PIT faces found? Not much we can do. + * TODO -- for now, leaving the PIT entry; should we delete it? + */ + if (PREDICT_FALSE (clone_count == 0)) + { + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + // no cs entry for ctrl responses + if (pkt_type == CICN_PKT_TYPE_CONTENT) + { + + if (cicn_cs_enabled (&rt->pitcs)) + { + /* At this point we think we're safe to proceed. + * Store the CS buf in the PIT/CS hashtable entry + */ + + /* Start turning the PIT into a CS. Note that we may be + * stepping on the PIT part of the union as we + * update the CS part, so don't expect the PIT part + * to be valid after this point. + */ + cicn_pit_to_cs (&rt->pitcs, pitp); + + pitp->u.cs.cs_rxface = inface->faceid; + pitp->shared.create_time = tnow; + + uint64_t dmsg_lifetime; + ret = + cicn_parse_hdr_time_ms (body0, &pkt_hdr_desc0, + CICN_HDR_TLV_CACHE_TIME, + &dmsg_lifetime); + if (ret != AOK) + { // no header timeout, use default + dmsg_lifetime = CICN_PARAM_CS_LIFETIME_DFLT; + } + else if (dmsg_lifetime != 0) + { + if (dmsg_lifetime < CICN_PARAM_CS_LIFETIME_MIN) + { + dmsg_lifetime = CICN_PARAM_CS_LIFETIME_MIN; + } + else if (dmsg_lifetime > CICN_PARAM_CS_LIFETIME_MAX) + { + dmsg_lifetime = CICN_PARAM_CS_LIFETIME_MAX; + } + } + pitp->shared.expire_time = + cicn_pcs_get_exp_time (tnow, dmsg_lifetime); + + /* Update hashtable-level expiration value too */ + bkt_ent_exp_time = + cicn_infra_get_slow_exp_time (dmsg_lifetime); + + cicn_hashtb_entry_set_expiration (rt->pitcs.pcs_table, + nodep, + bkt_ent_exp_time, 0); + + /* Store the original packet buffer in the CS node */ + pitp->u.cs.cs_pkt_buf = vlib_get_buffer_index (vm, b0); + + /* Add to CS LRU */ + cicn_cs_lru_insert (&rt->pitcs, nodep, pitp); + } + else + { + cicn_pcs_delete (&rt->pitcs, &pitp, &nodep, vm); + } + + /* Set up to enqueue frames to the transmit next-node */ + if (next_index != ICNFWD_NEXT_LOOKUP) + { + vlib_put_next_frame (vm, node, next_index, + n_left_to_next); + next_index = next0 = ICNFWD_NEXT_LOOKUP; + vlib_get_next_frame (vm, node, next_index, to_next, + n_left_to_next); + + /* Ensure that we have space for at least one packet */ + if (PREDICT_FALSE (n_left_to_next <= 0)) + { + vlib_put_next_frame (vm, node, next_index, + n_left_to_next); + + vlib_get_next_frame (vm, node, next_index, to_next, + n_left_to_next); + } + } + + ASSERT (n_left_to_next > 0); + + /* Connect each header buffer to a clone + * of the payload buffer. The last packet will go through + * to the normal end of the node loop. + */ + for (i = 0; i < clone_count; i++) + { + vlib_buffer_t *cs_b0; + + b0 = hdr_vec[i]; + cs_b0 = clone_vec[i]; + outface = face_vec[i]; + + if (PREDICT_FALSE (trace_p != 0)) + { + b0->flags |= VLIB_BUFFER_IS_TRACED; + } + + bi0 = vlib_get_buffer_index (vm, b0); + + b0->current_data = 0; + b0->current_length = (sizeof (ip4_header_t) + + sizeof (udp_header_t)); + b0->flags |= VLIB_BUFFER_NEXT_PRESENT; + + b0->total_length_not_including_first_buffer = + vlib_buffer_length_in_chain (vm, cs_b0); + + /* Connect the header particle to the body */ + b0->next_buffer = vlib_get_buffer_index (vm, cs_b0); + + cicn_infra_vlib_buffer_clone_attach_finalize (b0, cs_b0, + outface); + + /* Refresh the ip and udp headers + * before the final part of the rewrite + */ + ip0 = vlib_buffer_get_current (b0); + udp0 = (udp_header_t *) ((uint8_t *) ip0 + + sizeof (ip4_header_t)); + + memset (ip0, 0, + sizeof (ip4_header_t) + sizeof (udp_header_t)); + + ip0->ip_version_and_header_length = 0x45; + ip0->protocol = IP_PROTOCOL_UDP; + + sval = vlib_buffer_length_in_chain (vm, b0); + ip0->length = clib_host_to_net_u16 (sval); + + sval -= sizeof (ip4_header_t); + udp0->length = clib_host_to_net_u16 (sval); + + vnet_buffer (b0)->sw_if_index[VLIB_TX] = ~0; + + if (i == (clone_count - 1)) + { + /* Last packet - drop out of the loop, let the + * transit path finish with 'b0' now + */ + break; + } + + /* Rewrite ip and udp headers */ + + ip0->src_address.as_u32 = + outface->src_addr.sin_addr.s_addr; + ip0->dst_address.as_u32 = + outface->dest_addr.sin_addr.s_addr; + + /* TODO -- Refresh IP ttl - is that ok? */ + ip0->ttl = CICN_IP_TTL_DEFAULT; + + /* TODO -- Not optimizing the IP checksum currently */ + ip0->checksum = ip4_header_checksum (ip0); + + udp0->src_port = outface->src_addr.sin_port; + udp0->dst_port = outface->dest_addr.sin_port; + + /* TODO -- clear UDP checksum; is this ok? */ + udp0->checksum = 0; + + pkts_from_cache_count++; + + /* Update face-level stats */ + outface_stats = + &wshard->face_stats[cicn_face_db_index (outface)]; + outface_stats->out_datas++; + + /* Enqueue packet to next graph node */ + to_next[0] = bi0; + to_next += 1; + n_left_to_next -= 1; + + BUFTRC ("ICN-TX2", bi0); + if (n_left_to_next == 0) + { + vlib_put_next_frame (vm, node, next_index, + n_left_to_next); + + vlib_get_next_frame (vm, node, next_index, to_next, + n_left_to_next); + } + } + } + + /* We're now processing the last (or only) PIT entry; 'b0', + * 'bi0', 'ip0', 'udp0', and 'outface' should be set + * properly. We'll just drop through to the normal + * 'send one packet code below. + */ + + /* Update face-level stats */ + outface_stats = + &wshard->face_stats[cicn_face_db_index (outface)]; + outface_stats->out_datas++; + + next0 = ICNFWD_NEXT_LOOKUP; + + goto ready_to_send; + + hello_reply_rcvd_check: + // not a normal content msg, maybe it's hello reply + if (cicn_hello_match (inface, pkt_type, nameptr, namelen, + &sm->hello_name, &seq_num)) + { + /// it's a hello response + inface_stats->term_datas++; + pkts_hello_data_rec++; + /* Copy seq_num to global array of Up/Down data */ + sm->cicn_hello_data_array[inface->faceid].seq_num = seq_num; + sm->cicn_hello_data_array[inface->faceid].faceid = + inface->faceid; + + // Signal an event to the background process + vlib_process_signal_event_pointer (vm, + vlib_get_node_by_name + (vm, + (u8 *) + "icn-hello-process")->index, + CICN_HELLO_EVENT_DATA_RCVD, + &sm->cicn_hello_data_array + [inface->faceid]); + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* No PIT entry, not a hello, drop */ + pkts_no_pit_count++; + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + + /* END: Content/Control Response */ + + } + else if (pkt_type == CICN_PKT_TYPE_INTEREST || + pkt_type == CICN_PKT_TYPE_CONTROL_REQUEST) + { + cicn_packet_hdr_t *pkt_hdr0 = (cicn_packet_hdr_t *) body0; + uint8_t *msg_tlv = (uint8_t *) (pkt_hdr0 + 1); + + inface_stats->in_interests++; + + if (PREDICT_FALSE (pkt_hdr0->pkt_hop_limit == 0)) + { + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + pkt_hdr0->pkt_hop_limit--; + + /* Check whether this is an ICN Hello Interest */ + if (PREDICT_FALSE + (cicn_hello_match + (inface, pkt_type, nameptr, namelen, &sm->hello_name, + NULL /*seq_num */ ))) + { + goto hello_request_forus; + } + + if (PREDICT_FALSE (inface->flags & CICN_FACE_FLAG_HELLO_DOWN)) + { + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + if (PREDICT_FALSE (pkt_hdr0->pkt_hop_limit == 0)) + { + /* + * If traceroute request, return our information. + * Otherwise NAK the interest in all cases. + * (draft-irtf-icnrg-ccnxsemantics-03 says + * "If the HopLimit equals 0, ... it MAY be sent to a + * publisher application or serviced from a local + * Content Store.". The current implemention does not.) + */ + if (msg_type == CICN_MSG_TYPE_TRACEROUTE_REQUEST) + { + goto traceroute_request_forus; + } + + pkt_hdr0->pkt_type = CICN_PKT_TYPE_NAK; + pkt_hdr0->pkt_nack_code = CICN_MSG_ERR_HOPLIM; + + outface = inface; + outface_stats = inface_stats; + + pkts_nacked_interests_count++; + pkts_nak_hoplimit_count++; + outface_stats->orig_naks++; + outface_stats->out_naks++; + + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + } + + /* Compute the name and prefix hashes */ + + /* TODO -- could we carry hash value in from the dist node? */ + + /* TODO -- use FIB max-comps hint? */ + + /* + * Full and LPM prefix hashing for PIT and FIB lookups + */ + ret = + cicn_hashtb_hash_prefixes (nameptr, namelen, + TRUE /*fullname */ , &pfxhash, + 0 /*!limit */ ); + if (ret != AOK) + { + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* If this is a ping request, parse the target name and compare + * it to the name of the forwarder + */ + if (pkt_type == CICN_PKT_TYPE_CONTROL_REQUEST && + (msg_type == CICN_MSG_TYPE_ECHO_REQUEST || + msg_type == CICN_MSG_TYPE_TRACEROUTE_REQUEST)) + { + + /* Check whether the hash of the ping request target + * prefix matches the hash of the forwarder's name. + * If so, respond + * If a name has not been given to the forwarder, + * or if the hashes do not match, forward the control + * packet as a regular insterest. + */ + + /* We received an echo request with an invalid name */ + if (pfxhash.pfx_count < 3) + { + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + if (cicn_infra_fwdr_name.fn_reply_payload_flen != 0 && + cicn_infra_fwdr_name.fn_match_pfx_hash == + pfxhash.pfx_hashes[pfxhash.pfx_count - 3]) + { + if (msg_type == CICN_MSG_TYPE_ECHO_REQUEST) + { + goto echo_request_forus; + } + else + { + goto traceroute_request_forus; + } + } + } + + /* + * Opportunistic scan of hash row/bucket for expirations. + * Some old code below could be removed with this addition, + * it won't be executed anyway. + * + * The timeout scan and the node lookup could be + * easily integrated. + */ + cicn_pcs_timeout (vm, &rt->pitcs, pfxhash.pfx_full_hash, + &pit_expired_count, &cs_expired_count); + + /* + * Search PIT/CS by full-name hash + */ + ret = + cicn_hashtb_lookup_node (rt->pitcs.pcs_table, nameptr, + namelen, pfxhash.pfx_full_hash, + &nodep); + if (ret != AOK) + { + goto interest_is_new; + } + + pitp = cicn_pit_get_data (nodep); + + if (pitp->shared.entry_type == CICN_CS_TYPE) + { + /* Case: Existing CS entry */ + + /* Check for expired CS entry (if not done during the + * scan) + */ + if ((tnow > pitp->shared.expire_time) || + (pitp->u.cs.cs_pkt_buf == 0)) + { + + /* Delete and clean up expired CS entry */ + cicn_pcs_delete (&rt->pitcs, &pitp, &nodep, vm); + cs_expired_count++; + + goto interest_is_new; + } + + /* Update the CS LRU, moving this item to the head */ + cicn_cs_lru_update_head (&rt->pitcs, nodep, pitp); + + /* + * Clone the CS packet, and prepare the incoming request + * packet to hold the rewrite info as a particle. + */ + if (cicn_clone_cs_buffer (b0, pitp, vm, fl, socket_id, + inface /*outface */ ) != AOK) + { + no_bufs_count++; + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + /* Refresh the ip and udp headers before the final part of + * the rewrite down below + */ + ip0 = vlib_buffer_get_current (b0); + udp0 = (udp_header_t *) ((uint8_t *) ip0 + + sizeof (ip4_header_t)); + + memset (ip0, 0, + sizeof (ip4_header_t) + sizeof (udp_header_t)); + + ip0->ip_version_and_header_length = 0x45; + ip0->protocol = IP_PROTOCOL_UDP; + + sval = vlib_buffer_length_in_chain (vm, b0); + ip0->length = clib_host_to_net_u16 (sval); + + sval -= sizeof (ip4_header_t); + udp0->length = clib_host_to_net_u16 (sval); + + pkts_from_cache_count++; + + /* Reply to sender */ + outface = inface; + inface_stats->out_datas++; + + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + + } + + /* + * Case: Existing PIT entry + */ + + /* Is the PIT entry expired? */ + if (tnow > pitp->shared.expire_time) + { + + /* Remove existing entry, and treat this as new Interest */ + cicn_pcs_delete (&rt->pitcs, &pitp, &nodep, vm); + pit_expired_count++; + goto interest_is_new; + } + + /* + * PIT aggregation: simple form for now, no change in PIT + * expiration. + * + * TODO -- many open questions: retransmissions, + * PIT entry expiration time handling. + */ + for (i = 0; i < CICN_PARAM_PIT_ENTRY_PHOPS_MAX; i++) + { + + /* Note that this loop is vulnerable if we remove + * rx faces from the middle of the PIT array. We don't + * do that right now, so I think this is ok. + */ + if (pitp->u.pit.pe_rxfaces[i] == inface->faceid) + { + /* + * Already in the PIT - a retransmission? We allow + * retransmits, by capturing the egress face + * and jumping to the 'send interest' code. + */ + ret = + cicn_face_entry_find_by_id (pitp->u.pit.pe_txface, + &outface); + if (ret == AOK) + { + pkts_int_retrans++; + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + } + + break; + + } + else if (pitp->u.pit.pe_rxfaces[i] == 0) + { + /* Found an available slot in the PIT */ + pitp->u.pit.pe_rxfaces[i] = inface->faceid; + break; + } + } + + /* TODO -- stat for 'full PIT'? */ + + /* + * At this point, we've dealt with the PIT aggregation, + * and we can drop the current packet. + */ + pkts_interest_agg++; + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + + interest_is_new: + + /* + * Need PIT entry: + * - find outface from FIB lookup + * - init new PIT entry. + */ + outface = NULL; + + ret = cicn_fib_lookup (&cicn_main.fib, &pfxhash, &pentry); + if (PREDICT_FALSE (ret != AOK)) + { + goto interest_noroute_check; + } + + /* Look for the right next-hop - for now, use max weight */ + u8 weight = 0; + for (i = 0; i < CICN_PARAM_FIB_ENTRY_NHOPS_MAX; i++) + { + cicn_face_db_entry_t *face; + if ((pentry->fe_next_hops[i].nh_faceid == 0)) + { + continue; + } + if (pentry->fe_next_hops[i].nh_weight <= weight) + { + continue; + } + faceid = pentry->fe_next_hops[i].nh_faceid; + + /* Find tx face by face id */ + ret = cicn_face_entry_find_by_id (faceid, &face); + if (PREDICT_FALSE (ret != AOK)) + { + continue; + } + if (PREDICT_FALSE ((face->flags & CICN_FACE_FLAGS_DOWN))) + { + continue; + } + outface = face; + weight = pentry->fe_next_hops[i].nh_weight; + } + + interest_noroute_check: + if (outface == NULL) + { + pkt_hdr0->pkt_type = CICN_PKT_TYPE_NAK; + pkt_hdr0->pkt_nack_code = CICN_MSG_ERR_NOROUTE; + + outface = inface; + outface_stats = inface_stats; + + pkts_nacked_interests_count++; + pkts_nak_no_route_count++; + outface_stats->orig_naks++; + outface_stats->out_naks++; + + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + } + + /* Create PIT node and init PIT entry */ + nodep = cicn_hashtb_alloc_node (rt->pitcs.pcs_table); + if (PREDICT_FALSE (nodep == NULL)) + { + /* Nothing we can do - no mem */ + + no_bufs_count++; + + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + pitp = cicn_pit_get_data (nodep); + + cicn_pit_init_data (pitp); + + pitp->shared.entry_type = CICN_PIT_TYPE; + pitp->shared.create_time = tnow; + pitp->u.pit.pe_txface = outface->faceid; + pitp->u.pit.pe_rxfaces[0] = inface->faceid; + + /* + * Interest lifetime based on optional hdr_tlv, ranges, default + * - special case is lifetime of 0 + * - this is "forward but do not expect content return" case + * - code sequence here (and above for content) + * always checks if an existing PIT entry has + * expired. If so, it is deleted before continuing + * to process current message. Thus, should be + * benign to enter an interest with a 0 lifetime + * into the PIT: it will always be be found to be + * expired at the earliest opportunity, the only + * cost being the held hash resources. + * - corresponding expiry time appears in pit entry and + * (compressed) in bucket entry + */ + uint64_t imsg_lifetime; + ret = + cicn_parse_hdr_time_ms (body0, &pkt_hdr_desc0, + CICN_HDR_TLV_INT_LIFETIME, + &imsg_lifetime); + if (ret != AOK) + { // no header timeout, use default + imsg_lifetime = sm->pit_lifetime_dflt_ms; + } + else if (imsg_lifetime != 0) + { + if (imsg_lifetime < sm->pit_lifetime_min_ms) + { + imsg_lifetime = sm->pit_lifetime_min_ms; + } + else if (imsg_lifetime > sm->pit_lifetime_max_ms) + { + imsg_lifetime = sm->pit_lifetime_max_ms; + } + } + pitp->shared.expire_time = + cicn_pcs_get_exp_time (tnow, imsg_lifetime); + bkt_ent_exp_time = cicn_infra_get_fast_exp_time (imsg_lifetime); + + /* TODO -- decide whether to hold/clone interest packet mbuf */ + + /* Set up the hash node and insert it */ + ret = + cicn_hashtb_init_node (rt->pitcs.pcs_table, nodep, + pfxhash.pfx_full_hash, nameptr, + namelen); + if (ret == AOK) + { + ret = cicn_pit_insert (&rt->pitcs, pitp, nodep); + } + if (ret != AOK) + { + /* Just dropping on error for now */ + + /* Return hashtable node */ + cicn_hashtb_free_node (rt->pitcs.pcs_table, nodep); + + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + // Set the hashtable-level expiration value in bucket entry + cicn_hashtb_entry_set_expiration (rt->pitcs.pcs_table, nodep, + bkt_ent_exp_time, + CICN_HASH_ENTRY_FLAG_FAST_TIMEOUT); + + /* Looks like we're ok to forward */ + outface_stats = + &wshard->face_stats[cicn_face_db_index (outface)]; + outface_stats->out_interests++; + + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + + + /* + * Code routes control requests for us to these labels: + * respond to with control reply + */ + + hello_request_forus: + /* Hello Request: For now, just change the packet and msg type + * (do not attach any extra payload) and reflect back + */ + pkt_hdr0->pkt_type = CICN_PKT_TYPE_CONTROL_REPLY; + C_PUTINT16 (&msg_tlv[0], CICN_MSG_TYPE_CONTENT); + + outface = inface; + outface_stats = inface_stats; + + pkts_hello_int_rec++; + pkts_hello_data_sent++; + inface_stats->term_interests++; + outface_stats->orig_datas++; + outface_stats->out_datas++; + + /* Send it out directly without doing anything further */ + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + + echo_request_forus: + /* Tweak packet and message types and send back + * as a ping request reply + */ + pkt_hdr0->pkt_type = CICN_PKT_TYPE_CONTROL_REPLY; + C_PUTINT16 (msg_tlv, CICN_MSG_TYPE_ECHO_REPLY); + + outface = inface; + outface_stats = inface_stats; + + pkts_control_reply_count++; + inface_stats->term_interests++; + outface_stats->out_datas++; + + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + + traceroute_request_forus: + /* Update msg type and hop limit value */ + pkt_hdr0->pkt_type = CICN_PKT_TYPE_CONTROL_REPLY; + C_PUTINT16 (msg_tlv, CICN_MSG_TYPE_TRACEROUTE_REPLY); + pkt_hdr0->pkt_hop_limit = CICN_DEFAULT_HOP_LIMIT; + if (cicn_infra_fwdr_name.fn_reply_payload_flen > 0) + { + int payload_size = + cicn_infra_fwdr_name.fn_reply_payload_flen; + vlib_buffer_add_data (vm, b0->free_list_index, + bi0, + cicn_infra_fwdr_name.fn_reply_payload, + payload_size); + + uint16_t imsg_size; + C_GETINT16 (imsg_size, &msg_tlv[CICN_TLV_TYPE_LEN]); + C_PUTINT16 (&msg_tlv[CICN_TLV_TYPE_LEN], + imsg_size + payload_size); + pkt_hdr0->pkt_len = + clib_host_to_net_u16 (clib_net_to_host_u16 + (pkt_hdr0->pkt_len) + payload_size); + udp0->length = + clib_host_to_net_u16 (clib_net_to_host_u16 (udp0->length) + + payload_size); + ip0->length = + clib_host_to_net_u16 (clib_net_to_host_u16 (ip0->length) + + payload_size); + } + + outface = inface; + outface_stats = inface_stats; + + pkts_control_reply_count++; + inface_stats->term_interests++; + outface_stats->out_datas++; + + next0 = ICNFWD_NEXT_LOOKUP; + goto ready_to_send; + + } + else if (pkt_type == CICN_PKT_TYPE_NAK) + { + + inface_stats->in_naks++; + + } + else + { + /* Don't expect any other packets: just drop? */ + + next0 = ICNFWD_NEXT_ERROR_DROP; + goto trace_single; + } + + ready_to_send: + + /* Use info to prep and enqueue: we expect that + * the egress face and the next-node have been set. + */ + + /* TODO -- worth optimizing/remembering egress interface? */ + + /* TODO -- assuming ipv4 udp egress for now */ + + vnet_buffer (b0)->sw_if_index[VLIB_TX] = ~0; + + /* Rewrite ip and udp headers */ + ip0->src_address.as_u32 = outface->src_addr.sin_addr.s_addr; + ip0->dst_address.as_u32 = outface->dest_addr.sin_addr.s_addr; + + /* TODO -- Refresh IP ttl - is that ok? */ + ip0->ttl = CICN_IP_TTL_DEFAULT; + + /* TODO -- Not optimizing the IP checksum currently */ + ip0->checksum = ip4_header_checksum (ip0); + + udp0->src_port = outface->src_addr.sin_port; + udp0->dst_port = outface->dest_addr.sin_port; + + /* TODO -- clear UDP checksum; is this ok? */ + udp0->checksum = 0; + + /* Next-node should already be ok at this point */ + + trace_single: + + /* Maybe trace */ + if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) && + (b0->flags & VLIB_BUFFER_IS_TRACED))) + { + + icnfwd_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t)); + t->pkt_type = pkt_type; + t->msg_type = msg_type; + t->sw_if_index = sw_if_index0; + t->next_index = next0; + } + + /* Speculatively enqueue packet b0 (index in bi0) + * to the current next frame + */ + to_next[0] = bi0; + to_next += 1; + n_left_to_next -= 1; + + /* Incr packet counter */ + pkts_processed += 1; + + BUFTRC ((next0 == ICNFWD_NEXT_ERROR_DROP) ? "DROPTX1" : "ICN-TX1", + bi0); + /* Verify speculative enqueue, maybe switch current next frame */ + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, + to_next, n_left_to_next, + bi0, next0); + } + + /* + * End of 1-at-a-time loop; finish 'next' processing + */ + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + } + + /* Check the CS LRU, and trim if necessary. */ + cicn_trim_cs_lru (vm, node, &(rt->pitcs)); + + pit_int_count = cicn_pit_get_int_count (&(rt->pitcs)); + pit_cs_count = cicn_pit_get_cs_count (&(rt->pitcs)); + + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_PROCESSED, pkts_processed); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_INTERESTS, pkts_interest_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_DATAS, pkts_data_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_NAKS, pkts_nak_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_CACHED, pkts_from_cache_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_NACKED_INTERESTS, + pkts_nacked_interests_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_HOPLIMIT_EXCEEDED, + pkts_nak_hoplimit_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_NO_ROUTE, + pkts_nak_no_route_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_NO_PIT, pkts_no_pit_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_PIT_EXPIRED, pit_expired_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_CS_EXPIRED, cs_expired_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_NO_BUFS, no_bufs_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_INTEREST_AGG, pkts_interest_agg); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_INT_RETRANS, pkts_int_retrans); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_CONTROL_REQUESTS, + pkts_control_request_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_CONTROL_REPLIES, + pkts_control_reply_count); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_HELLO_INTERESTS_RCVD, + pkts_hello_int_rec); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_HELLO_DMSGS_SENT, + pkts_hello_data_sent); + vlib_node_increment_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_HELLO_DMSGS_RCVD, + pkts_hello_data_rec); + + + update_node_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_INT_COUNT, pit_int_count); + update_node_counter (vm, icnfwd_node.index, + ICNFWD_ERROR_CS_COUNT, pit_cs_count); + ASSERT (rt->pitcs.pcs_lru_count == pit_cs_count); + + return (frame->n_vectors); +} + +/* + * Check the CS LRU, trim if necessary + */ +static int +cicn_trim_cs_lru (vlib_main_t * vm, vlib_node_runtime_t * node, + cicn_pit_cs_t * pit) +{ +#define LRU_TRIM_COUNT 512 + + int i, count = 0, bufcount; + u32 node_list[LRU_TRIM_COUNT], buf_list[LRU_TRIM_COUNT]; + cicn_hash_node_t *np; + cicn_pcs_entry_t *pcs; + + if (pit->pcs_lru_count > pit->pcs_lru_max) + { + + /* Collect an armful of entries from the back of the LRU */ + count = cicn_cs_lru_trim (pit, node_list, LRU_TRIM_COUNT); + + bufcount = 0; + + for (i = 0; i < count; i++) + { + /* Retrieve the CS data */ + np = cicn_hashtb_node_from_idx (pit->pcs_table, node_list[i]); + + pcs = cicn_pit_get_data (np); + + /* Extract the packet buffer id and save it */ + if (pcs->u.cs.cs_pkt_buf != 0) + { + BUFTRC (" CS-TRIM", pcs->u.cs.cs_pkt_buf); + buf_list[bufcount++] = pcs->u.cs.cs_pkt_buf; + pcs->u.cs.cs_pkt_buf = 0; + } + + /* Remove the hash node from the hashtable and free it */ + cicn_cs_delete_trimmed (pit, &pcs, &np, vm); + + } + + /* Free packet buffers */ + BUFTRC ("CS-TRIM-ALL", bufcount); + if (bufcount > 0) + { +#if 1 //$$$XXX TODO: does this work better than drop-node approach? seems so(?) + vlib_buffer_free (vm, buf_list, bufcount); +#else + /* This ought to work, not limited to a single frame size. It has + * the nice property that we get to set a stat/error code for + * the bufs we're freeing. Note that we specify the 'next index' + * in terms of our own node's array of 'nexts'. + * + * Seems to work but can replace with + * vlib_buffer_free (vm, buf_list, bufcount); + * if willing to give up the counter. + */ + vlib_error_drop_buffers (vm, node, buf_list, 1 /*stride */ , + bufcount, + ICNFWD_NEXT_ERROR_DROP /*next index */ , + icnfwd_node.index, ICNFWD_ERROR_CS_LRU); +#endif + } + } + + return (count); +} + +/* + * Node registration for the forwarder node + */ +VLIB_REGISTER_NODE (icnfwd_node) = +{ + .function = icnfwd_node_fn,.name = "icnfwd",.vector_size = + sizeof (u32),.runtime_data_bytes = + sizeof (icnfwd_runtime_t),.format_trace = icnfwd_format_trace,.type = + VLIB_NODE_TYPE_INTERNAL,.n_errors = + ARRAY_LEN (icnfwd_error_strings),.error_strings = + icnfwd_error_strings,.n_next_nodes = ICNFWD_N_NEXT, + /* edit / add dispositions here */ + .next_nodes = + { + [ICNFWD_NEXT_LOOKUP] = "ip4-lookup", + [ICNFWD_NEXT_ERROR_DROP] = "error-drop",} +,}; + +/* + * TODO -- new worker node ends here + */ + +#if CICN_FEATURE_MULTITHREAD +/* + * Work-distribution node (first pass, anyway). We use the full-name hash + * to direct packets at forwarding worker threads. We've informed the + * handoff node running at the edge of each graph instance of the existence + * of our forwarding node, as part of setup/enable. As a result, the + * target thread's handoff node will be able to hand our packets + * directly to our forwarding node. + */ + +/* + * Node context data; we think this is per-thread/instance/graph + */ +typedef struct icndist_runtime_s +{ + + /* Vector of queues directed at each forwarding worker thread */ + vlib_frame_queue_elt_t **handoff_q_elt_by_worker; + +} icndist_runtime_t; + +/* Registration struct for a graph node */ +vlib_node_registration_t icndist_node; + +/* + * Next graph nodes, which reference the list in the actual registration + * block below + */ +typedef enum +{ + ICNDIST_NEXT_FWD, + ICNDIST_NEXT_ERROR_DROP, + ICNDIST_N_NEXT, +} icndist_next_t; + +/* Stats string values */ +static char *icndist_error_strings[] = { +#define _(sym,string) string, + foreach_icndist_error +#undef _ +}; + +/* Trace context struct */ +typedef struct +{ + u32 next_worker; + u32 sw_if_index; + u8 pkt_type; + u16 msg_type; +} icndist_trace_t; + +/* Distribution node packet trace format function */ +static u8 * +icndist_format_trace (u8 * s, va_list * args) +{ + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + icndist_trace_t *t = va_arg (*args, icndist_trace_t *); + + s = format (s, "ICN-DIST: pkt: %d, msg %d, sw_if_index %d, next worker %d", + (int) t->pkt_type, (int) t->msg_type, + t->sw_if_index, t->next_worker); + return (s); +} + +/* + * IP-worker allocates a free packet frame to fill in and handed off + * to ICN-worker. + */ +static inline vlib_frame_queue_elt_t * +get_new_handoff_queue_elt (u32 vlib_worker_index) +{ + vlib_frame_queue_t *fq; + vlib_frame_queue_elt_t *elt; + u64 new_tail; + + fq = vlib_frame_queues[vlib_worker_index]; + ASSERT (fq); + + new_tail = __sync_add_and_fetch (&fq->tail, 1); + + /* Wait until a ring slot is available */ + while (new_tail >= fq->head_hint + fq->nelts) + { + vlib_worker_thread_barrier_check (); + } + + elt = fq->elts + (new_tail & (fq->nelts - 1)); + + /* Should not happen that available ring slot is marked valid */ + while (elt->valid) + ; + + elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME; + elt->last_n_vectors = elt->n_vectors = 0; + + return (elt); +} + +/* + * IP-worker gets frame for ICN-worker, allocating new frame if needed. + */ +static inline vlib_frame_queue_elt_t * +icn_get_handoff_queue_elt (u32 vlib_worker_index, + vlib_frame_queue_elt_t ** handoff_queue_elt) +{ + vlib_frame_queue_elt_t *elt; + + if (handoff_queue_elt[vlib_worker_index]) + { + return (handoff_queue_elt[vlib_worker_index]); + } + + elt = get_new_handoff_queue_elt (vlib_worker_index); + + handoff_queue_elt[vlib_worker_index] = elt; + + return (elt); +} + +/* + * Enables the frame once the IP-worker is done with it. + */ +static inline void +icn_put_handoff_queue_elt (vlib_frame_queue_elt_t * hf) +{ + CLIB_MEMORY_BARRIER (); + hf->valid = 1; +} + +/* + * Second-level work-distribution node: IP-worker got packets based on + * IP 5-tuple hash, redistributes to (final) ICN-worker based on ICN name hash. + */ +static uword +icndist_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + u32 n_left_from, *from; + u32 *to_next; + icndist_next_t next_index; + u32 pkts_processed = 0, pkts_interest_count = 0, pkts_data_count = 0; + u32 pkts_dropped = 0; + int i, ret; + icndist_runtime_t *rt; + u32 current_worker_index = ~0; + u32 next_worker_index = 0; + vlib_frame_queue_elt_t *hf = 0; + u32 n_left_to_next_worker = 0, *to_next_worker = 0; + cicn_main_t *icnmain = &cicn_main; + u32 n_left_to_next; + u32 drop_count = 0, drop_list[VLIB_FRAME_SIZE]; + + /* Retrieve the per-thread context struct */ + rt = vlib_node_get_runtime_data (vm, icndist_node.index); + + /* + * If necessary, do one-time init + */ + if (rt->handoff_q_elt_by_worker == NULL) + { + + /* Init/allocate a vector we'll use to store queues directed at + * each worker thread we're using for forwarding. + */ + vec_validate (rt->handoff_q_elt_by_worker, + icnmain->worker_first_index + icnmain->worker_count - 1); + + /* TODO -- dpdk io does a 'congested_handoff' queue also? Do we have + * to do that too, or is there other infra that will do something + * sane if we're overrunning the forwarder threads? + */ + } + + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + + next_index = node->cached_next_index; + next_index = ICNDIST_NEXT_FWD; + + vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); + + /* TODO -- just doing 1-at-a-time for now, to simplify things a bit. */ + + /* TODO -- v6 support too? */ + /* TODO -- more interesting stats and trace */ + + /* TODO -- simpler loop since we don't use the vlib api? */ +// while (n_left_from > 0 && n_left_to_next > 0) { + + while (n_left_from > 0) + { + u32 bi0; + vlib_buffer_t *b0; + u32 sw_if_index0; + u8 *body0, *ptr0; + u32 len0; + u8 pkt_type; + u16 msg_type; + cicn_pkt_hdr_desc_t pkt_hdr_desc0; + u8 *nameptr; + u32 namelen; + u64 hashval; + + /* Prefetch for next iteration. */ + if (n_left_from > 1) + { + vlib_buffer_t *p2; + + p2 = vlib_get_buffer (vm, from[1]); + + vlib_prefetch_buffer_header (p2, LOAD); + + CLIB_PREFETCH (p2->data, (2 * CLIB_CACHE_LINE_BYTES), LOAD); + } + + /* Set up to access the packet */ + + /* We don't use the normal 'to next node' variables, because we're + * mostly moving packets to other threads. We only use the direct + * path for packets destined for the current thread's forwarder + * node; even error/drop packets are dealt with all at once, at + * the end of the loop. + */ + bi0 = from[0]; + from += 1; + n_left_from -= 1; + + b0 = vlib_get_buffer (vm, bi0); + + /* + * From the IPv4 udp code, we think we're handed the payload part + * of the packet + */ + ASSERT (b0->current_data >= + (sizeof (ip4_header_t) + sizeof (udp_header_t))); + + /* Capture pointer to the payload */ + ptr0 = body0 = vlib_buffer_get_current (b0); + len0 = b0->current_length; + + sw_if_index0 = vnet_buffer (b0)->sw_if_index[VLIB_RX]; + + /* Reset destination worker thread idx */ + next_worker_index = icnmain->worker_first_index; + + /* + * Do a quick, in-place parse/validate pass, locating + * a couple of key pieces of info about the packet + */ + ret = cicn_parse_pkt (ptr0, len0, &pkt_type, &msg_type, + &nameptr, &namelen, &pkt_hdr_desc0); + + /* If we can't even get at the name, we just drop */ + if (ret != AOK) + { + /* Just drop on error? */ + drop_list[drop_count] = bi0; + drop_count++; + + pkts_dropped++; + goto trace_single; + } + + if (pkt_type == CICN_PKT_TYPE_INTEREST) + { + pkts_interest_count++; + } + else if (pkt_type == CICN_PKT_TYPE_CONTENT) + { + pkts_data_count++; + } + + /* Compute the full name hash, for distribution + * (so only doing the full-name hash here, no LPM prefix hashing). + * TODO - could we capture the hash and pass it along? + */ + hashval = cicn_hashtb_hash_name (nameptr, namelen); + + /* Use the hash to identify the correct worker thread */ + + if (PREDICT_TRUE (is_pow2 (icnmain->worker_count))) + { + next_worker_index += hashval & (icnmain->worker_count - 1); + } + else + { + next_worker_index += hashval % icnmain->worker_count; + } + + /* Use normal next-node path if we're + * using the forwarding node on the current thread; that'd + * save some work. + */ + if (next_worker_index == vm->cpu_index) + { + if (n_left_to_next == 0) + { + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + + vlib_get_next_frame (vm, node, next_index, + to_next, n_left_to_next); + } + + ASSERT (n_left_to_next > 0); + + to_next[0] = bi0; + to_next++; + n_left_to_next--; + + /* Skip to end of the loop */ + goto trace_single; + } + + /* On the target worker thread, the buffers will arrive at the + * dpdk handoff node. Convince target's dpdk handoff node to move + * the buffers to the icn orwarding node. + */ + vnet_buffer (b0)->handoff.next_index = cicn_main.fwd_next_node; + + /* Locate or allocate a queue for the thread; update current + * queue if we're changing destination threads. + */ + if (next_worker_index != current_worker_index) + { + if (hf) + { + hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker; + } + + hf = icn_get_handoff_queue_elt (next_worker_index, + rt->handoff_q_elt_by_worker); + + n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors; + to_next_worker = &hf->buffer_index[hf->n_vectors]; + current_worker_index = next_worker_index; + } + + /* Enqueue to correct worker thread */ + to_next_worker[0] = bi0; + to_next_worker++; + n_left_to_next_worker--; + + /* + * If we've filled a frame, pass it on + */ + if (n_left_to_next_worker == 0) + { + hf->n_vectors = VLIB_FRAME_SIZE; + icn_put_handoff_queue_elt (hf); + current_worker_index = ~0; + rt->handoff_q_elt_by_worker[next_worker_index] = 0; + hf = 0; + } + + trace_single: + + /* Maybe trace */ + if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) + && (b0->flags & VLIB_BUFFER_IS_TRACED))) + { + + icndist_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t)); + + t->sw_if_index = sw_if_index0; + t->pkt_type = pkt_type; + t->msg_type = msg_type; + t->next_worker = next_worker_index; + } + + /* Incr packet counter */ + pkts_processed += 1; + } + + /* + * End of 1-at-a-time loop through the incoming frame + */ + + /* Finish handing frames to threads, and reset */ + if (hf) + { + hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker; + } + + /* Ship remaining frames to the worker nodes */ + for (i = 0; i < vec_len (rt->handoff_q_elt_by_worker); i++) + { + if (rt->handoff_q_elt_by_worker[i]) + { + hf = rt->handoff_q_elt_by_worker[i]; + /* + * It works better to let the handoff node + * rate-adapt, always ship the handoff queue element. + */ + if (1 || hf->n_vectors == hf->last_n_vectors) + { + icn_put_handoff_queue_elt (hf); + rt->handoff_q_elt_by_worker[i] = 0; + } + else + { + hf->last_n_vectors = hf->n_vectors; + } + } + +#if 0 /* TODO -- no congested queues for now? */ + congested_handoff_queue_by_worker_index[i] = + (vlib_frame_queue_t *) (~0); +#endif /* TODO */ + } + + hf = 0; + current_worker_index = ~0; + + /* Dispose of any pending 'normal' frame within this thread */ + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + + /* Deal with any error/drop packets */ + if (drop_count > 0) + { + vlib_error_drop_buffers (vm, node, drop_list, 1, drop_count, + ICNDIST_NEXT_ERROR_DROP /* next index */ , + icndist_node.index, ICNDIST_ERROR_DROPS); + } + + /* Update counters */ + vlib_node_increment_counter (vm, icndist_node.index, + ICNDIST_ERROR_PROCESSED, pkts_processed); + vlib_node_increment_counter (vm, icndist_node.index, + ICNDIST_ERROR_INTERESTS, pkts_interest_count); + vlib_node_increment_counter (vm, icndist_node.index, + ICNDIST_ERROR_DATAS, pkts_data_count); + vlib_node_increment_counter (vm, icndist_node.index, + ICNDIST_ERROR_DROPS, pkts_dropped); + + return (frame->n_vectors); +} + +/* End of the packet-distributing node function */ + +/* + * Node registration block for the work-distributing node. + * TODO -- using the same trace func as the icnfwd node for now + */ +VLIB_REGISTER_NODE (icndist_node) = +{ + .function = icndist_node_fn,.name = "icndist",.vector_size = + sizeof (u32),.runtime_data_bytes = + sizeof (icndist_runtime_t),.format_trace = icndist_format_trace,.type = + VLIB_NODE_TYPE_INTERNAL,.n_errors = + ARRAY_LEN (icndist_error_strings),.error_strings = + icndist_error_strings,.n_next_nodes = ICNDIST_N_NEXT, + /* edit / add dispositions here */ + .next_nodes = + { + [ICNDIST_NEXT_FWD] = "icnfwd",[ICNDIST_NEXT_ERROR_DROP] = "error-drop",} +,}; +#endif // CICN_FEATURE_MULTITHREAD -- cgit 1.2.3-korg