diff options
author | Colin Tregenza Dancer <ctd@metaswitch.com> | 2017-09-04 15:27:49 +0100 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2017-09-05 13:33:51 +0000 |
commit | 215961829c4ae5f738ffcd01a8d1afcab13bd0e2 (patch) | |
tree | 1cd897089bd30491ea362b4e827bd9401f34afae /src/vlib/threads.c | |
parent | f7a55ad74c90928d86f1bbf56590d9571c1b828f (diff) |
Refork worker thread data structures in parallel (VPP-970)
Change the rebuilding of worker thread clone datastructures
to run in parallel on the workers, instead of serially
on main.
Change-Id: Ib76bcfbef1e51f2399972090f4057be7aaa84e08
Signed-off-by: Colin Tregenza Dancer <ctd@metaswitch.com>
Diffstat (limited to 'src/vlib/threads.c')
-rw-r--r-- | src/vlib/threads.c | 333 |
1 files changed, 197 insertions, 136 deletions
diff --git a/src/vlib/threads.c b/src/vlib/threads.c index 0661d89ab6c..6cd325b3e2c 100644 --- a/src/vlib/threads.c +++ b/src/vlib/threads.c @@ -547,10 +547,17 @@ start_workers (vlib_main_t * vm) vlib_worker_threads->workers_at_barrier = clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES); + vlib_worker_threads->node_reforks_required = + clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES); + /* Ask for an initial barrier sync */ *vlib_worker_threads->workers_at_barrier = 0; *vlib_worker_threads->wait_at_barrier = 1; + /* Without update or refork */ + *vlib_worker_threads->node_reforks_required = 0; + vm->need_vlib_worker_thread_node_runtime_update = 0; + worker_thread_index = 1; for (i = 0; i < vec_len (tm->registrations); i++) @@ -568,6 +575,8 @@ start_workers (vlib_main_t * vm) for (k = 0; k < tr->count; k++) { + vlib_node_t *n; + vec_add2 (vlib_worker_threads, w, 1); if (tr->mheap_size) w->thread_mheap = @@ -628,10 +637,12 @@ start_workers (vlib_main_t * vm) /* fork nodes */ nm_clone->nodes = 0; + + /* Allocate all nodes in single block for speed */ + n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n)); + for (j = 0; j < vec_len (nm->nodes); j++) { - vlib_node_t *n; - n = clib_mem_alloc_no_fail (sizeof (*n)); clib_memcpy (n, nm->nodes[j], sizeof (*n)); /* none of the copied nodes have enqueue rights given out */ n->owner_node_index = VLIB_INVALID_NODE_INDEX; @@ -639,6 +650,7 @@ start_workers (vlib_main_t * vm) memset (&n->stats_last_clear, 0, sizeof (n->stats_last_clear)); vec_add1 (nm_clone->nodes, n); + n++; } nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] = vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]); @@ -778,17 +790,14 @@ start_workers (vlib_main_t * vm) VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers); -void -vlib_worker_thread_node_runtime_update (void) +static inline void +worker_thread_node_runtime_update_internal (void) { int i, j; - vlib_worker_thread_t *w; vlib_main_t *vm; vlib_node_main_t *nm, *nm_clone; - vlib_node_t **old_nodes_clone; vlib_main_t *vm_clone; - vlib_node_runtime_t *rt, *old_rt; - void *oldheap; + vlib_node_runtime_t *rt; never_inline void vlib_node_runtime_sync_stats (vlib_main_t * vm, vlib_node_runtime_t * r, @@ -797,13 +806,9 @@ vlib_worker_thread_node_runtime_update (void) ASSERT (vlib_get_thread_index () == 0); - if (vec_len (vlib_mains) == 1) - return; - vm = vlib_mains[0]; nm = &vm->node_main; - ASSERT (vlib_get_thread_index () == 0); ASSERT (*vlib_worker_threads->wait_at_barrier == 1); /* @@ -833,146 +838,170 @@ vlib_worker_thread_node_runtime_update (void) } } - for (i = 1; i < vec_len (vlib_mains); i++) - { - vlib_node_runtime_t *rt; - w = vlib_worker_threads + i; - oldheap = clib_mem_set_heap (w->thread_mheap); + /* Per-worker clone rebuilds are now done on each thread */ +} - vm_clone = vlib_mains[i]; - /* Re-clone error heap */ - u64 *old_counters = vm_clone->error_main.counters; - u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear; - clib_memcpy (&vm_clone->error_main, &vm->error_main, - sizeof (vm->error_main)); - j = vec_len (vm->error_main.counters) - 1; - vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES); - vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES); - vm_clone->error_main.counters = old_counters; - vm_clone->error_main.counters_last_clear = old_counters_all_clear; +void +vlib_worker_thread_node_refork (void) +{ + vlib_main_t *vm, *vm_clone; + vlib_node_main_t *nm, *nm_clone; + vlib_node_t **old_nodes_clone; + vlib_node_runtime_t *rt, *old_rt; - nm_clone = &vm_clone->node_main; - vec_free (nm_clone->next_frames); - nm_clone->next_frames = vec_dup (nm->next_frames); + vlib_node_t *new_n_clone; - for (j = 0; j < vec_len (nm_clone->next_frames); j++) - { - vlib_next_frame_t *nf = &nm_clone->next_frames[j]; - u32 save_node_runtime_index; - u32 save_flags; - - save_node_runtime_index = nf->node_runtime_index; - save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH; - vlib_next_frame_init (nf); - nf->node_runtime_index = save_node_runtime_index; - nf->flags = save_flags; - } + int j; - old_nodes_clone = nm_clone->nodes; - nm_clone->nodes = 0; + vm = vlib_mains[0]; + nm = &vm->node_main; + vm_clone = vlib_get_main (); + nm_clone = &vm_clone->node_main; + + /* Re-clone error heap */ + u64 *old_counters = vm_clone->error_main.counters; + u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear; + + clib_memcpy (&vm_clone->error_main, &vm->error_main, + sizeof (vm->error_main)); + j = vec_len (vm->error_main.counters) - 1; + vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES); + vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES); + vm_clone->error_main.counters = old_counters; + vm_clone->error_main.counters_last_clear = old_counters_all_clear; + + nm_clone = &vm_clone->node_main; + vec_free (nm_clone->next_frames); + nm_clone->next_frames = vec_dup (nm->next_frames); + + for (j = 0; j < vec_len (nm_clone->next_frames); j++) + { + vlib_next_frame_t *nf = &nm_clone->next_frames[j]; + u32 save_node_runtime_index; + u32 save_flags; + + save_node_runtime_index = nf->node_runtime_index; + save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH; + vlib_next_frame_init (nf); + nf->node_runtime_index = save_node_runtime_index; + nf->flags = save_flags; + } - /* re-fork nodes */ - for (j = 0; j < vec_len (nm->nodes); j++) - { - vlib_node_t *old_n_clone; - vlib_node_t *new_n, *new_n_clone; + old_nodes_clone = nm_clone->nodes; + nm_clone->nodes = 0; - new_n = nm->nodes[j]; - old_n_clone = old_nodes_clone[j]; + /* re-fork nodes */ - new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone)); - clib_memcpy (new_n_clone, new_n, sizeof (*new_n)); - /* none of the copied nodes have enqueue rights given out */ - new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX; + /* Allocate all nodes in single block for speed */ + new_n_clone = + clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone)); + for (j = 0; j < vec_len (nm->nodes); j++) + { + vlib_node_t *old_n_clone; + vlib_node_t *new_n; - if (j >= vec_len (old_nodes_clone)) - { - /* new node, set to zero */ - memset (&new_n_clone->stats_total, 0, - sizeof (new_n_clone->stats_total)); - memset (&new_n_clone->stats_last_clear, 0, - sizeof (new_n_clone->stats_last_clear)); - } - else - { - /* Copy stats if the old data is valid */ - clib_memcpy (&new_n_clone->stats_total, - &old_n_clone->stats_total, - sizeof (new_n_clone->stats_total)); - clib_memcpy (&new_n_clone->stats_last_clear, - &old_n_clone->stats_last_clear, - sizeof (new_n_clone->stats_last_clear)); - - /* keep previous node state */ - new_n_clone->state = old_n_clone->state; - } - vec_add1 (nm_clone->nodes, new_n_clone); - } - /* Free the old node clone */ - for (j = 0; j < vec_len (old_nodes_clone); j++) - clib_mem_free (old_nodes_clone[j]); - vec_free (old_nodes_clone); - - - /* re-clone internal nodes */ - old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]; - nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] = - vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]); - - vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]) - { - vlib_node_t *n = vlib_get_node (vm, rt->node_index); - rt->thread_index = vm_clone->thread_index; - /* copy runtime_data, will be overwritten later for existing rt */ - if (n->runtime_data && n->runtime_data_bytes > 0) - clib_memcpy (rt->runtime_data, n->runtime_data, - clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, - n->runtime_data_bytes)); - } - - for (j = 0; j < vec_len (old_rt); j++) + new_n = nm->nodes[j]; + old_n_clone = old_nodes_clone[j]; + + clib_memcpy (new_n_clone, new_n, sizeof (*new_n)); + /* none of the copied nodes have enqueue rights given out */ + new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX; + + if (j >= vec_len (old_nodes_clone)) { - rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); - rt->state = old_rt[j].state; - clib_memcpy (rt->runtime_data, old_rt[j].runtime_data, - VLIB_NODE_RUNTIME_DATA_SIZE); + /* new node, set to zero */ + memset (&new_n_clone->stats_total, 0, + sizeof (new_n_clone->stats_total)); + memset (&new_n_clone->stats_last_clear, 0, + sizeof (new_n_clone->stats_last_clear)); } - - vec_free (old_rt); - - /* re-clone input nodes */ - old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]; - nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] = - vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]); - - vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]) - { - vlib_node_t *n = vlib_get_node (vm, rt->node_index); - rt->thread_index = vm_clone->thread_index; - /* copy runtime_data, will be overwritten later for existing rt */ - if (n->runtime_data && n->runtime_data_bytes > 0) - clib_memcpy (rt->runtime_data, n->runtime_data, - clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, - n->runtime_data_bytes)); - } - - for (j = 0; j < vec_len (old_rt); j++) + else { - rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); - rt->state = old_rt[j].state; - clib_memcpy (rt->runtime_data, old_rt[j].runtime_data, - VLIB_NODE_RUNTIME_DATA_SIZE); + /* Copy stats if the old data is valid */ + clib_memcpy (&new_n_clone->stats_total, + &old_n_clone->stats_total, + sizeof (new_n_clone->stats_total)); + clib_memcpy (&new_n_clone->stats_last_clear, + &old_n_clone->stats_last_clear, + sizeof (new_n_clone->stats_last_clear)); + + /* keep previous node state */ + new_n_clone->state = old_n_clone->state; } + vec_add1 (nm_clone->nodes, new_n_clone); + new_n_clone++; + } + /* Free the old node clones */ + clib_mem_free (old_nodes_clone[0]); + + vec_free (old_nodes_clone); - vec_free (old_rt); - nm_clone->processes = vec_dup (nm->processes); + /* re-clone internal nodes */ + old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]; + nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] = + vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]); - clib_mem_set_heap (oldheap); + vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]) + { + vlib_node_t *n = vlib_get_node (vm, rt->node_index); + rt->thread_index = vm_clone->thread_index; + /* copy runtime_data, will be overwritten later for existing rt */ + if (n->runtime_data && n->runtime_data_bytes > 0) + clib_memcpy (rt->runtime_data, n->runtime_data, + clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, + n->runtime_data_bytes)); + } - // vnet_main_fork_fixup (i); + for (j = 0; j < vec_len (old_rt); j++) + { + rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); + rt->state = old_rt[j].state; + clib_memcpy (rt->runtime_data, old_rt[j].runtime_data, + VLIB_NODE_RUNTIME_DATA_SIZE); } + + vec_free (old_rt); + + /* re-clone input nodes */ + old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]; + nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] = + vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]); + + vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]) + { + vlib_node_t *n = vlib_get_node (vm, rt->node_index); + rt->thread_index = vm_clone->thread_index; + /* copy runtime_data, will be overwritten later for existing rt */ + if (n->runtime_data && n->runtime_data_bytes > 0) + clib_memcpy (rt->runtime_data, n->runtime_data, + clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, + n->runtime_data_bytes)); + } + + for (j = 0; j < vec_len (old_rt); j++) + { + rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); + rt->state = old_rt[j].state; + clib_memcpy (rt->runtime_data, old_rt[j].runtime_data, + VLIB_NODE_RUNTIME_DATA_SIZE); + } + + vec_free (old_rt); + + nm_clone->processes = vec_dup (nm->processes); +} + + +void +vlib_worker_thread_node_runtime_update (void) +{ + /* + * Make a note that we need to do a node runtime update + * prior to releasing the barrier. + */ + vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1; } u32 @@ -1172,6 +1201,8 @@ vlib_worker_thread_barrier_sync (vlib_main_t * vm) if (vec_len (vlib_mains) < 2) return; + ASSERT (vlib_get_thread_index () == 0); + count = vec_len (vlib_mains) - 1; /* Tolerate recursive calls */ @@ -1180,8 +1211,6 @@ vlib_worker_thread_barrier_sync (vlib_main_t * vm) vlib_worker_threads[0].barrier_sync_count++; - ASSERT (vlib_get_thread_index () == 0); - deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT; *vlib_worker_threads->wait_at_barrier = 1; @@ -1199,13 +1228,29 @@ void vlib_worker_thread_barrier_release (vlib_main_t * vm) { f64 deadline; + int refork_needed = 0; if (vec_len (vlib_mains) < 2) return; + ASSERT (vlib_get_thread_index () == 0); + if (--vlib_worker_threads[0].recursion_level > 0) return; + /* Update (all) node runtimes before releasing the barrier, if needed */ + if (vm->need_vlib_worker_thread_node_runtime_update) + { + /* Do stats elements on main thread */ + worker_thread_node_runtime_update_internal (); + vm->need_vlib_worker_thread_node_runtime_update = 0; + + /* Do per thread rebuilds in parallel */ + refork_needed = 1; + clib_smp_atomic_add (vlib_worker_threads->node_reforks_required, + (vec_len (vlib_mains) - 1)); + } + deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT; *vlib_worker_threads->wait_at_barrier = 0; @@ -1218,6 +1263,22 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm) os_panic (); } } + + /* Wait for reforks before continuing */ + if (refork_needed) + { + deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT; + + while (*vlib_worker_threads->node_reforks_required > 0) + { + if (vlib_time_now (vm) > deadline) + { + fformat (stderr, "%s: worker thread refork deadlock\n", + __FUNCTION__); + os_panic (); + } + } + } } /* |