From 53d8d4fd625c21777d11172dac9ea3aa6602edd0 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Wed, 9 Mar 2022 13:55:38 -0800 Subject: session: add infra for safe pool reallocs This is not to be used lightly. The idea is to forces pool reallocs to be done only on main thread with a barrier to make sure pools are always reallocated without peekers/readers. If rpcs are delayed and the pool runs out of elements, workers will block waiting for barrier and force the realloc. Consumers of this api should be session layer and transports. Type: feature Signed-off-by: Florin Coras Change-Id: I533272a29534338935a3fcf7027c0e7af2ca948c --- src/vnet/session/session.c | 1 + src/vnet/session/session.h | 147 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+) (limited to 'src/vnet') diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 2a5d13d57c5..60991cdd8d0 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1887,6 +1887,7 @@ session_manager_main_enable (vlib_main_t * vm) /* Allocate cache line aligned worker contexts */ vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES); + clib_spinlock_init (&session_main.pool_realloc_lock); for (i = 0; i < num_threads; i++) { diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index fe8a85cd5f4..0ea621aa385 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -200,6 +200,12 @@ typedef struct session_main_ transport_proto_t last_transport_proto_type; + /** Number of workers at pool realloc barrier */ + u32 pool_realloc_at_barrier; + + /** Lock to synchronize parallel forced reallocs */ + clib_spinlock_t pool_realloc_lock; + /* * Config parameters */ @@ -784,6 +790,147 @@ clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en); session_t *session_alloc_for_connection (transport_connection_t * tc); session_t *session_alloc_for_half_open (transport_connection_t *tc); +typedef void (pool_safe_realloc_rpc_fn) (void *rpc_args); + +typedef struct +{ + u8 ph[STRUCT_OFFSET_OF (pool_header_t, max_elts) + 4]; + u32 flag; +} pool_safe_realloc_header_t; + +STATIC_ASSERT_SIZEOF (pool_safe_realloc_header_t, sizeof (pool_header_t)); + +#define POOL_REALLOC_SAFE_ELT_THRESH 32 + +#define pool_realloc_flag(PH) \ + ((pool_safe_realloc_header_t *) pool_header (PH))->flag + +#define pool_realloc_safe_aligned(P, align) \ + do \ + { \ + vlib_main_t *vm = vlib_get_main (); \ + u32 free_elts, max_elts, n_alloc; \ + ASSERT (vlib_get_thread_index () == 0); \ + vlib_worker_thread_barrier_sync (vm); \ + free_elts = pool_free_elts (P); \ + max_elts = pool_max_len (P); \ + n_alloc = clib_max (2 * max_elts, POOL_REALLOC_SAFE_ELT_THRESH); \ + pool_alloc_aligned (P, free_elts + n_alloc, align); \ + clib_bitmap_validate (pool_header (P)->free_bitmap, \ + max_elts + n_alloc); \ + pool_realloc_flag (P) = 0; \ + vlib_worker_thread_barrier_release (vm); \ + } \ + while (0) + +always_inline void +pool_program_safe_realloc (void *p, u32 thread_index, + pool_safe_realloc_rpc_fn *rpc_fn) +{ + /* Reuse pad as a realloc flag */ + if (pool_realloc_flag (p)) + return; + + pool_realloc_flag (p) = 1; + session_send_rpc_evt_to_thread (0 /* thread index */, rpc_fn, + uword_to_pointer (thread_index, void *)); +} + +always_inline void +pool_realloc_maybe_wait_at_barrier (void) +{ + if (!(*vlib_worker_threads->wait_at_barrier)) + return; + + /* Node refork required. Don't stop at the barrier from within a node */ + if (*vlib_worker_threads->node_reforks_required) + return; + + clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1); + + while (*vlib_worker_threads->wait_at_barrier) + ; + + clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1); +} + +#define pool_realloc_all_at_barrier(_not) \ + (*vlib_worker_threads->workers_at_barrier >= (vlib_num_workers () - _not)) + +#define pool_realloc_safe_force(P) \ + do \ + { \ + ALWAYS_ASSERT (*vlib_worker_threads->node_reforks_required); \ + if (pool_realloc_all_at_barrier (1)) \ + { \ + pool_alloc (P, pool_max_len (P)); \ + } \ + else \ + { \ + session_main_t *sm = &session_main; \ + clib_warning ("forced pool realloc"); \ + clib_atomic_fetch_add (&sm->pool_realloc_at_barrier, 1); \ + while (!pool_realloc_all_at_barrier (sm->pool_realloc_at_barrier)) \ + ; \ + clib_spinlock_lock (&sm->pool_realloc_lock); \ + pool_alloc (P, pool_max_len (P)); \ + clib_spinlock_unlock (&sm->pool_realloc_lock); \ + clib_atomic_fetch_add (&sm->pool_realloc_at_barrier, -1); \ + } \ + } \ + while (0) + +#define pool_needs_realloc(P) \ + ((!P) || \ + (vec_len (pool_header (P)->free_indices) < POOL_REALLOC_SAFE_ELT_THRESH && \ + pool_free_elts (P) < POOL_REALLOC_SAFE_ELT_THRESH)) + +#define pool_get_aligned_safe(P, E, thread_index, rpc_fn, align) \ + do \ + { \ + ASSERT (vlib_get_thread_index () == thread_index || \ + vlib_thread_is_main_w_barrier ()); \ + if (PREDICT_FALSE (pool_needs_realloc (P))) \ + { \ + if (PREDICT_FALSE (!(P))) \ + { \ + pool_alloc_aligned (P, 2 * POOL_REALLOC_SAFE_ELT_THRESH, \ + align); \ + } \ + else if (PREDICT_FALSE (pool_free_elts (P) < \ + POOL_REALLOC_SAFE_ELT_THRESH / 2)) \ + { \ + volatile typeof (P) *PP = &(P); \ + pool_program_safe_realloc (P, thread_index, rpc_fn); \ + if (thread_index) \ + { \ + while (pool_realloc_flag (P)) \ + { \ + /* If refork required abort and consume existing elt */ \ + if (*vlib_worker_threads->node_reforks_required) \ + { \ + /* All workers at barrier realloc now */ \ + if (pool_realloc_all_at_barrier (1)) \ + pool_alloc_aligned (P, pool_max_len (P), align); \ + break; \ + } \ + pool_realloc_maybe_wait_at_barrier (); \ + } \ + if (pool_free_elts (P) == 0) \ + pool_realloc_safe_force (P); \ + ALWAYS_ASSERT (pool_free_elts (P) > 0); \ + } \ + (P) = *PP; \ + } \ + else \ + { \ + pool_program_safe_realloc (P, thread_index, rpc_fn); \ + } \ + } \ + pool_get_aligned (P, E, align); \ + } \ + while (0) + #endif /* __included_session_h__ */ /* -- cgit 1.2.3-korg