diff options
Diffstat (limited to 'src/vlib/buffer_funcs.c')
-rw-r--r-- | src/vlib/buffer_funcs.c | 286 |
1 files changed, 157 insertions, 129 deletions
diff --git a/src/vlib/buffer_funcs.c b/src/vlib/buffer_funcs.c index 83ff296e705..624b6e6b8c7 100644 --- a/src/vlib/buffer_funcs.c +++ b/src/vlib/buffer_funcs.c @@ -87,7 +87,10 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn) while (n_left) { while (PREDICT_FALSE (used_elt_bmp[off] == ~0)) - off++; + { + off++; + ASSERT (off < ARRAY_LEN (used_elt_bmp)); + } next_index = nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])]; @@ -113,7 +116,10 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn) while (n_left) { while (PREDICT_FALSE (used_elt_bmp[off] == ~0)) - off++; + { + off++; + ASSERT (off < ARRAY_LEN (used_elt_bmp)); + } next_index = nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])]; @@ -161,100 +167,88 @@ next: } CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_single_next_fn); -u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn") -CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) -(vlib_main_t *vm, vlib_node_runtime_t *node, u32 frame_queue_index, - u32 *buffer_indices, u16 *thread_indices, u32 n_packets, - int drop_on_congestion) +static inline vlib_frame_queue_elt_t * +vlib_get_frame_queue_elt (vlib_frame_queue_main_t *fqm, u32 index, + int dont_wait) { - vlib_thread_main_t *tm = vlib_get_thread_main (); - vlib_frame_queue_main_t *fqm; - vlib_frame_queue_per_thread_data_t *ptd; - u32 n_left = n_packets; - u32 drop_list[VLIB_FRAME_SIZE], *dbi = drop_list, n_drop = 0; - vlib_frame_queue_elt_t *hf = 0; - u32 n_left_to_next_thread = 0, *to_next_thread = 0; - u32 next_thread_index, current_thread_index = ~0; - int i; + vlib_frame_queue_t *fq; + u64 nelts, tail, new_tail; - fqm = vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); - ptd = vec_elt_at_index (fqm->per_thread_data, vm->thread_index); + fq = fqm->vlib_frame_queues[index]; + ASSERT (fq); + nelts = fq->nelts; + +retry: + tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE); + new_tail = tail + 1; - while (n_left) + if (new_tail >= fq->head + nelts) { - next_thread_index = thread_indices[0]; + if (dont_wait) + return 0; - if (next_thread_index != current_thread_index) - { - if (drop_on_congestion && - is_vlib_frame_queue_congested ( - frame_queue_index, next_thread_index, fqm->queue_hi_thresh, - ptd->congested_handoff_queue_by_thread_index)) - { - dbi[0] = buffer_indices[0]; - dbi++; - n_drop++; - goto next; - } + /* Wait until a ring slot is available */ + while (new_tail >= fq->head + nelts) + vlib_worker_thread_barrier_check (); + } - if (hf) - hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_thread; + if (!__atomic_compare_exchange_n (&fq->tail, &tail, new_tail, 0 /* weak */, + __ATOMIC_RELAXED, __ATOMIC_RELAXED)) + goto retry; - hf = vlib_get_worker_handoff_queue_elt ( - frame_queue_index, next_thread_index, - ptd->handoff_queue_elt_by_thread_index); + return fq->elts + (new_tail & (nelts - 1)); +} - n_left_to_next_thread = VLIB_FRAME_SIZE - hf->n_vectors; - to_next_thread = &hf->buffer_index[hf->n_vectors]; - current_thread_index = next_thread_index; - } +static_always_inline u32 +vlib_buffer_enqueue_to_thread_inline (vlib_main_t *vm, + vlib_node_runtime_t *node, + vlib_frame_queue_main_t *fqm, + u32 *buffer_indices, u16 *thread_indices, + u32 n_packets, int drop_on_congestion) +{ + u32 drop_list[VLIB_FRAME_SIZE], n_drop = 0; + u64 used_elts[VLIB_FRAME_SIZE / 64] = {}; + u64 mask[VLIB_FRAME_SIZE / 64]; + vlib_frame_queue_elt_t *hf = 0; + u16 thread_index; + u32 n_comp, off = 0, n_left = n_packets; - to_next_thread[0] = buffer_indices[0]; - to_next_thread++; - n_left_to_next_thread--; + thread_index = thread_indices[0]; - if (n_left_to_next_thread == 0) - { - hf->n_vectors = VLIB_FRAME_SIZE; - vlib_put_frame_queue_elt (hf); - vlib_get_main_by_index (current_thread_index)->check_frame_queues = - 1; - current_thread_index = ~0; - ptd->handoff_queue_elt_by_thread_index[next_thread_index] = 0; - hf = 0; - } +more: + clib_mask_compare_u16 (thread_index, thread_indices, mask, n_packets); + hf = vlib_get_frame_queue_elt (fqm, thread_index, drop_on_congestion); - /* next */ - next: - thread_indices += 1; - buffer_indices += 1; - n_left -= 1; - } + n_comp = clib_compress_u32 (hf ? hf->buffer_index : drop_list + n_drop, + buffer_indices, mask, n_packets); if (hf) - hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_thread; + { + if (node->flags & VLIB_NODE_FLAG_TRACE) + hf->maybe_trace = 1; + hf->n_vectors = n_comp; + __atomic_store_n (&hf->valid, 1, __ATOMIC_RELEASE); + vlib_get_main_by_index (thread_index)->check_frame_queues = 1; + } + else + n_drop += n_comp; - /* Ship frames to the thread nodes */ - for (i = 0; i < vec_len (ptd->handoff_queue_elt_by_thread_index); i++) + n_left -= n_comp; + + if (n_left) { - if (ptd->handoff_queue_elt_by_thread_index[i]) + for (int i = 0; i < ARRAY_LEN (used_elts); i++) + used_elts[i] |= mask[i]; + + while (PREDICT_FALSE (used_elts[off] == ~0)) { - hf = ptd->handoff_queue_elt_by_thread_index[i]; - /* - * It works better to let the handoff node - * rate-adapt, always ship the handoff queue element. - */ - if (1 || hf->n_vectors == hf->last_n_vectors) - { - vlib_put_frame_queue_elt (hf); - vlib_get_main_by_index (i)->check_frame_queues = 1; - ptd->handoff_queue_elt_by_thread_index[i] = 0; - } - else - hf->last_n_vectors = hf->n_vectors; + off++; + ASSERT (off < ARRAY_LEN (used_elts)); } - ptd->congested_handoff_queue_by_thread_index[i] = - (vlib_frame_queue_t *) (~0); + + thread_index = + thread_indices[off * 64 + count_trailing_zeros (~used_elts[off])]; + goto more; } if (drop_on_congestion && n_drop) @@ -263,25 +257,50 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) return n_packets - n_drop; } +u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn") +CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) +(vlib_main_t *vm, vlib_node_runtime_t *node, u32 frame_queue_index, + u32 *buffer_indices, u16 *thread_indices, u32 n_packets, + int drop_on_congestion) +{ + vlib_thread_main_t *tm = vlib_get_thread_main (); + vlib_frame_queue_main_t *fqm; + u32 n_enq = 0; + + fqm = vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); + + while (n_packets >= VLIB_FRAME_SIZE) + { + n_enq += vlib_buffer_enqueue_to_thread_inline ( + vm, node, fqm, buffer_indices, thread_indices, VLIB_FRAME_SIZE, + drop_on_congestion); + buffer_indices += VLIB_FRAME_SIZE; + thread_indices += VLIB_FRAME_SIZE; + n_packets -= VLIB_FRAME_SIZE; + } + + if (n_packets == 0) + return n_enq; + + n_enq += vlib_buffer_enqueue_to_thread_inline (vm, node, fqm, buffer_indices, + thread_indices, n_packets, + drop_on_congestion); + + return n_enq; +} + CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_thread_fn); -/* - * Check the frame queue to see if any frames are available. - * If so, pull the packets off the frames and put them to - * the handoff node. - */ u32 __clib_section (".vlib_frame_queue_dequeue_fn") CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) (vlib_main_t *vm, vlib_frame_queue_main_t *fqm) { u32 thread_id = vm->thread_index; vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id]; + u32 mask = fq->nelts - 1; vlib_frame_queue_elt_t *elt; - u32 *from, *to; - vlib_frame_t *f; - int msg_type; - int processed = 0; - u32 vectors = 0; + u32 n_free, n_copy, *from, *to = 0, processed = 0, vectors = 0; + vlib_frame_t *f = 0; ASSERT (fq); ASSERT (vm == vlib_global_main.vlib_mains[thread_id]); @@ -301,7 +320,6 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) fqt->nelts = fq->nelts; fqt->head = fq->head; - fqt->head_hint = fq->head_hint; fqt->tail = fq->tail; fqt->threshold = fq->vector_threshold; fqt->n_in_use = fqt->tail - fqt->head; @@ -318,7 +336,7 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) /* Record a snapshot of the elements in use */ for (elix = 0; elix < fqt->nelts; elix++) { - elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1)); + elt = fq->elts + ((fq->head + 1 + elix) & (mask)); if (1 || elt->valid) { fqt->n_vectors[elix] = elt->n_vectors; @@ -329,61 +347,71 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) while (1) { - vlib_buffer_t *b; if (fq->head == fq->tail) - { - fq->head_hint = fq->head; - return processed; - } + break; - elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1)); + elt = fq->elts + ((fq->head + 1) & mask); - if (!elt->valid) - { - fq->head_hint = fq->head; - return processed; - } + if (!__atomic_load_n (&elt->valid, __ATOMIC_ACQUIRE)) + break; - from = elt->buffer_index; - msg_type = elt->msg_type; + from = elt->buffer_index + elt->offset; - ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME); - ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE); + ASSERT (elt->offset + elt->n_vectors <= VLIB_FRAME_SIZE); - f = vlib_get_frame_to_node (vm, fqm->node_index); + if (f == 0) + { + f = vlib_get_frame_to_node (vm, fqm->node_index); + to = vlib_frame_vector_args (f); + n_free = VLIB_FRAME_SIZE; + } - /* If the first vector is traced, set the frame trace flag */ - b = vlib_get_buffer (vm, from[0]); - if (b->flags & VLIB_BUFFER_IS_TRACED) + if (elt->maybe_trace) f->frame_flags |= VLIB_NODE_FLAG_TRACE; - to = vlib_frame_vector_args (f); - - vlib_buffer_copy_indices (to, from, elt->n_vectors); + n_copy = clib_min (n_free, elt->n_vectors); - vectors += elt->n_vectors; - f->n_vectors = elt->n_vectors; - vlib_put_frame_to_node (vm, fqm->node_index, f); + vlib_buffer_copy_indices (to, from, n_copy); + to += n_copy; + n_free -= n_copy; + vectors += n_copy; - elt->valid = 0; - elt->n_vectors = 0; - elt->msg_type = 0xfefefefe; - CLIB_MEMORY_BARRIER (); - fq->head++; - processed++; + if (n_free == 0) + { + f->n_vectors = VLIB_FRAME_SIZE; + vlib_put_frame_to_node (vm, fqm->node_index, f); + f = 0; + } - /* - * Limit the number of packets pushed into the graph - */ - if (vectors >= fq->vector_threshold) + if (n_copy < elt->n_vectors) { - fq->head_hint = fq->head; - return processed; + /* not empty - leave it on the ring */ + elt->n_vectors -= n_copy; + elt->offset += n_copy; } + else + { + /* empty - reset and bump head */ + u32 sz = STRUCT_OFFSET_OF (vlib_frame_queue_elt_t, end_of_reset); + clib_memset (elt, 0, sz); + __atomic_store_n (&fq->head, fq->head + 1, __ATOMIC_RELEASE); + processed++; + } + + /* Limit the number of packets pushed into the graph */ + if (vectors >= fq->vector_threshold) + break; } - ASSERT (0); + + if (f) + { + f->n_vectors = VLIB_FRAME_SIZE - n_free; + vlib_put_frame_to_node (vm, fqm->node_index, f); + } + return processed; } + CLIB_MARCH_FN_REGISTRATION (vlib_frame_queue_dequeue_fn); #ifndef CLIB_MARCH_VARIANT |