From 475f055305cf904b1c1c0436654f2f3e1c4f3358 Mon Sep 17 00:00:00 2001
From: Matus Fabian
Date: Wed, 19 Oct 2016 06:17:52 -0700
Subject: snat: thread safe (VPP-443)

All traffic corresponding to a specific SNAT user is handled by a single
CPU core.

in2out: Non-translated packets worker lookup by src address and VRF hash
in the snat-in2out-worker-handoff node.
out2in: Translated packets worker lookup by dst address and port number
hash in the snat-out2in-worker-handoff node.

Change-Id: Ia092a605689539469841d382588f3f486a29a769
Signed-off-by: Matus Fabian
---
 plugins/snat-plugin/snat/in2out.c | 350 ++++++++++++++++++++++++++++++--------
 1 file changed, 278 insertions(+), 72 deletions(-)
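A minimal, self-contained sketch of the worker-selection step implemented by
the snat-in2out-worker-handoff node in this patch. The (src address, VRF) key
layout mirrors the patch; the array-backed map, the FNV-1a hash, and all names
here are illustrative stand-ins for the plugin's clib_bihash_8_8 tables and
snat_main_t fields, not the plugin API:

#include <stdint.h>
#include <string.h>

typedef struct {
  uint32_t addr;       /* IPv4 source address                      */
  uint32_t fib_index;  /* VRF (rx FIB index) the packet came in on */
} user_key_t;

#define MAX_USERS 1024   /* sketch only: fixed-size open-addressed table */

typedef struct {
  user_key_t key;
  uint32_t worker;
  int in_use;
} user_slot_t;

typedef struct {
  user_slot_t slots[MAX_USERS]; /* stands in for sm->worker_by_in       */
  uint32_t first_worker_index;  /* stands in for sm->first_worker_index */
  uint32_t num_workers;         /* stands in for vec_len (sm->workers)  */
  uint32_t next_worker;         /* round-robin cursor                   */
} snat_sketch_t;

/* FNV-1a over the packed 8-byte key; the real code lets clib_bihash
 * hash the key internally. */
static uint32_t
key_hash (const user_key_t * k)
{
  const uint8_t *p = (const uint8_t *) k;
  uint32_t h = 2166136261u;
  for (size_t i = 0; i < sizeof (*k); i++)
    h = (h ^ p[i]) * 16777619u;
  return h;
}

/* Worker that owns this (src addr, VRF) pair; assigns one round-robin
 * on first sight, like the miss path in the handoff node below.
 * (No table-full handling: sketch only.) */
static uint32_t
worker_for_user (snat_sketch_t * sm, user_key_t key)
{
  uint32_t slot = key_hash (&key) % MAX_USERS;

  /* linear probing stands in for clib_bihash_search_8_8 */
  while (sm->slots[slot].in_use)
    {
      if (memcmp (&sm->slots[slot].key, &key, sizeof (key)) == 0)
        return sm->slots[slot].worker;   /* hit: mapping is sticky */
      slot = (slot + 1) % MAX_USERS;
    }

  /* miss: next available worker (RR), then cache the mapping */
  sm->slots[slot].key = key;
  sm->slots[slot].worker = sm->first_worker_index +
    (sm->next_worker++ % sm->num_workers);
  sm->slots[slot].in_use = 1;
  return sm->slots[slot].worker;
}

Because the mapping is sticky, all of a user's in2out traffic lands on one
worker, which is what lets the user/session pools move into
sm->per_thread_data[] below without locks.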
diff --git a/plugins/snat-plugin/snat/in2out.c b/plugins/snat-plugin/snat/in2out.c
index 9a4aeb01fd4..e1edbb81581 100644
--- a/plugins/snat-plugin/snat/in2out.c
+++ b/plugins/snat-plugin/snat/in2out.c
@@ -16,6 +16,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 
@@ -33,6 +34,11 @@ typedef struct {
   u32 is_slow_path;
 } snat_in2out_trace_t;
 
+typedef struct {
+  u32 next_worker_index;
+  u8 do_handoff;
+} snat_in2out_worker_handoff_trace_t;
+
 /* packet trace format function */
 static u8 * format_snat_in2out_trace (u8 * s, va_list * args)
 {
@@ -61,9 +67,24 @@ static u8 * format_snat_in2out_fast_trace (u8 * s, va_list * args)
   return s;
 }
 
+static u8 * format_snat_in2out_worker_handoff_trace (u8 * s, va_list * args)
+{
+  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+  snat_in2out_worker_handoff_trace_t * t =
+    va_arg (*args, snat_in2out_worker_handoff_trace_t *);
+  char * m;
+
+  m = t->do_handoff ? "next worker" : "same worker";
+  s = format (s, "SNAT_IN2OUT_WORKER_HANDOFF: %s %d", m, t->next_worker_index);
+
+  return s;
+}
+
 vlib_node_registration_t snat_in2out_node;
 vlib_node_registration_t snat_in2out_slowpath_node;
 vlib_node_registration_t snat_in2out_fast_node;
+vlib_node_registration_t snat_in2out_worker_handoff_node;
 
 #define foreach_snat_in2out_error                       \
 _(UNSUPPORTED_PROTOCOL, "Unsupported protocol")         \
@@ -93,14 +114,14 @@ typedef enum {
   SNAT_IN2OUT_N_NEXT,
 } snat_in2out_next_t;
 
-
 static u32 slow_path (snat_main_t *sm, vlib_buffer_t *b0,
                       ip4_header_t * ip0,
                       u32 rx_fib_index0,
                       snat_session_key_t * key0,
                       snat_session_t ** sessionp,
                       vlib_node_runtime_t * node,
-                      u32 next0)
+                      u32 next0,
+                      u32 cpu_index)
 {
   snat_user_t *u;
   snat_user_key_t user_key;
@@ -115,6 +136,7 @@ static u32 slow_path (snat_main_t *sm, vlib_buffer_t *b0,
   u32 address_index = ~0;
   u32 outside_fib_index;
   uword * p;
+  snat_static_mapping_key_t worker_by_out_key;
 
   p = hash_get (sm->ip4_main->fib_index_by_table_id, sm->outside_vrf_id);
   if (!p)
@@ -132,25 +154,27 @@ static u32 slow_path (snat_main_t *sm, vlib_buffer_t *b0,
   if (clib_bihash_search_8_8 (&sm->user_hash, &kv0, &value0))
     {
       /* no, make a new one */
-      pool_get (sm->users, u);
+      pool_get (sm->per_thread_data[cpu_index].users, u);
       memset (u, 0, sizeof (*u));
       u->addr = ip0->src_address;
 
-      pool_get (sm->list_pool, per_user_list_head_elt);
+      pool_get (sm->per_thread_data[cpu_index].list_pool,
+                per_user_list_head_elt);
 
       u->sessions_per_user_list_head_index = per_user_list_head_elt -
-        sm->list_pool;
+        sm->per_thread_data[cpu_index].list_pool;
 
-      clib_dlist_init (sm->list_pool, u->sessions_per_user_list_head_index);
+      clib_dlist_init (sm->per_thread_data[cpu_index].list_pool,
+                       u->sessions_per_user_list_head_index);
 
-      kv0.value = u - sm->users;
+      kv0.value = u - sm->per_thread_data[cpu_index].users;
 
       /* add user */
       clib_bihash_add_del_8_8 (&sm->user_hash, &kv0, 1 /* is_add */);
     }
   else
     {
-      u = pool_elt_at_index (sm->users, value0.value);
+      u = pool_elt_at_index (sm->per_thread_data[cpu_index].users,
+                             value0.value);
     }
 
   /* Over quota? Recycle the least recently used dynamic translation */
@@ -159,25 +183,26 @@ static u32 slow_path (snat_main_t *sm, vlib_buffer_t *b0,
       /* Remove the oldest dynamic translation */
       do {
           oldest_per_user_translation_list_index =
-            clib_dlist_remove_head
-              (sm->list_pool, u->sessions_per_user_list_head_index);
+            clib_dlist_remove_head (sm->per_thread_data[cpu_index].list_pool,
+                                    u->sessions_per_user_list_head_index);
 
           ASSERT (oldest_per_user_translation_list_index != ~0);
 
           /* add it back to the end of the LRU list */
-          clib_dlist_addtail (sm->list_pool,
+          clib_dlist_addtail (sm->per_thread_data[cpu_index].list_pool,
                               u->sessions_per_user_list_head_index,
                               oldest_per_user_translation_list_index);
           /* Get the list element */
           oldest_per_user_translation_list_elt =
-            pool_elt_at_index (sm->list_pool,
+            pool_elt_at_index (sm->per_thread_data[cpu_index].list_pool,
                                oldest_per_user_translation_list_index);
 
           /* Get the session index from the list element */
          session_index = oldest_per_user_translation_list_elt->value;
 
          /* Get the session */
-         s = pool_elt_at_index (sm->sessions, session_index);
+         s = pool_elt_at_index (sm->per_thread_data[cpu_index].sessions,
+                                session_index);
       } while (!snat_is_session_static (s));
 
       /* Remove in2out, out2in keys */
@@ -218,7 +243,7 @@ static u32 slow_path (snat_main_t *sm, vlib_buffer_t *b0,
     }
 
   /* Create a new session */
-  pool_get (sm->sessions, s);
+  pool_get (sm->per_thread_data[cpu_index].sessions, s);
   memset (s, 0, sizeof (*s));
   s->outside_address_index = address_index;
 
@@ -234,16 +259,22 @@ static u32 slow_path (snat_main_t *sm, vlib_buffer_t *b0,
     }
 
   /* Create list elts */
-  pool_get (sm->list_pool, per_user_translation_list_elt);
-  clib_dlist_init (sm->list_pool, per_user_translation_list_elt -
-                   sm->list_pool);
-
-  per_user_translation_list_elt->value = s - sm->sessions;
-  s->per_user_index = per_user_translation_list_elt - sm->list_pool;
+  pool_get (sm->per_thread_data[cpu_index].list_pool,
+            per_user_translation_list_elt);
+  clib_dlist_init (sm->per_thread_data[cpu_index].list_pool,
+                   per_user_translation_list_elt -
+                   sm->per_thread_data[cpu_index].list_pool);
+
+  per_user_translation_list_elt->value =
+    s - sm->per_thread_data[cpu_index].sessions;
+  s->per_user_index = per_user_translation_list_elt -
+    sm->per_thread_data[cpu_index].list_pool;
   s->per_user_list_head_index = u->sessions_per_user_list_head_index;
 
-  clib_dlist_addtail (sm->list_pool, s->per_user_list_head_index,
-                      per_user_translation_list_elt - sm->list_pool);
+  clib_dlist_addtail (sm->per_thread_data[cpu_index].list_pool,
+                      s->per_user_list_head_index,
+                      per_user_translation_list_elt -
+                      sm->per_thread_data[cpu_index].list_pool);
     }
 
   s->in2out = *key0;
@@ -254,16 +285,23 @@ static u32 slow_path (snat_main_t *sm, vlib_buffer_t *b0,
   /* Add to translation hashes */
   kv0.key = s->in2out.as_u64;
-  kv0.value = s - sm->sessions;
+  kv0.value = s - sm->per_thread_data[cpu_index].sessions;
   if (clib_bihash_add_del_8_8 (&sm->in2out, &kv0, 1 /* is_add */))
       clib_warning ("in2out key add failed");
 
   kv0.key = s->out2in.as_u64;
-  kv0.value = s - sm->sessions;
+  kv0.value = s - sm->per_thread_data[cpu_index].sessions;
 
   if (clib_bihash_add_del_8_8 (&sm->out2in, &kv0, 1 /* is_add */))
       clib_warning ("out2in key add failed");
 
+  /* Add to translated packets worker lookup */
+  worker_by_out_key.addr = s->out2in.addr;
+  worker_by_out_key.port = s->out2in.port;
+  worker_by_out_key.fib_index = s->out2in.fib_index;
+  kv0.key = worker_by_out_key.as_u64;
+  kv0.value = cpu_index;
+  clib_bihash_add_del_8_8 (&sm->worker_by_out, &kv0, 1);
+
   return next0;
 }
@@ -275,7 +313,8 @@ static inline u32 icmp_in2out_slow_path (snat_main_t *sm,
                                          u32 rx_fib_index0,
                                          vlib_node_runtime_t * node,
                                          u32 next0,
-                                         f64 now)
+                                         f64 now,
+                                         u32 cpu_index)
 {
   snat_session_key_t key0;
   icmp_echo_header_t *echo0;
@@ -320,13 +359,14 @@ static inline u32 icmp_in2out_slow_path (snat_main_t *sm,
             return next0;
 
           next0 = slow_path (sm, b0, ip0, rx_fib_index0, &key0,
-                             &s0, node, next0);
+                             &s0, node, next0, cpu_index);
 
           if (PREDICT_FALSE (next0 == SNAT_IN2OUT_NEXT_DROP))
             return next0;
         }
       else
-        s0 = pool_elt_at_index (sm->sessions, value0.value);
+        s0 = pool_elt_at_index (sm->per_thread_data[cpu_index].sessions,
+                                value0.value);
 
   old_addr0 = ip0->src_address.as_u32;
   ip0->src_address = s0->out2in.addr;
@@ -355,8 +395,10 @@ static inline u32 icmp_in2out_slow_path (snat_main_t *sm,
   /* Per-user LRU list maintenance for dynamic translations */
   if (!snat_is_session_static (s0))
     {
-      clib_dlist_remove (sm->list_pool, s0->per_user_index);
-      clib_dlist_addtail (sm->list_pool, s0->per_user_list_head_index,
+      clib_dlist_remove (sm->per_thread_data[cpu_index].list_pool,
+                         s0->per_user_index);
+      clib_dlist_addtail (sm->per_thread_data[cpu_index].list_pool,
+                          s0->per_user_list_head_index,
                           s0->per_user_index);
     }
 
@@ -375,6 +417,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
   snat_runtime_t * rt = (snat_runtime_t *)node->runtime_data;
   f64 now = vlib_time_now (vm);
   u32 stats_node_index;
+  u32 cpu_index = os_get_cpu_number ();
 
   stats_node_index = is_slow_path ? snat_in2out_slowpath_node.index :
     snat_in2out_node.index;
@@ -445,14 +488,6 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
 
           next0 = next1 = SNAT_IN2OUT_NEXT_LOOKUP;
 
-#if 0
-          /* Formally correct, but we send to slowpath, lookup or drop */
-          vnet_get_config_data (&cm->config_main,
-                                &b0->current_config_index,
-                                &next0,
-                                0 /* sizeof config data */);
-#endif
-
           proto0 = ~0;
           proto0 = (ip0->protocol == IP_PROTOCOL_UDP) ?
            SNAT_PROTOCOL_UDP : proto0;
@@ -471,7 +506,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
                 {
                   next0 = icmp_in2out_slow_path 
                     (sm, b0, ip0, icmp0, sw_if_index0, rx_fib_index0,
-                     node, next0, now);
+                     node, next0, now, cpu_index);
                   goto trace00;
                 }
             }
@@ -512,7 +547,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
                     goto trace00;
 
                   next0 = slow_path (sm, b0, ip0, rx_fib_index0, &key0,
-                                     &s0, node, next0);
+                                     &s0, node, next0, cpu_index);
                   if (PREDICT_FALSE (next0 == SNAT_IN2OUT_NEXT_DROP))
                     goto trace00;
                 }
@@ -523,7 +558,8 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
                 }
             }
           else
-            s0 = pool_elt_at_index (sm->sessions, value0.value);
+            s0 = pool_elt_at_index (sm->per_thread_data[cpu_index].sessions,
+                                    value0.value);
 
           old_addr0 = ip0->src_address.as_u32;
           ip0->src_address = s0->out2in.addr;
@@ -565,8 +601,10 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
           /* Per-user LRU list maintenance for dynamic translation */
           if (!snat_is_session_static (s0))
             {
-              clib_dlist_remove (sm->list_pool, s0->per_user_index);
-              clib_dlist_addtail (sm->list_pool, s0->per_user_list_head_index,
+              clib_dlist_remove (sm->per_thread_data[cpu_index].list_pool,
+                                 s0->per_user_index);
+              clib_dlist_addtail (sm->per_thread_data[cpu_index].list_pool,
+                                  s0->per_user_list_head_index,
                                   s0->per_user_index);
             }
         trace00:
@@ -581,7 +619,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
               t->next_index = next0;
               t->session_index = ~0;
               if (s0)
-                t->session_index = s0 - sm->sessions;
+                t->session_index = s0 - sm->per_thread_data[cpu_index].sessions;
             }
 
           pkts_processed += next0 != SNAT_IN2OUT_NEXT_DROP;
@@ -595,13 +633,6 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
           rx_fib_index1 = vec_elt (sm->ip4_main->fib_index_by_sw_if_index, 
                                    sw_if_index1);
 
-#if 0
-          vnet_get_config_data (&cm->config_main,
-                                &b1->current_config_index,
-                                &next1,
-                                0 /* sizeof config data */);
-#endif
-
           proto1 = ~0;
           proto1 = (ip1->protocol == IP_PROTOCOL_UDP) ?
            SNAT_PROTOCOL_UDP : proto1;
@@ -619,8 +650,8 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
           if (PREDICT_FALSE (proto1 == SNAT_PROTOCOL_ICMP))
             {
               next1 = icmp_in2out_slow_path 
-                (sm, b1, ip1, icmp1, sw_if_index1, rx_fib_index1, node, next1,
-                 now);
+                (sm, b1, ip1, icmp1, sw_if_index1, rx_fib_index1, node,
+                 next1, now, cpu_index);
               goto trace01;
             }
         }
@@ -661,7 +692,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
                     goto trace01;
 
                   next1 = slow_path (sm, b1, ip1, rx_fib_index1, &key1,
-                                     &s1, node, next1);
+                                     &s1, node, next1, cpu_index);
                   if (PREDICT_FALSE (next1 == SNAT_IN2OUT_NEXT_DROP))
                     goto trace01;
                 }
@@ -672,7 +703,8 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
                 }
             }
           else
-            s1 = pool_elt_at_index (sm->sessions, value1.value);
+            s1 = pool_elt_at_index (sm->per_thread_data[cpu_index].sessions,
+                                    value1.value);
 
           old_addr1 = ip1->src_address.as_u32;
           ip1->src_address = s1->out2in.addr;
@@ -714,8 +746,10 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
          /* Per-user LRU list maintenance for dynamic translation */
          if (!snat_is_session_static (s1))
            {
-              clib_dlist_remove (sm->list_pool, s1->per_user_index);
-              clib_dlist_addtail (sm->list_pool, s1->per_user_list_head_index,
+              clib_dlist_remove (sm->per_thread_data[cpu_index].list_pool,
+                                 s1->per_user_index);
+              clib_dlist_addtail (sm->per_thread_data[cpu_index].list_pool,
+                                  s1->per_user_list_head_index,
                                   s1->per_user_index);
             }
         trace01:
@@ -729,7 +763,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
               t->next_index = next1;
               t->session_index = ~0;
               if (s1)
-                t->session_index = s1 - sm->sessions;
+                t->session_index = s1 - sm->per_thread_data[cpu_index].sessions;
             }
 
           pkts_processed += next1 != SNAT_IN2OUT_NEXT_DROP;
@@ -779,14 +813,6 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
           rx_fib_index0 = vec_elt (sm->ip4_main->fib_index_by_sw_if_index, 
                                    sw_if_index0);
-
-#if 0
-          vnet_get_config_data (&cm->config_main,
-                                &b0->current_config_index,
-                                &next0,
-                                0 /* sizeof config data */);
-#endif
-
           proto0 = ~0;
          proto0 = (ip0->protocol == IP_PROTOCOL_UDP) ?
            SNAT_PROTOCOL_UDP : proto0;
@@ -804,8 +830,8 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
           if (PREDICT_FALSE (proto0 == SNAT_PROTOCOL_ICMP))
             {
               next0 = icmp_in2out_slow_path 
-                (sm, b0, ip0, icmp0, sw_if_index0, rx_fib_index0, node, next0,
-                 now);
+                (sm, b0, ip0, icmp0, sw_if_index0, rx_fib_index0, node,
+                 next0, now, cpu_index);
               goto trace0;
             }
         }
@@ -846,7 +872,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
                     goto trace0;
 
                   next0 = slow_path (sm, b0, ip0, rx_fib_index0, &key0,
-                                     &s0, node, next0);
+                                     &s0, node, next0, cpu_index);
                   if (PREDICT_FALSE (next0 == SNAT_IN2OUT_NEXT_DROP))
                     goto trace0;
                 }
@@ -857,7 +883,8 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
                 }
             }
           else
-            s0 = pool_elt_at_index (sm->sessions, value0.value);
+            s0 = pool_elt_at_index (sm->per_thread_data[cpu_index].sessions,
+                                    value0.value);
 
           old_addr0 = ip0->src_address.as_u32;
           ip0->src_address = s0->out2in.addr;
@@ -899,8 +926,10 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
          /* Per-user LRU list maintenance for dynamic translation */
          if (!snat_is_session_static (s0))
            {
-              clib_dlist_remove (sm->list_pool, s0->per_user_index);
-              clib_dlist_addtail (sm->list_pool, s0->per_user_list_head_index,
+              clib_dlist_remove (sm->per_thread_data[cpu_index].list_pool,
+                                 s0->per_user_index);
+              clib_dlist_addtail (sm->per_thread_data[cpu_index].list_pool,
+                                  s0->per_user_list_head_index,
                                   s0->per_user_index);
             }
@@ -915,7 +944,7 @@ snat_in2out_node_fn_inline (vlib_main_t * vm,
               t->next_index = next0;
               t->session_index = ~0;
               if (s0)
-                t->session_index = s0 - sm->sessions;
+                t->session_index = s0 - sm->per_thread_data[cpu_index].sessions;
             }
 
           pkts_processed += next0 != SNAT_IN2OUT_NEXT_DROP;
@@ -999,6 +1028,183 @@ VLIB_REGISTER_NODE (snat_in2out_slowpath_node) = {
 VLIB_NODE_FUNCTION_MULTIARCH (snat_in2out_slowpath_node, snat_in2out_slow_path_fn);
 
+static uword
+snat_in2out_worker_handoff_fn (vlib_main_t * vm,
+                               vlib_node_runtime_t * node,
+                               vlib_frame_t * frame)
+{
+  snat_main_t *sm = &snat_main;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  u32 n_left_from, *from, *to_next = 0;
+  static __thread vlib_frame_queue_elt_t **handoff_queue_elt_by_worker_index;
+  static __thread vlib_frame_queue_t **congested_handoff_queue_by_worker_index
+    = 0;
+  vlib_frame_queue_elt_t *hf = 0;
+  vlib_frame_t *f = 0;
+  int i;
+  u32 n_left_to_next_worker = 0, *to_next_worker = 0;
+  u32 next_worker_index = 0;
+  u32 current_worker_index = ~0;
+  u32 cpu_index = os_get_cpu_number ();
+
+  if (PREDICT_FALSE (handoff_queue_elt_by_worker_index == 0))
+    {
+      vec_validate (handoff_queue_elt_by_worker_index, tm->n_vlib_mains - 1);
+
+      vec_validate_init_empty (congested_handoff_queue_by_worker_index,
+                               sm->first_worker_index + sm->num_workers - 1,
+                               (vlib_frame_queue_t *) (~0));
+    }
+
+  from = vlib_frame_vector_args (frame);
+  n_left_from = frame->n_vectors;
+
+  while (n_left_from > 0)
+    {
+      u32 bi0;
+      vlib_buffer_t *b0;
+      u32 sw_if_index0;
+      u32 rx_fib_index0;
+      ip4_header_t * ip0;
+      snat_user_key_t key0;
+      clib_bihash_kv_8_8_t kv0, value0;
+      u8 do_handoff;
+
+      bi0 = from[0];
+      from += 1;
+      n_left_from -= 1;
+
+      b0 = vlib_get_buffer (vm, bi0);
+
+      sw_if_index0 = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+      rx_fib_index0 = ip4_fib_table_get_index_for_sw_if_index(sw_if_index0);
+
+      ip0 = vlib_buffer_get_current (b0);
+
+      key0.addr = ip0->src_address;
+      key0.fib_index = rx_fib_index0;
+
+      kv0.key = key0.as_u64;
+
+      /* Ever heard of the "user" before? */
+      if (clib_bihash_search_8_8 (&sm->worker_by_in, &kv0, &value0))
+        {
+          /* No, assign next available worker (RR) */
+          next_worker_index = sm->first_worker_index +
+            sm->workers[sm->next_worker++ % vec_len (sm->workers)];
+
+          /* add non-translated packets worker lookup */
+          kv0.value = next_worker_index;
+          clib_bihash_add_del_8_8 (&sm->worker_by_in, &kv0, 1);
+        }
+      else
+        next_worker_index = value0.value;
+
+      if (PREDICT_FALSE (next_worker_index != cpu_index))
+        {
+          do_handoff = 1;
+
+          if (next_worker_index != current_worker_index)
+            {
+              if (hf)
+                hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
+
+              hf = vlib_get_worker_handoff_queue_elt (sm->fq_in2out_index,
+                                                      next_worker_index,
+                                                      handoff_queue_elt_by_worker_index);
+
+              n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors;
+              to_next_worker = &hf->buffer_index[hf->n_vectors];
+              current_worker_index = next_worker_index;
+            }
+
+          /* enqueue to correct worker thread */
+          to_next_worker[0] = bi0;
+          to_next_worker++;
+          n_left_to_next_worker--;
+
+          if (n_left_to_next_worker == 0)
+            {
+              hf->n_vectors = VLIB_FRAME_SIZE;
+              vlib_put_frame_queue_elt (hf);
+              current_worker_index = ~0;
+              handoff_queue_elt_by_worker_index[next_worker_index] = 0;
+              hf = 0;
+            }
+        }
+      else
+        {
+          do_handoff = 0;
+          /* if this is 1st frame */
+          if (!f)
+            {
+              f = vlib_get_frame_to_node (vm, snat_in2out_node.index);
+              to_next = vlib_frame_vector_args (f);
+            }
+
+          to_next[0] = bi0;
+          to_next += 1;
+          f->n_vectors++;
+        }
+
+      if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE)
+                         && (b0->flags & VLIB_BUFFER_IS_TRACED)))
+        {
+          snat_in2out_worker_handoff_trace_t *t =
+            vlib_add_trace (vm, node, b0, sizeof (*t));
+          t->next_worker_index = next_worker_index;
+          t->do_handoff = do_handoff;
+        }
+    }
+
+  if (f)
+    vlib_put_frame_to_node (vm, snat_in2out_node.index, f);
+
+  if (hf)
+    hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
+
+  /* Ship frames to the worker nodes */
+  for (i = 0; i < vec_len (handoff_queue_elt_by_worker_index); i++)
+    {
+      if (handoff_queue_elt_by_worker_index[i])
+        {
+          hf = handoff_queue_elt_by_worker_index[i];
+          /*
+           * It works better to let the handoff node
+           * rate-adapt, always ship the handoff queue element.
+           */
+          if (1 || hf->n_vectors == hf->last_n_vectors)
+            {
+              vlib_put_frame_queue_elt (hf);
+              handoff_queue_elt_by_worker_index[i] = 0;
+            }
+          else
+            hf->last_n_vectors = hf->n_vectors;
+        }
+      congested_handoff_queue_by_worker_index[i] =
+        (vlib_frame_queue_t *) (~0);
+    }
+  hf = 0;
+  current_worker_index = ~0;
+  return frame->n_vectors;
+}
+
+VLIB_REGISTER_NODE (snat_in2out_worker_handoff_node) = {
+  .function = snat_in2out_worker_handoff_fn,
+  .name = "snat-in2out-worker-handoff",
+  .vector_size = sizeof (u32),
+  .format_trace = format_snat_in2out_worker_handoff_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+
+  .n_next_nodes = 1,
+
+  .next_nodes = {
+    [0] = "error-drop",
+  },
+};
+
+VLIB_NODE_FUNCTION_MULTIARCH (snat_in2out_worker_handoff_node, snat_in2out_worker_handoff_fn);
+
 static inline u32 icmp_in2out_static_map (snat_main_t *sm,
                                           vlib_buffer_t * b0,
                                           ip4_header_t * ip0,
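The out2in counterpart (the snat-out2in-worker-handoff node lives in
out2in.c and is outside this diff) steers translated reply packets using the
worker_by_out entries that slow_path () records above. A self-contained
sketch of the key symmetry this relies on follows; the union layout is
illustrative, standing in for the plugin's snat_static_mapping_key_t:

#include <assert.h>
#include <stdint.h>

typedef union {
  struct {
    uint32_t addr;      /* outside (translated) IPv4 address */
    uint16_t port;      /* outside L4 port                   */
    uint16_t fib_index; /* outside VRF                       */
  };
  uint64_t as_u64;      /* packed form used as the hash key  */
} mapping_key_t;

/* Key written by slow_path () when the session is created; stored in
 * worker_by_out with value = cpu_index of the owning worker. */
static uint64_t
record_key (uint32_t out_addr, uint16_t out_port, uint16_t out_fib)
{
  mapping_key_t k = { { out_addr, out_port, out_fib } };
  return k.as_u64;
}

/* Key the out2in handoff node rebuilds from a translated reply packet:
 * dst address, dst port, rx VRF. */
static uint64_t
reply_key (uint32_t dst_addr, uint16_t dst_port, uint16_t rx_fib)
{
  mapping_key_t k = { { dst_addr, dst_port, rx_fib } };
  return k.as_u64;
}

int
main (void)
{
  /* A reply's destination tuple packs to the session's stored key, so the
   * worker_by_out lookup lands on the cpu_index recorded by slow_path (). */
  assert (record_key (0x0a000001, 1024, 0) == reply_key (0x0a000001, 1024, 0));
  return 0;
}

Because both directions of a session resolve to the same worker, the
per-thread session state is only ever touched by its owning thread, which is
the thread-safety property the commit message claims.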