diff options
-rw-r--r-- | src/vlib/buffer_funcs.c | 286 | ||||
-rw-r--r-- | src/vlib/node.h | 1 | ||||
-rw-r--r-- | src/vlib/threads.c | 31 | ||||
-rw-r--r-- | src/vlib/threads.h | 137 | ||||
-rw-r--r-- | src/vlib/threads_cli.c | 4 |
5 files changed, 177 insertions, 282 deletions
diff --git a/src/vlib/buffer_funcs.c b/src/vlib/buffer_funcs.c index 83ff296e705..624b6e6b8c7 100644 --- a/src/vlib/buffer_funcs.c +++ b/src/vlib/buffer_funcs.c @@ -87,7 +87,10 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn) while (n_left) { while (PREDICT_FALSE (used_elt_bmp[off] == ~0)) - off++; + { + off++; + ASSERT (off < ARRAY_LEN (used_elt_bmp)); + } next_index = nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])]; @@ -113,7 +116,10 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn) while (n_left) { while (PREDICT_FALSE (used_elt_bmp[off] == ~0)) - off++; + { + off++; + ASSERT (off < ARRAY_LEN (used_elt_bmp)); + } next_index = nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])]; @@ -161,100 +167,88 @@ next: } CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_single_next_fn); -u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn") -CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) -(vlib_main_t *vm, vlib_node_runtime_t *node, u32 frame_queue_index, - u32 *buffer_indices, u16 *thread_indices, u32 n_packets, - int drop_on_congestion) +static inline vlib_frame_queue_elt_t * +vlib_get_frame_queue_elt (vlib_frame_queue_main_t *fqm, u32 index, + int dont_wait) { - vlib_thread_main_t *tm = vlib_get_thread_main (); - vlib_frame_queue_main_t *fqm; - vlib_frame_queue_per_thread_data_t *ptd; - u32 n_left = n_packets; - u32 drop_list[VLIB_FRAME_SIZE], *dbi = drop_list, n_drop = 0; - vlib_frame_queue_elt_t *hf = 0; - u32 n_left_to_next_thread = 0, *to_next_thread = 0; - u32 next_thread_index, current_thread_index = ~0; - int i; + vlib_frame_queue_t *fq; + u64 nelts, tail, new_tail; - fqm = vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); - ptd = vec_elt_at_index (fqm->per_thread_data, vm->thread_index); + fq = fqm->vlib_frame_queues[index]; + ASSERT (fq); + nelts = fq->nelts; + +retry: + tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE); + new_tail = tail + 1; - while (n_left) + if (new_tail >= fq->head + nelts) { - next_thread_index = thread_indices[0]; + if (dont_wait) + return 0; - if (next_thread_index != current_thread_index) - { - if (drop_on_congestion && - is_vlib_frame_queue_congested ( - frame_queue_index, next_thread_index, fqm->queue_hi_thresh, - ptd->congested_handoff_queue_by_thread_index)) - { - dbi[0] = buffer_indices[0]; - dbi++; - n_drop++; - goto next; - } + /* Wait until a ring slot is available */ + while (new_tail >= fq->head + nelts) + vlib_worker_thread_barrier_check (); + } - if (hf) - hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_thread; + if (!__atomic_compare_exchange_n (&fq->tail, &tail, new_tail, 0 /* weak */, + __ATOMIC_RELAXED, __ATOMIC_RELAXED)) + goto retry; - hf = vlib_get_worker_handoff_queue_elt ( - frame_queue_index, next_thread_index, - ptd->handoff_queue_elt_by_thread_index); + return fq->elts + (new_tail & (nelts - 1)); +} - n_left_to_next_thread = VLIB_FRAME_SIZE - hf->n_vectors; - to_next_thread = &hf->buffer_index[hf->n_vectors]; - current_thread_index = next_thread_index; - } +static_always_inline u32 +vlib_buffer_enqueue_to_thread_inline (vlib_main_t *vm, + vlib_node_runtime_t *node, + vlib_frame_queue_main_t *fqm, + u32 *buffer_indices, u16 *thread_indices, + u32 n_packets, int drop_on_congestion) +{ + u32 drop_list[VLIB_FRAME_SIZE], n_drop = 0; + u64 used_elts[VLIB_FRAME_SIZE / 64] = {}; + u64 mask[VLIB_FRAME_SIZE / 64]; + vlib_frame_queue_elt_t *hf = 0; + u16 thread_index; + u32 n_comp, off = 0, n_left = n_packets; - to_next_thread[0] = buffer_indices[0]; - to_next_thread++; - n_left_to_next_thread--; + thread_index = thread_indices[0]; - if (n_left_to_next_thread == 0) - { - hf->n_vectors = VLIB_FRAME_SIZE; - vlib_put_frame_queue_elt (hf); - vlib_get_main_by_index (current_thread_index)->check_frame_queues = - 1; - current_thread_index = ~0; - ptd->handoff_queue_elt_by_thread_index[next_thread_index] = 0; - hf = 0; - } +more: + clib_mask_compare_u16 (thread_index, thread_indices, mask, n_packets); + hf = vlib_get_frame_queue_elt (fqm, thread_index, drop_on_congestion); - /* next */ - next: - thread_indices += 1; - buffer_indices += 1; - n_left -= 1; - } + n_comp = clib_compress_u32 (hf ? hf->buffer_index : drop_list + n_drop, + buffer_indices, mask, n_packets); if (hf) - hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_thread; + { + if (node->flags & VLIB_NODE_FLAG_TRACE) + hf->maybe_trace = 1; + hf->n_vectors = n_comp; + __atomic_store_n (&hf->valid, 1, __ATOMIC_RELEASE); + vlib_get_main_by_index (thread_index)->check_frame_queues = 1; + } + else + n_drop += n_comp; - /* Ship frames to the thread nodes */ - for (i = 0; i < vec_len (ptd->handoff_queue_elt_by_thread_index); i++) + n_left -= n_comp; + + if (n_left) { - if (ptd->handoff_queue_elt_by_thread_index[i]) + for (int i = 0; i < ARRAY_LEN (used_elts); i++) + used_elts[i] |= mask[i]; + + while (PREDICT_FALSE (used_elts[off] == ~0)) { - hf = ptd->handoff_queue_elt_by_thread_index[i]; - /* - * It works better to let the handoff node - * rate-adapt, always ship the handoff queue element. - */ - if (1 || hf->n_vectors == hf->last_n_vectors) - { - vlib_put_frame_queue_elt (hf); - vlib_get_main_by_index (i)->check_frame_queues = 1; - ptd->handoff_queue_elt_by_thread_index[i] = 0; - } - else - hf->last_n_vectors = hf->n_vectors; + off++; + ASSERT (off < ARRAY_LEN (used_elts)); } - ptd->congested_handoff_queue_by_thread_index[i] = - (vlib_frame_queue_t *) (~0); + + thread_index = + thread_indices[off * 64 + count_trailing_zeros (~used_elts[off])]; + goto more; } if (drop_on_congestion && n_drop) @@ -263,25 +257,50 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) return n_packets - n_drop; } +u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn") +CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) +(vlib_main_t *vm, vlib_node_runtime_t *node, u32 frame_queue_index, + u32 *buffer_indices, u16 *thread_indices, u32 n_packets, + int drop_on_congestion) +{ + vlib_thread_main_t *tm = vlib_get_thread_main (); + vlib_frame_queue_main_t *fqm; + u32 n_enq = 0; + + fqm = vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); + + while (n_packets >= VLIB_FRAME_SIZE) + { + n_enq += vlib_buffer_enqueue_to_thread_inline ( + vm, node, fqm, buffer_indices, thread_indices, VLIB_FRAME_SIZE, + drop_on_congestion); + buffer_indices += VLIB_FRAME_SIZE; + thread_indices += VLIB_FRAME_SIZE; + n_packets -= VLIB_FRAME_SIZE; + } + + if (n_packets == 0) + return n_enq; + + n_enq += vlib_buffer_enqueue_to_thread_inline (vm, node, fqm, buffer_indices, + thread_indices, n_packets, + drop_on_congestion); + + return n_enq; +} + CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_thread_fn); -/* - * Check the frame queue to see if any frames are available. - * If so, pull the packets off the frames and put them to - * the handoff node. - */ u32 __clib_section (".vlib_frame_queue_dequeue_fn") CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) (vlib_main_t *vm, vlib_frame_queue_main_t *fqm) { u32 thread_id = vm->thread_index; vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id]; + u32 mask = fq->nelts - 1; vlib_frame_queue_elt_t *elt; - u32 *from, *to; - vlib_frame_t *f; - int msg_type; - int processed = 0; - u32 vectors = 0; + u32 n_free, n_copy, *from, *to = 0, processed = 0, vectors = 0; + vlib_frame_t *f = 0; ASSERT (fq); ASSERT (vm == vlib_global_main.vlib_mains[thread_id]); @@ -301,7 +320,6 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) fqt->nelts = fq->nelts; fqt->head = fq->head; - fqt->head_hint = fq->head_hint; fqt->tail = fq->tail; fqt->threshold = fq->vector_threshold; fqt->n_in_use = fqt->tail - fqt->head; @@ -318,7 +336,7 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) /* Record a snapshot of the elements in use */ for (elix = 0; elix < fqt->nelts; elix++) { - elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1)); + elt = fq->elts + ((fq->head + 1 + elix) & (mask)); if (1 || elt->valid) { fqt->n_vectors[elix] = elt->n_vectors; @@ -329,61 +347,71 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) while (1) { - vlib_buffer_t *b; if (fq->head == fq->tail) - { - fq->head_hint = fq->head; - return processed; - } + break; - elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1)); + elt = fq->elts + ((fq->head + 1) & mask); - if (!elt->valid) - { - fq->head_hint = fq->head; - return processed; - } + if (!__atomic_load_n (&elt->valid, __ATOMIC_ACQUIRE)) + break; - from = elt->buffer_index; - msg_type = elt->msg_type; + from = elt->buffer_index + elt->offset; - ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME); - ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE); + ASSERT (elt->offset + elt->n_vectors <= VLIB_FRAME_SIZE); - f = vlib_get_frame_to_node (vm, fqm->node_index); + if (f == 0) + { + f = vlib_get_frame_to_node (vm, fqm->node_index); + to = vlib_frame_vector_args (f); + n_free = VLIB_FRAME_SIZE; + } - /* If the first vector is traced, set the frame trace flag */ - b = vlib_get_buffer (vm, from[0]); - if (b->flags & VLIB_BUFFER_IS_TRACED) + if (elt->maybe_trace) f->frame_flags |= VLIB_NODE_FLAG_TRACE; - to = vlib_frame_vector_args (f); - - vlib_buffer_copy_indices (to, from, elt->n_vectors); + n_copy = clib_min (n_free, elt->n_vectors); - vectors += elt->n_vectors; - f->n_vectors = elt->n_vectors; - vlib_put_frame_to_node (vm, fqm->node_index, f); + vlib_buffer_copy_indices (to, from, n_copy); + to += n_copy; + n_free -= n_copy; + vectors += n_copy; - elt->valid = 0; - elt->n_vectors = 0; - elt->msg_type = 0xfefefefe; - CLIB_MEMORY_BARRIER (); - fq->head++; - processed++; + if (n_free == 0) + { + f->n_vectors = VLIB_FRAME_SIZE; + vlib_put_frame_to_node (vm, fqm->node_index, f); + f = 0; + } - /* - * Limit the number of packets pushed into the graph - */ - if (vectors >= fq->vector_threshold) + if (n_copy < elt->n_vectors) { - fq->head_hint = fq->head; - return processed; + /* not empty - leave it on the ring */ + elt->n_vectors -= n_copy; + elt->offset += n_copy; } + else + { + /* empty - reset and bump head */ + u32 sz = STRUCT_OFFSET_OF (vlib_frame_queue_elt_t, end_of_reset); + clib_memset (elt, 0, sz); + __atomic_store_n (&fq->head, fq->head + 1, __ATOMIC_RELEASE); + processed++; + } + + /* Limit the number of packets pushed into the graph */ + if (vectors >= fq->vector_threshold) + break; } - ASSERT (0); + + if (f) + { + f->n_vectors = VLIB_FRAME_SIZE - n_free; + vlib_put_frame_to_node (vm, fqm->node_index, f); + } + return processed; } + CLIB_MARCH_FN_REGISTRATION (vlib_frame_queue_dequeue_fn); #ifndef CLIB_MARCH_VARIANT diff --git a/src/vlib/node.h b/src/vlib/node.h index a06c20211d4..75a0adba8d1 100644 --- a/src/vlib/node.h +++ b/src/vlib/node.h @@ -765,7 +765,6 @@ typedef struct { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); u64 head; - u64 head_hint; u64 tail; u32 n_in_use; u32 nelts; diff --git a/src/vlib/threads.c b/src/vlib/threads.c index b02271bf310..a33e70ab8d4 100644 --- a/src/vlib/threads.c +++ b/src/vlib/threads.c @@ -346,26 +346,13 @@ vlib_frame_queue_alloc (int nelts) fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES); clib_memset (fq, 0, sizeof (*fq)); fq->nelts = nelts; - fq->vector_threshold = 128; // packets + fq->vector_threshold = 2 * VLIB_FRAME_SIZE; vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES); - if (1) + if (nelts & (nelts - 1)) { - if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1)) - fformat (stderr, "WARNING: fq->tail unaligned\n"); - if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1)) - fformat (stderr, "WARNING: fq->head unaligned\n"); - if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1)) - fformat (stderr, "WARNING: fq->elts unaligned\n"); - - if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES) - fformat (stderr, "WARNING: fq->elts[0] size %d\n", - sizeof (fq->elts[0])); - if (nelts & (nelts - 1)) - { - fformat (stderr, "FATAL: nelts MUST be a power of 2\n"); - abort (); - } + fformat (stderr, "FATAL: nelts MUST be a power of 2\n"); + abort (); } return (fq); @@ -1587,23 +1574,13 @@ vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts) fqm->node_index = node_index; fqm->frame_queue_nelts = frame_queue_nelts; - fqm->queue_hi_thresh = frame_queue_nelts - num_threads; vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1); - vec_validate (fqm->per_thread_data, tm->n_vlib_mains - 1); _vec_len (fqm->vlib_frame_queues) = 0; for (i = 0; i < tm->n_vlib_mains; i++) { - vlib_frame_queue_per_thread_data_t *ptd; fq = vlib_frame_queue_alloc (frame_queue_nelts); vec_add1 (fqm->vlib_frame_queues, fq); - - ptd = vec_elt_at_index (fqm->per_thread_data, i); - vec_validate (ptd->handoff_queue_elt_by_thread_index, - tm->n_vlib_mains - 1); - vec_validate_init_empty (ptd->congested_handoff_queue_by_thread_index, - tm->n_vlib_mains - 1, - (vlib_frame_queue_t *) (~0)); } return (fqm - tm->frame_queue_mains); diff --git a/src/vlib/threads.h b/src/vlib/threads.h index 2dfb535c3a5..91727bacc23 100644 --- a/src/vlib/threads.h +++ b/src/vlib/threads.h @@ -64,20 +64,16 @@ typedef struct vlib_thread_registration_ #define VLIB_LOG2_THREAD_STACK_SIZE (21) #define VLIB_THREAD_STACK_SIZE (1<<VLIB_LOG2_THREAD_STACK_SIZE) -typedef enum -{ - VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME, -} vlib_frame_queue_msg_type_t; - typedef struct { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); volatile u32 valid; - u32 msg_type; + u32 maybe_trace : 1; u32 n_vectors; - u32 last_n_vectors; + u32 offset; + STRUCT_MARK (end_of_reset); - /* 256 * 4 = 1024 bytes, even mult of cache line size */ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline1); u32 buffer_index[VLIB_FRAME_SIZE]; } vlib_frame_queue_elt_t; @@ -117,42 +113,29 @@ extern vlib_worker_thread_t *vlib_worker_threads; typedef struct { - /* enqueue side */ + /* static data */ CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); - volatile u64 tail; - u32 enqueue_full_events; - - /* dequeue side */ - CLIB_CACHE_LINE_ALIGN_MARK (cacheline1); - volatile u64 head; - u64 trace; + vlib_frame_queue_elt_t *elts; u64 vector_threshold; + u64 trace; + u32 nelts; - /* dequeue hint to enqueue side */ - CLIB_CACHE_LINE_ALIGN_MARK (cacheline2); - volatile u64 head_hint; + /* modified by enqueue side */ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline1); + volatile u64 tail; - /* read-only, constant, shared */ - CLIB_CACHE_LINE_ALIGN_MARK (cacheline3); - vlib_frame_queue_elt_t *elts; - u32 nelts; + /* modified by dequeue side */ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline2); + volatile u64 head; } vlib_frame_queue_t; typedef struct { - vlib_frame_queue_elt_t **handoff_queue_elt_by_thread_index; - vlib_frame_queue_t **congested_handoff_queue_by_thread_index; -} vlib_frame_queue_per_thread_data_t; - -typedef struct -{ u32 node_index; u32 frame_queue_nelts; - u32 queue_hi_thresh; vlib_frame_queue_t **vlib_frame_queues; - vlib_frame_queue_per_thread_data_t *per_thread_data; /* for frame queue tracing */ frame_queue_trace_t *frame_queue_traces; @@ -169,10 +152,6 @@ typedef struct /* Called early, in thread 0's context */ clib_error_t *vlib_thread_init (vlib_main_t * vm); -int vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index, - u32 frame_queue_index, vlib_frame_t * frame, - vlib_frame_queue_msg_type_t type); - void vlib_worker_thread_node_runtime_update (void); void vlib_create_worker_threads (vlib_main_t * vm, int n, @@ -510,94 +489,6 @@ vlib_thread_is_main_w_barrier (void) && vlib_worker_threads->wait_at_barrier[0]))); } -static inline void -vlib_put_frame_queue_elt (vlib_frame_queue_elt_t * hf) -{ - CLIB_MEMORY_BARRIER (); - hf->valid = 1; -} - -static inline vlib_frame_queue_elt_t * -vlib_get_frame_queue_elt (u32 frame_queue_index, u32 index) -{ - vlib_frame_queue_t *fq; - vlib_frame_queue_elt_t *elt; - vlib_thread_main_t *tm = &vlib_thread_main; - vlib_frame_queue_main_t *fqm = - vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); - u64 new_tail; - - fq = fqm->vlib_frame_queues[index]; - ASSERT (fq); - - new_tail = clib_atomic_add_fetch (&fq->tail, 1); - - /* Wait until a ring slot is available */ - while (new_tail >= fq->head_hint + fq->nelts) - vlib_worker_thread_barrier_check (); - - elt = fq->elts + (new_tail & (fq->nelts - 1)); - - /* this would be very bad... */ - while (elt->valid) - ; - - elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME; - elt->last_n_vectors = elt->n_vectors = 0; - - return elt; -} - -static inline vlib_frame_queue_t * -is_vlib_frame_queue_congested (u32 frame_queue_index, - u32 index, - u32 queue_hi_thresh, - vlib_frame_queue_t ** - handoff_queue_by_worker_index) -{ - vlib_frame_queue_t *fq; - vlib_thread_main_t *tm = &vlib_thread_main; - vlib_frame_queue_main_t *fqm = - vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); - - fq = handoff_queue_by_worker_index[index]; - if (fq != (vlib_frame_queue_t *) (~0)) - return fq; - - fq = fqm->vlib_frame_queues[index]; - ASSERT (fq); - - if (PREDICT_FALSE (fq->tail >= (fq->head_hint + queue_hi_thresh))) - { - /* a valid entry in the array will indicate the queue has reached - * the specified threshold and is congested - */ - handoff_queue_by_worker_index[index] = fq; - fq->enqueue_full_events++; - return fq; - } - - return NULL; -} - -static inline vlib_frame_queue_elt_t * -vlib_get_worker_handoff_queue_elt (u32 frame_queue_index, - u32 vlib_worker_index, - vlib_frame_queue_elt_t ** - handoff_queue_elt_by_worker_index) -{ - vlib_frame_queue_elt_t *elt; - - if (handoff_queue_elt_by_worker_index[vlib_worker_index]) - return handoff_queue_elt_by_worker_index[vlib_worker_index]; - - elt = vlib_get_frame_queue_elt (frame_queue_index, vlib_worker_index); - - handoff_queue_elt_by_worker_index[vlib_worker_index] = elt; - - return elt; -} - u8 *vlib_thread_stack_init (uword thread_index); int vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb); diff --git a/src/vlib/threads_cli.c b/src/vlib/threads_cli.c index bcb85ec69fb..d14e9c50e27 100644 --- a/src/vlib/threads_cli.c +++ b/src/vlib/threads_cli.c @@ -290,8 +290,8 @@ show_frame_queue_internal (vlib_main_t * vm, vlib_cli_output (vm, " vector-threshold %d ring size %d in use %d\n", fqt->threshold, fqt->nelts, fqt->n_in_use); - vlib_cli_output (vm, " head %12d head_hint %12d tail %12d\n", - fqt->head, fqt->head_hint, fqt->tail); + vlib_cli_output (vm, " head %12d tail %12d\n", fqt->head, + fqt->tail); vlib_cli_output (vm, " %3d %3d %3d %3d %3d %3d %3d %3d %3d %3d %3d %3d %3d %3d %3d %3d\n", fqt->n_vectors[0], fqt->n_vectors[1], |