aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorColin Tregenza Dancer <ctd@metaswitch.com>2017-09-04 15:27:49 +0100
committerDave Barach <openvpp@barachs.net>2017-09-05 13:33:51 +0000
commit215961829c4ae5f738ffcd01a8d1afcab13bd0e2 (patch)
tree1cd897089bd30491ea362b4e827bd9401f34afae
parentf7a55ad74c90928d86f1bbf56590d9571c1b828f (diff)
Refork worker thread data structures in parallel (VPP-970)
Change the rebuilding of worker thread clone datastructures to run in parallel on the workers, instead of serially on main. Change-Id: Ib76bcfbef1e51f2399972090f4057be7aaa84e08 Signed-off-by: Colin Tregenza Dancer <ctd@metaswitch.com>
-rw-r--r--src/vlib/main.h6
-rw-r--r--src/vlib/threads.c333
-rw-r--r--src/vlib/threads.h11
3 files changed, 214 insertions, 136 deletions
diff --git a/src/vlib/main.h b/src/vlib/main.h
index bfa7ddbe6af..b63c63fa95d 100644
--- a/src/vlib/main.h
+++ b/src/vlib/main.h
@@ -181,6 +181,12 @@ typedef struct vlib_main_t
/* Attempt to do a post-mortem elog dump */
int elog_post_mortem_dump;
+ /*
+ * Need to call vlib_worker_thread_node_runtime_update before
+ * releasing worker thread barrier. Only valid in vlib_global_main.
+ */
+ int need_vlib_worker_thread_node_runtime_update;
+
} vlib_main_t;
/* Global main structure. */
diff --git a/src/vlib/threads.c b/src/vlib/threads.c
index 0661d89ab6c..6cd325b3e2c 100644
--- a/src/vlib/threads.c
+++ b/src/vlib/threads.c
@@ -547,10 +547,17 @@ start_workers (vlib_main_t * vm)
vlib_worker_threads->workers_at_barrier =
clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
+ vlib_worker_threads->node_reforks_required =
+ clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
+
/* Ask for an initial barrier sync */
*vlib_worker_threads->workers_at_barrier = 0;
*vlib_worker_threads->wait_at_barrier = 1;
+ /* Without update or refork */
+ *vlib_worker_threads->node_reforks_required = 0;
+ vm->need_vlib_worker_thread_node_runtime_update = 0;
+
worker_thread_index = 1;
for (i = 0; i < vec_len (tm->registrations); i++)
@@ -568,6 +575,8 @@ start_workers (vlib_main_t * vm)
for (k = 0; k < tr->count; k++)
{
+ vlib_node_t *n;
+
vec_add2 (vlib_worker_threads, w, 1);
if (tr->mheap_size)
w->thread_mheap =
@@ -628,10 +637,12 @@ start_workers (vlib_main_t * vm)
/* fork nodes */
nm_clone->nodes = 0;
+
+ /* Allocate all nodes in single block for speed */
+ n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
+
for (j = 0; j < vec_len (nm->nodes); j++)
{
- vlib_node_t *n;
- n = clib_mem_alloc_no_fail (sizeof (*n));
clib_memcpy (n, nm->nodes[j], sizeof (*n));
/* none of the copied nodes have enqueue rights given out */
n->owner_node_index = VLIB_INVALID_NODE_INDEX;
@@ -639,6 +650,7 @@ start_workers (vlib_main_t * vm)
memset (&n->stats_last_clear, 0,
sizeof (n->stats_last_clear));
vec_add1 (nm_clone->nodes, n);
+ n++;
}
nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
@@ -778,17 +790,14 @@ start_workers (vlib_main_t * vm)
VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
-void
-vlib_worker_thread_node_runtime_update (void)
+static inline void
+worker_thread_node_runtime_update_internal (void)
{
int i, j;
- vlib_worker_thread_t *w;
vlib_main_t *vm;
vlib_node_main_t *nm, *nm_clone;
- vlib_node_t **old_nodes_clone;
vlib_main_t *vm_clone;
- vlib_node_runtime_t *rt, *old_rt;
- void *oldheap;
+ vlib_node_runtime_t *rt;
never_inline void
vlib_node_runtime_sync_stats (vlib_main_t * vm,
vlib_node_runtime_t * r,
@@ -797,13 +806,9 @@ vlib_worker_thread_node_runtime_update (void)
ASSERT (vlib_get_thread_index () == 0);
- if (vec_len (vlib_mains) == 1)
- return;
-
vm = vlib_mains[0];
nm = &vm->node_main;
- ASSERT (vlib_get_thread_index () == 0);
ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
/*
@@ -833,146 +838,170 @@ vlib_worker_thread_node_runtime_update (void)
}
}
- for (i = 1; i < vec_len (vlib_mains); i++)
- {
- vlib_node_runtime_t *rt;
- w = vlib_worker_threads + i;
- oldheap = clib_mem_set_heap (w->thread_mheap);
+ /* Per-worker clone rebuilds are now done on each thread */
+}
- vm_clone = vlib_mains[i];
- /* Re-clone error heap */
- u64 *old_counters = vm_clone->error_main.counters;
- u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
- clib_memcpy (&vm_clone->error_main, &vm->error_main,
- sizeof (vm->error_main));
- j = vec_len (vm->error_main.counters) - 1;
- vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
- vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
- vm_clone->error_main.counters = old_counters;
- vm_clone->error_main.counters_last_clear = old_counters_all_clear;
+void
+vlib_worker_thread_node_refork (void)
+{
+ vlib_main_t *vm, *vm_clone;
+ vlib_node_main_t *nm, *nm_clone;
+ vlib_node_t **old_nodes_clone;
+ vlib_node_runtime_t *rt, *old_rt;
- nm_clone = &vm_clone->node_main;
- vec_free (nm_clone->next_frames);
- nm_clone->next_frames = vec_dup (nm->next_frames);
+ vlib_node_t *new_n_clone;
- for (j = 0; j < vec_len (nm_clone->next_frames); j++)
- {
- vlib_next_frame_t *nf = &nm_clone->next_frames[j];
- u32 save_node_runtime_index;
- u32 save_flags;
-
- save_node_runtime_index = nf->node_runtime_index;
- save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
- vlib_next_frame_init (nf);
- nf->node_runtime_index = save_node_runtime_index;
- nf->flags = save_flags;
- }
+ int j;
- old_nodes_clone = nm_clone->nodes;
- nm_clone->nodes = 0;
+ vm = vlib_mains[0];
+ nm = &vm->node_main;
+ vm_clone = vlib_get_main ();
+ nm_clone = &vm_clone->node_main;
+
+ /* Re-clone error heap */
+ u64 *old_counters = vm_clone->error_main.counters;
+ u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
+
+ clib_memcpy (&vm_clone->error_main, &vm->error_main,
+ sizeof (vm->error_main));
+ j = vec_len (vm->error_main.counters) - 1;
+ vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
+ vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
+ vm_clone->error_main.counters = old_counters;
+ vm_clone->error_main.counters_last_clear = old_counters_all_clear;
+
+ nm_clone = &vm_clone->node_main;
+ vec_free (nm_clone->next_frames);
+ nm_clone->next_frames = vec_dup (nm->next_frames);
+
+ for (j = 0; j < vec_len (nm_clone->next_frames); j++)
+ {
+ vlib_next_frame_t *nf = &nm_clone->next_frames[j];
+ u32 save_node_runtime_index;
+ u32 save_flags;
+
+ save_node_runtime_index = nf->node_runtime_index;
+ save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
+ vlib_next_frame_init (nf);
+ nf->node_runtime_index = save_node_runtime_index;
+ nf->flags = save_flags;
+ }
- /* re-fork nodes */
- for (j = 0; j < vec_len (nm->nodes); j++)
- {
- vlib_node_t *old_n_clone;
- vlib_node_t *new_n, *new_n_clone;
+ old_nodes_clone = nm_clone->nodes;
+ nm_clone->nodes = 0;
- new_n = nm->nodes[j];
- old_n_clone = old_nodes_clone[j];
+ /* re-fork nodes */
- new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone));
- clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
- /* none of the copied nodes have enqueue rights given out */
- new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
+ /* Allocate all nodes in single block for speed */
+ new_n_clone =
+ clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
+ for (j = 0; j < vec_len (nm->nodes); j++)
+ {
+ vlib_node_t *old_n_clone;
+ vlib_node_t *new_n;
- if (j >= vec_len (old_nodes_clone))
- {
- /* new node, set to zero */
- memset (&new_n_clone->stats_total, 0,
- sizeof (new_n_clone->stats_total));
- memset (&new_n_clone->stats_last_clear, 0,
- sizeof (new_n_clone->stats_last_clear));
- }
- else
- {
- /* Copy stats if the old data is valid */
- clib_memcpy (&new_n_clone->stats_total,
- &old_n_clone->stats_total,
- sizeof (new_n_clone->stats_total));
- clib_memcpy (&new_n_clone->stats_last_clear,
- &old_n_clone->stats_last_clear,
- sizeof (new_n_clone->stats_last_clear));
-
- /* keep previous node state */
- new_n_clone->state = old_n_clone->state;
- }
- vec_add1 (nm_clone->nodes, new_n_clone);
- }
- /* Free the old node clone */
- for (j = 0; j < vec_len (old_nodes_clone); j++)
- clib_mem_free (old_nodes_clone[j]);
- vec_free (old_nodes_clone);
-
-
- /* re-clone internal nodes */
- old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
- nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
- vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
-
- vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
- {
- vlib_node_t *n = vlib_get_node (vm, rt->node_index);
- rt->thread_index = vm_clone->thread_index;
- /* copy runtime_data, will be overwritten later for existing rt */
- if (n->runtime_data && n->runtime_data_bytes > 0)
- clib_memcpy (rt->runtime_data, n->runtime_data,
- clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
- n->runtime_data_bytes));
- }
-
- for (j = 0; j < vec_len (old_rt); j++)
+ new_n = nm->nodes[j];
+ old_n_clone = old_nodes_clone[j];
+
+ clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
+ /* none of the copied nodes have enqueue rights given out */
+ new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
+
+ if (j >= vec_len (old_nodes_clone))
{
- rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
- rt->state = old_rt[j].state;
- clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
- VLIB_NODE_RUNTIME_DATA_SIZE);
+ /* new node, set to zero */
+ memset (&new_n_clone->stats_total, 0,
+ sizeof (new_n_clone->stats_total));
+ memset (&new_n_clone->stats_last_clear, 0,
+ sizeof (new_n_clone->stats_last_clear));
}
-
- vec_free (old_rt);
-
- /* re-clone input nodes */
- old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
- nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
- vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
-
- vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
- {
- vlib_node_t *n = vlib_get_node (vm, rt->node_index);
- rt->thread_index = vm_clone->thread_index;
- /* copy runtime_data, will be overwritten later for existing rt */
- if (n->runtime_data && n->runtime_data_bytes > 0)
- clib_memcpy (rt->runtime_data, n->runtime_data,
- clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
- n->runtime_data_bytes));
- }
-
- for (j = 0; j < vec_len (old_rt); j++)
+ else
{
- rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
- rt->state = old_rt[j].state;
- clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
- VLIB_NODE_RUNTIME_DATA_SIZE);
+ /* Copy stats if the old data is valid */
+ clib_memcpy (&new_n_clone->stats_total,
+ &old_n_clone->stats_total,
+ sizeof (new_n_clone->stats_total));
+ clib_memcpy (&new_n_clone->stats_last_clear,
+ &old_n_clone->stats_last_clear,
+ sizeof (new_n_clone->stats_last_clear));
+
+ /* keep previous node state */
+ new_n_clone->state = old_n_clone->state;
}
+ vec_add1 (nm_clone->nodes, new_n_clone);
+ new_n_clone++;
+ }
+ /* Free the old node clones */
+ clib_mem_free (old_nodes_clone[0]);
+
+ vec_free (old_nodes_clone);
- vec_free (old_rt);
- nm_clone->processes = vec_dup (nm->processes);
+ /* re-clone internal nodes */
+ old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
+ nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
+ vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
- clib_mem_set_heap (oldheap);
+ vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
+ {
+ vlib_node_t *n = vlib_get_node (vm, rt->node_index);
+ rt->thread_index = vm_clone->thread_index;
+ /* copy runtime_data, will be overwritten later for existing rt */
+ if (n->runtime_data && n->runtime_data_bytes > 0)
+ clib_memcpy (rt->runtime_data, n->runtime_data,
+ clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
+ n->runtime_data_bytes));
+ }
- // vnet_main_fork_fixup (i);
+ for (j = 0; j < vec_len (old_rt); j++)
+ {
+ rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
+ rt->state = old_rt[j].state;
+ clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
+ VLIB_NODE_RUNTIME_DATA_SIZE);
}
+
+ vec_free (old_rt);
+
+ /* re-clone input nodes */
+ old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
+ nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
+ vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
+
+ vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
+ {
+ vlib_node_t *n = vlib_get_node (vm, rt->node_index);
+ rt->thread_index = vm_clone->thread_index;
+ /* copy runtime_data, will be overwritten later for existing rt */
+ if (n->runtime_data && n->runtime_data_bytes > 0)
+ clib_memcpy (rt->runtime_data, n->runtime_data,
+ clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
+ n->runtime_data_bytes));
+ }
+
+ for (j = 0; j < vec_len (old_rt); j++)
+ {
+ rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
+ rt->state = old_rt[j].state;
+ clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
+ VLIB_NODE_RUNTIME_DATA_SIZE);
+ }
+
+ vec_free (old_rt);
+
+ nm_clone->processes = vec_dup (nm->processes);
+}
+
+
+void
+vlib_worker_thread_node_runtime_update (void)
+{
+ /*
+ * Make a note that we need to do a node runtime update
+ * prior to releasing the barrier.
+ */
+ vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
}
u32
@@ -1172,6 +1201,8 @@ vlib_worker_thread_barrier_sync (vlib_main_t * vm)
if (vec_len (vlib_mains) < 2)
return;
+ ASSERT (vlib_get_thread_index () == 0);
+
count = vec_len (vlib_mains) - 1;
/* Tolerate recursive calls */
@@ -1180,8 +1211,6 @@ vlib_worker_thread_barrier_sync (vlib_main_t * vm)
vlib_worker_threads[0].barrier_sync_count++;
- ASSERT (vlib_get_thread_index () == 0);
-
deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
*vlib_worker_threads->wait_at_barrier = 1;
@@ -1199,13 +1228,29 @@ void
vlib_worker_thread_barrier_release (vlib_main_t * vm)
{
f64 deadline;
+ int refork_needed = 0;
if (vec_len (vlib_mains) < 2)
return;
+ ASSERT (vlib_get_thread_index () == 0);
+
if (--vlib_worker_threads[0].recursion_level > 0)
return;
+ /* Update (all) node runtimes before releasing the barrier, if needed */
+ if (vm->need_vlib_worker_thread_node_runtime_update)
+ {
+ /* Do stats elements on main thread */
+ worker_thread_node_runtime_update_internal ();
+ vm->need_vlib_worker_thread_node_runtime_update = 0;
+
+ /* Do per thread rebuilds in parallel */
+ refork_needed = 1;
+ clib_smp_atomic_add (vlib_worker_threads->node_reforks_required,
+ (vec_len (vlib_mains) - 1));
+ }
+
deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
*vlib_worker_threads->wait_at_barrier = 0;
@@ -1218,6 +1263,22 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm)
os_panic ();
}
}
+
+ /* Wait for reforks before continuing */
+ if (refork_needed)
+ {
+ deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
+
+ while (*vlib_worker_threads->node_reforks_required > 0)
+ {
+ if (vlib_time_now (vm) > deadline)
+ {
+ fformat (stderr, "%s: worker thread refork deadlock\n",
+ __FUNCTION__);
+ os_panic ();
+ }
+ }
+ }
}
/*
diff --git a/src/vlib/threads.h b/src/vlib/threads.h
index 572ce77ffcc..c3f1cadee16 100644
--- a/src/vlib/threads.h
+++ b/src/vlib/threads.h
@@ -102,6 +102,7 @@ typedef struct
vlib_thread_registration_t *registration;
u8 *name;
u64 barrier_sync_count;
+ volatile u32 *node_reforks_required;
long lwp;
int lcore_id;
@@ -180,6 +181,7 @@ u32 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts);
void vlib_worker_thread_barrier_sync (vlib_main_t * vm);
void vlib_worker_thread_barrier_release (vlib_main_t * vm);
+void vlib_worker_thread_node_refork (void);
static_always_inline uword
vlib_get_thread_index (void)
@@ -369,6 +371,15 @@ vlib_worker_thread_barrier_check (void)
if (CLIB_DEBUG > 0)
vm->parked_at_barrier = 0;
clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
+
+ if (PREDICT_FALSE (*vlib_worker_threads->node_reforks_required))
+ {
+ vlib_worker_thread_node_refork ();
+ clib_smp_atomic_add (vlib_worker_threads->node_reforks_required,
+ -1);
+ while (*vlib_worker_threads->node_reforks_required)
+ ;
+ }
}
}