diff options
author | Andrew Yourtchenko <ayourtch@gmail.com> | 2017-05-17 21:27:03 +0200 |
---|---|---|
committer | Damjan Marion <dmarion.lists@gmail.com> | 2017-06-07 13:37:46 +0000 |
commit | 5dbfbb7110a52595915acd5ec034f82ce517846a (patch) | |
tree | 1214c5cb0904a5ca3eadba59f587a12af531364a /src/plugins/acl/fa_node.c | |
parent | a62699954a9f57c8407ba10d08636c79166f56ed (diff) |
acl-plugin: make the ACL plugin multicore-capable
Add the logic to be able to use stateful ACLs in a multithreaded setup.
Change-Id: I3b0cfa6ca4ea8f46f61648611c3e97b00c3376b6
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
Diffstat (limited to 'src/plugins/acl/fa_node.c')
-rw-r--r-- | src/plugins/acl/fa_node.c | 646 |
1 files changed, 459 insertions, 187 deletions
diff --git a/src/plugins/acl/fa_node.c b/src/plugins/acl/fa_node.c index c71429e76c6..c6059aae76a 100644 --- a/src/plugins/acl/fa_node.c +++ b/src/plugins/acl/fa_node.c @@ -543,6 +543,39 @@ fa_session_get_timeout_type (acl_main_t * am, fa_session_t * sess) static u64 +fa_session_get_shortest_timeout(acl_main_t * am) +{ + int timeout_type; + u64 timeout = ~0LL; + for(timeout_type = 0; timeout_type < ACL_N_TIMEOUTS; timeout_type++) { + if (timeout > am->session_timeout_sec[timeout_type]) { + timeout = am->session_timeout_sec[timeout_type]; + } + } + return timeout; +} + +/* + * Get the timeout of the session in a list since its enqueue time. + */ + +static u64 +fa_session_get_list_timeout (acl_main_t * am, fa_session_t * sess) +{ + u64 timeout = am->vlib_main->clib_time.clocks_per_second; + /* + * we have the shortest possible timeout type in all the lists + * (see README-multicore for the rationale) + */ + timeout *= fa_session_get_shortest_timeout(am); + return timeout; +} + +/* + * Get the idle timeout of a session. + */ + +static u64 fa_session_get_timeout (acl_main_t * am, fa_session_t * sess) { u64 timeout = am->vlib_main->clib_time.clocks_per_second; @@ -554,6 +587,7 @@ fa_session_get_timeout (acl_main_t * am, fa_session_t * sess) static void acl_fa_ifc_init_sessions (acl_main_t * am, int sw_if_index0) { + /// FIXME-MULTICORE: lock around this function #ifdef FA_NODE_VERBOSE_DEBUG clib_warning ("Initializing bihash for sw_if_index %d num buckets %lu memory size %llu", @@ -569,68 +603,97 @@ acl_fa_ifc_init_sessions (acl_main_t * am, int sw_if_index0) clib_bitmap_set (am->fa_sessions_on_sw_if_index, sw_if_index0, 1); } +static inline fa_session_t *get_session_ptr(acl_main_t *am, u16 thread_index, u32 session_index) +{ + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; + fa_session_t *sess = pw->fa_sessions_pool + session_index; + return sess; +} + static void -acl_fa_conn_list_add_session (acl_main_t * am, u32 sess_id, u64 now) +acl_fa_conn_list_add_session (acl_main_t * am, fa_full_session_id_t sess_id, u64 now) { - fa_session_t *sess = am->fa_sessions_pool + sess_id; + fa_session_t *sess = get_session_ptr(am, sess_id.thread_index, sess_id.session_index); u8 list_id = fa_session_get_timeout_type(am, sess); + uword thread_index = os_get_thread_index (); + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; + /* the retrieved session thread index must be necessarily the same as the one in the key */ + ASSERT (sess->thread_index == sess_id.thread_index); + /* the retrieved session thread index must be the same as current thread */ + ASSERT (sess->thread_index == thread_index); sess->link_enqueue_time = now; sess->link_list_id = list_id; sess->link_next_idx = ~0; - sess->link_prev_idx = am->fa_conn_list_tail[list_id]; - if (~0 != am->fa_conn_list_tail[list_id]) { - fa_session_t *prev_sess = am->fa_sessions_pool + am->fa_conn_list_tail[list_id]; - prev_sess->link_next_idx = sess_id; + sess->link_prev_idx = pw->fa_conn_list_tail[list_id]; + if (~0 != pw->fa_conn_list_tail[list_id]) { + fa_session_t *prev_sess = get_session_ptr(am, thread_index, pw->fa_conn_list_tail[list_id]); + prev_sess->link_next_idx = sess_id.session_index; + /* We should never try to link with a session on another thread */ + ASSERT(prev_sess->thread_index == sess->thread_index); } - am->fa_conn_list_tail[list_id] = sess_id; + pw->fa_conn_list_tail[list_id] = sess_id.session_index; + pw->serviced_sw_if_index_bitmap = clib_bitmap_set(pw->serviced_sw_if_index_bitmap, sess->sw_if_index, 1); - if (~0 == am->fa_conn_list_head[list_id]) { - am->fa_conn_list_head[list_id] = sess_id; - /* If it is a first conn in any list, kick off the cleaner */ + if (~0 == pw->fa_conn_list_head[list_id]) { + pw->fa_conn_list_head[list_id] = sess_id.session_index; + /* If it is a first conn in any list, kick the cleaner */ vlib_process_signal_event (am->vlib_main, am->fa_cleaner_node_index, ACL_FA_CLEANER_RESCHEDULE, 0); - } } -static void -acl_fa_conn_list_delete_session (acl_main_t *am, u32 sess_id) +static int +acl_fa_conn_list_delete_session (acl_main_t *am, fa_full_session_id_t sess_id) { - fa_session_t *sess = am->fa_sessions_pool + sess_id; + uword thread_index = os_get_thread_index (); + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; + if (thread_index != sess_id.thread_index) { + /* If another thread attempts to delete the session, fail it. */ +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("thread id in key %d != curr thread index, not deleting"); +#endif + return 0; + } + fa_session_t *sess = get_session_ptr(am, sess_id.thread_index, sess_id.session_index); + /* we should never try to delete the session with another thread index */ + ASSERT(sess->thread_index == thread_index); if (~0 != sess->link_prev_idx) { - fa_session_t *prev_sess = am->fa_sessions_pool + sess->link_prev_idx; + fa_session_t *prev_sess = get_session_ptr(am, thread_index, sess->link_prev_idx); + /* the previous session must be in the same list as this one */ + ASSERT(prev_sess->link_list_id == sess->link_list_id); prev_sess->link_next_idx = sess->link_next_idx; - if (prev_sess->link_list_id != sess->link_list_id) - clib_warning("(prev_sess->link_list_id != sess->link_list_id)"); } if (~0 != sess->link_next_idx) { - fa_session_t *next_sess = am->fa_sessions_pool + sess->link_next_idx; + fa_session_t *next_sess = get_session_ptr(am, thread_index, sess->link_next_idx); + /* The next session must be in the same list as the one we are deleting */ + ASSERT(next_sess->link_list_id == sess->link_list_id); next_sess->link_prev_idx = sess->link_prev_idx; - if (next_sess->link_list_id != sess->link_list_id) - clib_warning("(next_sess->link_list_id != sess->link_list_id)"); } - if (am->fa_conn_list_head[sess->link_list_id] == sess_id) { - am->fa_conn_list_head[sess->link_list_id] = sess->link_next_idx; + if (pw->fa_conn_list_head[sess->link_list_id] == sess_id.session_index) { + pw->fa_conn_list_head[sess->link_list_id] = sess->link_next_idx; } - if (am->fa_conn_list_tail[sess->link_list_id] == sess_id) { - am->fa_conn_list_tail[sess->link_list_id] = sess->link_prev_idx; + if (pw->fa_conn_list_tail[sess->link_list_id] == sess_id.session_index) { + pw->fa_conn_list_tail[sess->link_list_id] = sess->link_prev_idx; } + return 1; } - -int -acl_fa_session_is_dead (acl_main_t * am, u32 sw_if_index, u64 now, - u32 sess_id) -{ - return 0; -} - -static void -acl_fa_restart_timer_for_session (acl_main_t * am, u64 now, u32 sess_id) +static int +acl_fa_restart_timer_for_session (acl_main_t * am, u64 now, fa_full_session_id_t sess_id) { - // fa_session_t *sess = am->fa_sessions_pool + sess_id; - acl_fa_conn_list_delete_session(am, sess_id); - acl_fa_conn_list_add_session(am, sess_id, now); + if (acl_fa_conn_list_delete_session(am, sess_id)) { + acl_fa_conn_list_add_session(am, sess_id, now); + return 1; + } else { + /* + * Our thread does not own this connection, so we can not delete + * The session. To avoid the complicated signaling, we simply + * pick the list waiting time to be the shortest of the timeouts. + * This way we do not have to do anything special, and let + * the regular requeue check take care of everything. + */ + return 0; + } } @@ -648,13 +711,16 @@ acl_fa_track_session (acl_main_t * am, int is_input, u32 sw_if_index, u64 now, static void -acl_fa_delete_session (acl_main_t * am, u32 sw_if_index, u32 sess_id) +acl_fa_delete_session (acl_main_t * am, u32 sw_if_index, fa_full_session_id_t sess_id) { - fa_session_t *sess = (fa_session_t *) am->fa_sessions_pool + sess_id; + fa_session_t *sess = get_session_ptr(am, sess_id.thread_index, sess_id.session_index); + ASSERT(sess->thread_index == os_get_thread_index ()); BV (clib_bihash_add_del) (&am->fa_sessions_by_sw_if_index[sw_if_index], &sess->info.kv, 0); - pool_put_index (am->fa_sessions_pool, sess_id); - /* Deleting from timer wheel not needed, as the cleaner deals with the timers. */ + acl_fa_per_worker_data_t *pw = &am->per_worker_data[sess_id.thread_index]; + pool_put_index (pw->fa_sessions_pool, sess_id.session_index); + /* Deleting from timer structures not needed, + as the caller must have dealt with the timers. */ vec_validate (am->fa_session_dels_by_sw_if_index, sw_if_index); am->fa_session_dels_by_sw_if_index[sw_if_index]++; } @@ -671,13 +737,114 @@ acl_fa_can_add_session (acl_main_t * am, int is_input, u32 sw_if_index) return (curr_sess < am->fa_conn_table_max_entries); } +static u64 +acl_fa_get_list_head_expiry_time(acl_main_t *am, acl_fa_per_worker_data_t *pw, u64 now, u16 thread_index, int timeout_type) +{ + if (~0 == pw->fa_conn_list_head[timeout_type]) { + return ~0LL; // infinity. + } else { + fa_session_t *sess = get_session_ptr(am, thread_index, pw->fa_conn_list_head[timeout_type]); + u64 timeout_time = + sess->link_enqueue_time + fa_session_get_list_timeout (am, sess); + return timeout_time; + } +} + +static int +acl_fa_conn_time_to_check (acl_main_t *am, acl_fa_per_worker_data_t *pw, u64 now, u16 thread_index, u32 session_index) +{ + fa_session_t *sess = get_session_ptr(am, thread_index, session_index); + u64 timeout_time = + sess->link_enqueue_time + fa_session_get_list_timeout (am, sess); + return (timeout_time < now) || (sess->link_enqueue_time <= pw->swipe_end_time); +} + +/* + * see if there are sessions ready to be checked, + * do the maintenance (requeue or delete), and + * return the total number of sessions reclaimed. + */ +static int +acl_fa_check_idle_sessions(acl_main_t *am, u16 thread_index, u64 now) +{ + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; + fa_full_session_id_t fsid; + fsid.thread_index = thread_index; + int total_expired = 0; + + { + u8 tt = 0; + for(tt = 0; tt < ACL_N_TIMEOUTS; tt++) { + while((vec_len(pw->expired) < am->fa_max_deleted_sessions_per_interval) + && (~0 != pw->fa_conn_list_head[tt]) + && (acl_fa_conn_time_to_check(am, pw, now, thread_index, + pw->fa_conn_list_head[tt]))) { + fsid.session_index = pw->fa_conn_list_head[tt]; + vec_add1(pw->expired, fsid.session_index); + acl_fa_conn_list_delete_session(am, fsid); + } + } + } + + u32 *psid = NULL; + vec_foreach (psid, pw->expired) + { + fsid.session_index = *psid; + if (!pool_is_free_index (pw->fa_sessions_pool, fsid.session_index)) + { + fa_session_t *sess = get_session_ptr(am, thread_index, fsid.session_index); + u32 sw_if_index = sess->sw_if_index; + u64 sess_timeout_time = + sess->last_active_time + fa_session_get_timeout (am, sess); + if ((now < sess_timeout_time) && (0 == clib_bitmap_get(pw->pending_clear_sw_if_index_bitmap, sw_if_index))) + { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning ("ACL_FA_NODE_CLEAN: Restarting timer for session %d", + (int) session_index); +#endif + /* There was activity on the session, so the idle timeout + has not passed. Enqueue for another time period. */ + + acl_fa_conn_list_add_session(am, fsid, now); + pw->cnt_session_timer_restarted++; + } + else + { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning ("ACL_FA_NODE_CLEAN: Deleting session %d", + (int) session_index); +#endif + acl_fa_delete_session (am, sw_if_index, fsid); + pw->cnt_deleted_sessions++; + } + } + else + { + pw->cnt_already_deleted_sessions++; + } + } + total_expired = vec_len(pw->expired); + /* zero out the vector which we have acted on */ + if (pw->expired) + _vec_len (pw->expired) = 0; + /* if we were advancing and reached the end + * (no more sessions to recycle), reset the fast-forward timestamp */ + + if (pw->swipe_end_time && 0 == total_expired) + pw->swipe_end_time = 0; + return (total_expired); +} + always_inline void -acl_fa_try_recycle_session (acl_main_t * am, int is_input, u32 sw_if_index) +acl_fa_try_recycle_session (acl_main_t * am, int is_input, u16 thread_index, u32 sw_if_index) { /* try to recycle a TCP transient session */ + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; u8 timeout_type = ACL_TIMEOUT_TCP_TRANSIENT; - u32 sess_id = am->fa_conn_list_head[timeout_type]; - if (~0 != sess_id) { + fa_full_session_id_t sess_id; + sess_id.session_index = pw->fa_conn_list_head[timeout_type]; + if (~0 != sess_id.session_index) { + sess_id.thread_index = thread_index; acl_fa_conn_list_delete_session(am, sess_id); acl_fa_delete_session(am, sw_if_index, sess_id); } @@ -689,25 +856,28 @@ acl_fa_add_session (acl_main_t * am, int is_input, u32 sw_if_index, u64 now, { clib_bihash_kv_40_8_t *pkv = &p5tuple->kv; clib_bihash_kv_40_8_t kv; - u32 sess_id; - fa_session_t *sess; + fa_full_session_id_t f_sess_id; + uword thread_index = os_get_thread_index(); + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; - pool_get (am->fa_sessions_pool, sess); - sess_id = sess - am->fa_sessions_pool; + f_sess_id.thread_index = thread_index; + fa_session_t *sess; + pool_get_aligned (pw->fa_sessions_pool, sess, CLIB_CACHE_LINE_BYTES); + f_sess_id.session_index = sess - pw->fa_sessions_pool; kv.key[0] = pkv->key[0]; kv.key[1] = pkv->key[1]; kv.key[2] = pkv->key[2]; kv.key[3] = pkv->key[3]; kv.key[4] = pkv->key[4]; - kv.value = sess_id; + kv.value = f_sess_id.as_u64; memcpy (sess, pkv, sizeof (pkv->key)); sess->last_active_time = now; sess->sw_if_index = sw_if_index; sess->tcp_flags_seen.as_u16 = 0; - sess->reserved1 = 0; + sess->thread_index = thread_index; sess->link_list_id = ~0; sess->link_prev_idx = ~0; sess->link_next_idx = ~0; @@ -721,7 +891,7 @@ acl_fa_add_session (acl_main_t * am, int is_input, u32 sw_if_index, u64 now, BV (clib_bihash_add_del) (&am->fa_sessions_by_sw_if_index[sw_if_index], &kv, 1); - acl_fa_conn_list_add_session(am, sess_id, now); + acl_fa_conn_list_add_session(am, f_sess_id, now); vec_validate (am->fa_session_adds_by_sw_if_index, sw_if_index); am->fa_session_adds_by_sw_if_index[sw_if_index]++; @@ -757,6 +927,7 @@ acl_fa_node_fn (vlib_main_t * vm, clib_bihash_kv_40_8_t value_sess; vlib_node_runtime_t *error_node; u64 now = clib_cpu_time_now (); + uword thread_index = os_get_thread_index (); from = vlib_frame_vector_args (frame); n_left_from = frame->n_vectors; @@ -827,16 +998,19 @@ acl_fa_node_fn (vlib_main_t * vm, { trace_bitmap |= 0x80000000; error0 = ACL_FA_ERROR_ACL_EXIST_SESSION; - // FIXME assert(value_sess.value == (0xffffffff & value_sess.value)); - u32 sess_id = value_sess.value; - fa_session_t *sess = am->fa_sessions_pool + sess_id; + fa_full_session_id_t f_sess_id; + + f_sess_id.as_u64 = value_sess.value; + ASSERT(f_sess_id.thread_index < vec_len(vlib_mains)); + + fa_session_t *sess = get_session_ptr(am, f_sess_id.thread_index, f_sess_id.session_index); int old_timeout_type = fa_session_get_timeout_type (am, sess); action = acl_fa_track_session (am, is_input, sw_if_index0, now, sess, &fa_5tuple); /* expose the session id to the tracer */ - match_rule_index = sess_id; + match_rule_index = f_sess_id.session_index; int new_timeout_type = fa_session_get_timeout_type (am, sess); acl_check_needed = 0; @@ -844,7 +1018,7 @@ acl_fa_node_fn (vlib_main_t * vm, /* Tracking might have changed the session timeout type, e.g. from transient to established */ if (PREDICT_FALSE (old_timeout_type != new_timeout_type)) { - acl_fa_restart_timer_for_session (am, now, sess_id); + acl_fa_restart_timer_for_session (am, now, f_sess_id); pkts_restart_session_timer++; trace_bitmap |= 0x00010000 + ((0xff & old_timeout_type) << 8) + @@ -865,7 +1039,7 @@ acl_fa_node_fn (vlib_main_t * vm, if (2 == action) { if (!acl_fa_can_add_session (am, is_input, sw_if_index0)) - acl_fa_try_recycle_session (am, is_input, sw_if_index0); + acl_fa_try_recycle_session (am, is_input, thread_index, sw_if_index0); if (acl_fa_can_add_session (am, is_input, sw_if_index0)) { @@ -1024,16 +1198,9 @@ acl_out_ip4_fa_node_fn (vlib_main_t * vm, } /* - * This process performs all the connection clean up - both for idle connections, - * as well as receiving the signals to clean up the connections in case of sw_if_index deletion, - * or (maybe in the future) the connection deletion due to policy reasons. - * - * The previous iteration (l2sess) attempted to clean up the connections in small increments, - * in-band, but the problem it tried to preemptively address (process starvation) is yet to be seen. - * - * The approach with a single thread deleting the connections is simpler, thus we use it until - * there is a real starvation problem to solve. - * + * This process ensures the connection cleanup happens every so often + * even in absence of traffic, as well as provides general orchestration + * for requests like connection deletion on a given sw_if_index. */ @@ -1056,57 +1223,131 @@ static char *acl_fa_cleaner_error_strings[] = { #undef _ }; -static int -acl_fa_clean_sessions_by_sw_if_index (acl_main_t *am, u32 sw_if_index, u32 *count) -{ - - int undeleted = 0; - fa_session_t *sess; - uword *dv = NULL; - uword *ii; - - pool_foreach(sess, am->fa_sessions_pool, ({ - if ( (~0 == sw_if_index) || (sw_if_index == sess->sw_if_index) ) - vec_add1(dv, sess-am->fa_sessions_pool); - })); - vec_foreach(ii, dv) - { - sess = pool_elt_at_index(am->fa_sessions_pool, *ii); - acl_fa_delete_session(am, sess->sw_if_index, *ii); - (*count)++; - } - - pool_foreach(sess, am->fa_sessions_pool, ({ - if ( (~0 == sw_if_index) || (sw_if_index == sess->sw_if_index) ) - undeleted++; - })); - if (undeleted == 0) - { - if (~0 == sw_if_index) - { - /* FIXME: clean-up tables ? */ - } - else - { - /* FIXME: clean-up tables ? */ - } - } - return (undeleted == 0); -} /* *INDENT-ON* */ static vlib_node_registration_t acl_fa_session_cleaner_process_node; +static vlib_node_registration_t acl_fa_worker_session_cleaner_process_node; -static int -acl_fa_conn_time_to_check (acl_main_t *am, u64 now, u32 session_index) +/* + * Per-worker thread interrupt-driven cleaner thread + * to clean idle connections if there are no packets + */ +static uword +acl_fa_worker_conn_cleaner_process(vlib_main_t * vm, + vlib_node_runtime_t * rt, vlib_frame_t * f) { - fa_session_t *sess = am->fa_sessions_pool + session_index; - u64 timeout_time = - sess->link_enqueue_time + fa_session_get_timeout (am, sess); - return (timeout_time < now); + acl_main_t *am = &acl_main; + u64 now = clib_cpu_time_now (); + u16 thread_index = os_get_thread_index (); + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; + int num_expired; +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("\nacl_fa_worker_conn_cleaner: thread index %d now %lu\n\n", thread_index, now); +#endif + /* allow another interrupt to be queued */ + pw->interrupt_is_pending = 0; + if (pw->clear_in_process) { + if (0 == pw->swipe_end_time) { + /* + * Someone has just set the flag to start clearing. + * we do this by combing through the connections up to a "time T" + * which is now, and requeueing everything except the expired + * connections and those matching the interface(s) being cleared. + */ + + /* + * first filter the sw_if_index bitmap that they want from us, by + * a bitmap of sw_if_index for which we actually have connections. + */ + if ((pw->pending_clear_sw_if_index_bitmap == 0) + || (pw->serviced_sw_if_index_bitmap == 0)) { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("WORKER-CLEAR: someone tried to call clear, but one of the bitmaps are empty"); +#endif + clib_bitmap_zero(pw->pending_clear_sw_if_index_bitmap); + } else { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("WORKER-CLEAR: (before and) swiping sw-if-index bitmap: %U, my serviced bitmap %U", + format_bitmap_hex, pw->pending_clear_sw_if_index_bitmap, + format_bitmap_hex, pw->serviced_sw_if_index_bitmap); +#endif + pw->pending_clear_sw_if_index_bitmap = clib_bitmap_and(pw->pending_clear_sw_if_index_bitmap, + pw->serviced_sw_if_index_bitmap); + } + + if (clib_bitmap_is_zero(pw->pending_clear_sw_if_index_bitmap)) { + /* if the cross-section is a zero vector, no need to do anything. */ + clib_warning("WORKER: clearing done - nothing to do"); + pw->clear_in_process = 0; + } else { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("WORKER-CLEAR: swiping sw-if-index bitmap: %U, my serviced bitmap %U", + format_bitmap_hex, pw->pending_clear_sw_if_index_bitmap, + format_bitmap_hex, pw->serviced_sw_if_index_bitmap); +#endif + /* swipe through the connection lists until enqueue timestamps become above "now" */ + pw->swipe_end_time = now; + } + } + } + num_expired = acl_fa_check_idle_sessions(am, thread_index, now); + // clib_warning("WORKER-CLEAR: checked %d sessions (clear_in_progress: %d)", num_expired, pw->clear_in_process); + if (pw->clear_in_process) { + if (0 == num_expired) { + /* we were clearing but we could not process any more connections. time to stop. */ + clib_bitmap_zero(pw->pending_clear_sw_if_index_bitmap); + pw->clear_in_process = 0; +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("WORKER: clearing done, all done"); +#endif + } else { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("WORKER-CLEAR: more work to do, raising interrupt"); +#endif + /* should continue clearing.. So could they please sent an interrupt again? */ + pw->interrupt_is_needed = 1; + } + } else { + if (num_expired >= am->fa_max_deleted_sessions_per_interval) { + /* there was too much work, we should get an interrupt ASAP */ + pw->interrupt_is_needed = 1; + } else if (num_expired <= am->fa_min_deleted_sessions_per_interval) { + /* signal that they should trigger us less */ + pw->interrupt_is_unwanted = 1; + } else { + /* the current rate of interrupts is ok */ + pw->interrupt_is_needed = 0; + pw->interrupt_is_unwanted = 0; + } + } + return 0; } +static void +send_one_worker_interrupt (vlib_main_t * vm, acl_main_t *am, int thread_index) +{ + acl_fa_per_worker_data_t *pw = &am->per_worker_data[thread_index]; + if (!pw->interrupt_is_pending) { + vlib_node_set_interrupt_pending (vlib_mains[thread_index], + acl_fa_worker_session_cleaner_process_node.index); + pw->interrupt_is_pending = 1; + /* if the interrupt was requested, mark that done. */ + pw->interrupt_is_needed = 0; + } +} +static void +send_interrupts_to_workers (vlib_main_t * vm, acl_main_t *am) +{ + int i; + /* Can't use vec_len(am->per_worker_data) since the threads might not have come up yet; */ + int n_threads = vec_len(vlib_mains); + for (i = n_threads > 1 ? 1 : 0; i < n_threads; i++) { + send_one_worker_interrupt(vm, am, i); + } +} + +/* centralized process to drive per-worker cleaners */ static uword acl_fa_session_cleaner_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f) @@ -1115,28 +1356,48 @@ acl_fa_session_cleaner_process (vlib_main_t * vm, vlib_node_runtime_t * rt, u64 now = clib_cpu_time_now (); f64 cpu_cps = vm->clib_time.clocks_per_second; u64 next_expire; - /* We should call timer wheel at least twice a second */ + /* We should check if there are connections to clean up - at least twice a second */ u64 max_timer_wait_interval = cpu_cps / 2; - am->fa_current_cleaner_timer_wait_interval = max_timer_wait_interval; - - u32 *expired = NULL; uword event_type, *event_data = 0; + acl_fa_per_worker_data_t *pw0; + am->fa_current_cleaner_timer_wait_interval = max_timer_wait_interval; am->fa_cleaner_node_index = acl_fa_session_cleaner_process_node.index; while (1) { - u32 count_deleted_sessions = 0; - u32 count_already_deleted = 0; now = clib_cpu_time_now (); next_expire = now + am->fa_current_cleaner_timer_wait_interval; int has_pending_conns = 0; + u16 ti; u8 tt; - for(tt = 0; tt < ACL_N_TIMEOUTS; tt++) - { - if (~0 != am->fa_conn_list_head[tt]) + + /* + * walk over all per-thread list heads of different timeouts, + * and see if there are any connections pending. + * If there aren't - we do not need to wake up until the + * worker code signals that it has added a connection. + * + * Also, while we are at it, calculate the earliest we need to wake up. + */ + for(ti = 0; ti < vec_len(vlib_mains); ti++) { + if (ti >= vec_len(am->per_worker_data)) { + continue; + } + acl_fa_per_worker_data_t *pw = &am->per_worker_data[ti]; + for(tt = 0; tt < vec_len(pw->fa_conn_list_head); tt++) { + u64 head_expiry = acl_fa_get_list_head_expiry_time(am, pw, now, ti, tt); + if ((head_expiry < next_expire) && !pw->interrupt_is_pending) { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("Head expiry: %lu, now: %lu, next_expire: %lu (worker: %d, tt: %d)", head_expiry, now, next_expire, ti, tt); +#endif + next_expire = head_expiry; + } + if (~0 != pw->fa_conn_list_head[tt]) { has_pending_conns = 1; + } } + } /* If no pending connections then no point in timing out */ if (!has_pending_conns) @@ -1155,9 +1416,6 @@ acl_fa_session_cleaner_process (vlib_main_t * vm, vlib_node_runtime_t * rt, } else { - /* Timing wheel code is happier if it is called regularly */ - if (timeout > 0.5) - timeout = 0.5; am->fa_cleaner_cnt_wait_with_timeout++; (void) vlib_process_wait_for_event_or_clock (vm, timeout); event_type = vlib_process_get_events (vm, &event_data); @@ -1175,7 +1433,11 @@ acl_fa_session_cleaner_process (vlib_main_t * vm, vlib_node_runtime_t * rt, break; case ACL_FA_CLEANER_DELETE_BY_SW_IF_INDEX: { + uword *clear_sw_if_index_bitmap = 0; uword *sw_if_index0; +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("ACL_FA_CLEANER_DELETE_BY_SW_IF_INDEX received"); +#endif vec_foreach (sw_if_index0, event_data) { am->fa_cleaner_cnt_delete_by_sw_index++; @@ -1184,13 +1446,54 @@ acl_fa_session_cleaner_process (vlib_main_t * vm, vlib_node_runtime_t * rt, ("ACL_FA_NODE_CLEAN: ACL_FA_CLEANER_DELETE_BY_SW_IF_INDEX: %d", *sw_if_index0); #endif - u32 count = 0; - int result = - acl_fa_clean_sessions_by_sw_if_index (am, *sw_if_index0, - &count); - count_deleted_sessions += count; - am->fa_cleaner_cnt_delete_by_sw_index_ok += result; + clear_sw_if_index_bitmap = clib_bitmap_set(clear_sw_if_index_bitmap, *sw_if_index0, 1); } +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("ACL_FA_CLEANER_DELETE_BY_SW_IF_INDEX bitmap: %U", format_bitmap_hex, clear_sw_if_index_bitmap); +#endif + vec_foreach(pw0, am->per_worker_data) { + CLIB_MEMORY_BARRIER (); + while (pw0->clear_in_process) { + CLIB_MEMORY_BARRIER (); +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("ACL_FA_NODE_CLEAN: waiting previous cleaning cycle to finish on %d...", pw0 - am->per_worker_data); +#endif + vlib_process_suspend(vm, 0.0001); + if (pw0->interrupt_is_needed) { + send_one_worker_interrupt(vm, am, (pw0 - am->per_worker_data)); + } + } + if (pw0->clear_in_process) { + clib_warning("ERROR-BUG! Could not initiate cleaning on worker because another cleanup in progress"); + } else { + pw0->pending_clear_sw_if_index_bitmap = clib_bitmap_dup(clear_sw_if_index_bitmap); + pw0->clear_in_process = 1; + } + } + /* send some interrupts so they can start working */ + send_interrupts_to_workers(vm, am); + + /* now wait till they all complete */ +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("CLEANER mains len: %d per-worker len: %d", vec_len(vlib_mains), vec_len(am->per_worker_data)); +#endif + vec_foreach(pw0, am->per_worker_data) { + CLIB_MEMORY_BARRIER (); + while (pw0->clear_in_process) { + CLIB_MEMORY_BARRIER (); +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("ACL_FA_NODE_CLEAN: waiting for my cleaning cycle to finish on %d...", pw0 - am->per_worker_data); +#endif + vlib_process_suspend(vm, 0.0001); + if (pw0->interrupt_is_needed) { + send_one_worker_interrupt(vm, am, (pw0 - am->per_worker_data)); + } + } + } +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("ACL_FA_NODE_CLEAN: cleaning done"); +#endif + clib_bitmap_free(clear_sw_if_index_bitmap); } break; default: @@ -1206,74 +1509,34 @@ acl_fa_session_cleaner_process (vlib_main_t * vm, vlib_node_runtime_t * rt, break; } - { - u8 tt = 0; - for(tt = 0; tt < ACL_N_TIMEOUTS; tt++) { - while((vec_len(expired) < 2*am->fa_max_deleted_sessions_per_interval) - && (~0 != am->fa_conn_list_head[tt]) - && (acl_fa_conn_time_to_check(am, now, - am->fa_conn_list_head[tt]))) { - u32 sess_id = am->fa_conn_list_head[tt]; - vec_add1(expired, sess_id); - acl_fa_conn_list_delete_session(am, sess_id); - } - } - } + send_interrupts_to_workers(vm, am); - u32 *psid = NULL; - vec_foreach (psid, expired) - { - u32 session_index = *psid; - if (!pool_is_free_index (am->fa_sessions_pool, session_index)) - { - fa_session_t *sess = am->fa_sessions_pool + session_index; - u32 sw_if_index = sess->sw_if_index; - u64 sess_timeout_time = - sess->last_active_time + fa_session_get_timeout (am, sess); - if (now < sess_timeout_time) - { - /* clib_warning ("ACL_FA_NODE_CLEAN: Restarting timer for session %d", - (int) session_index); */ - - /* There was activity on the session, so the idle timeout - has not passed. Enqueue for another time period. */ - - acl_fa_conn_list_add_session(am, session_index, now); - - /* FIXME: When/if moving to timer wheel, - pretend we did this in the past, - at last_active moment, so the timer is accurate */ - am->fa_cleaner_cnt_timer_restarted++; - } - else - { - /* clib_warning ("ACL_FA_NODE_CLEAN: Deleting session %d", - (int) session_index); */ - acl_fa_delete_session (am, sw_if_index, session_index); - count_deleted_sessions++; - } - } - else - { - count_already_deleted++; - } - } - if (expired) - _vec_len (expired) = 0; if (event_data) _vec_len (event_data) = 0; - if (count_deleted_sessions > am->fa_max_deleted_sessions_per_interval) { - /* if there was too many sessions to delete, do less waiting around next time */ + + int interrupts_needed = 0; + int interrupts_unwanted = 0; + + vec_foreach(pw0, am->per_worker_data) { + if (pw0->interrupt_is_needed) { + interrupts_needed++; + /* the per-worker value is reset when sending the interrupt */ + } + if (pw0->interrupt_is_unwanted) { + interrupts_unwanted++; + pw0->interrupt_is_unwanted = 0; + } + } + if (interrupts_needed) { + /* they need more interrupts, do less waiting around next time */ am->fa_current_cleaner_timer_wait_interval /= 2; - } else if (count_deleted_sessions < am->fa_min_deleted_sessions_per_interval) { - /* Too few deleted sessions, slowly increase the amount of sleep up to a limit */ + } else if (interrupts_unwanted) { + /* slowly increase the amount of sleep up to a limit */ if (am->fa_current_cleaner_timer_wait_interval < max_timer_wait_interval) am->fa_current_cleaner_timer_wait_interval += cpu_cps * am->fa_cleaner_wait_time_increment; } am->fa_cleaner_cnt_event_cycles++; - am->fa_cleaner_cnt_deleted_sessions += count_deleted_sessions; - am->fa_cleaner_cnt_already_deleted += count_already_deleted; } /* NOT REACHED */ return 0; @@ -1307,6 +1570,9 @@ acl_fa_enable_disable (u32 sw_if_index, int is_input, int enable_disable) if ((!enable_disable) && (!acl_fa_ifc_has_in_acl (am, sw_if_index)) && (!acl_fa_ifc_has_out_acl (am, sw_if_index))) { +#ifdef FA_NODE_VERBOSE_DEBUG + clib_warning("ENABLE-DISABLE: clean the connections on interface %d", sw_if_index); +#endif vlib_process_signal_event (am->vlib_main, am->fa_cleaner_node_index, ACL_FA_CLEANER_DELETE_BY_SW_IF_INDEX, sw_if_index); @@ -1317,6 +1583,12 @@ acl_fa_enable_disable (u32 sw_if_index, int is_input, int enable_disable) /* *INDENT-OFF* */ +VLIB_REGISTER_NODE (acl_fa_worker_session_cleaner_process_node, static) = { + .function = acl_fa_worker_conn_cleaner_process, + .name = "acl-plugin-fa-worker-cleaner-process", + .type = VLIB_NODE_TYPE_INPUT, + .state = VLIB_NODE_STATE_INTERRUPT, +}; VLIB_REGISTER_NODE (acl_fa_session_cleaner_process_node, static) = { .function = acl_fa_session_cleaner_process, |