path: root/src/plugins/memif/device.c
author     Damjan Marion <damarion@cisco.com>        2017-11-02 17:07:59 +0100
committer  Damjan Marion <dmarion.lists@gmail.com>   2018-03-22 15:54:43 +0000
commit     5c37ce3e0264c0bec75610837c5819ff4407bd5c (patch)
tree       445c8d5f9fa2ee7bdb1f8fb6e1228c1bcbf633d9 /src/plugins/memif/device.c
parent     32f4e18c59f368e9c43f4483de12353280c2149b (diff)
memif: version 2
In version 1 of the protocol, the sender was always the ring producer and the
receiver was the consumer. In version 2, the slave is always the producer, and
for master-to-slave rings the slave is responsible for populating the ring
with empty buffers. As this is a major change, the version number needs to be
bumped. In addition, the descriptor size is reduced to 16 bytes. This change
allows zero-copy-slave operation (to be provided in a separate patch).

Change-Id: I02115d232f455ffc05c0bd247f7d03f47252cfaf
Signed-off-by: Damjan Marion <damarion@cisco.com>
Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com>
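For orientation, a sketch of what the reduced 16-byte version-2 descriptor could look like, based only on the fields this patch reads and writes (flags, region, length, offset); the final 4-byte field that pads the layout to 16 bytes is an assumption, and the authoritative definition lives in memif.h:

    typedef struct __attribute__ ((packed))
    {
      uint16_t flags;    /* e.g. MEMIF_DESC_FLAG_NEXT for chained buffers */
      uint16_t region;   /* index of the shared-memory region holding the buffer */
      uint32_t length;   /* number of valid data bytes in the buffer */
      uint32_t offset;   /* offset of the buffer within its region */
      uint32_t metadata; /* assumed filler/metadata word bringing the size to 16 bytes */
    } memif_desc_t;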
Diffstat (limited to 'src/plugins/memif/device.c')
-rw-r--r--    src/plugins/memif/device.c    265
1 file changed, 158 insertions(+), 107 deletions(-)
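The rewritten TX path below no longer copies packet data inline while walking the ring; it first records each copy as a (destination, length, source offset, source buffer) tuple in a per-thread vector and then executes the copies in a batched loop. A minimal sketch of the bookkeeping structure implied by memif_add_copy_op (), with field names taken from the patch; the exact layout in memif_private.h is assumed:

    typedef struct
    {
      void *data;           /* destination pointer inside the shared-memory region */
      u32 data_len;         /* number of bytes to copy */
      u16 buffer_offset;    /* offset into the source vlib buffer's data */
      u16 buffer_vec_index; /* index into the per-thread vector of source buffer indices */
    } memif_copy_op_t;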
diff --git a/src/plugins/memif/device.c b/src/plugins/memif/device.c
index 22f9753dc42..112db57b4b4 100644
--- a/src/plugins/memif/device.c
+++ b/src/plugins/memif/device.c
@@ -30,8 +30,7 @@
#define foreach_memif_tx_func_error \
_(NO_FREE_SLOTS, "no free tx slots") \
-_(TRUNC_PACKET, "packet > buffer size -- truncated in tx ring") \
-_(PENDING_MSGS, "pending msgs in tx ring")
+_(ROLLBACK, "not enough space in tx buffers")
typedef enum
{
@@ -86,77 +85,15 @@ format_memif_tx_trace (u8 * s, va_list * args)
}
static_always_inline void
-memif_prefetch_buffer_and_data (vlib_main_t * vm, u32 bi)
+memif_add_copy_op (memif_per_thread_data_t * ptd, void *data, u32 len,
+ u16 buffer_offset, u16 buffer_vec_index)
{
- vlib_buffer_t *b = vlib_get_buffer (vm, bi);
- vlib_prefetch_buffer_header (b, LOAD);
- CLIB_PREFETCH (b->data, CLIB_CACHE_LINE_BYTES, LOAD);
-}
-
-/**
- * @brief Copy buffer to tx ring
- *
- * @param * vm (in)
- * @param * node (in)
- * @param * mif (in) pointer to memif interface
- * @param bi (in) vlib buffer index
- * @param * ring (in) pointer to memif ring
- * @param * head (in/out) ring head
- * @param mask (in) ring size - 1
- */
-static_always_inline void
-memif_copy_buffer_to_tx_ring (vlib_main_t * vm, vlib_node_runtime_t * node,
- memif_if_t * mif, u32 bi, memif_ring_t * ring,
- u16 * head, u16 mask)
-{
- vlib_buffer_t *b0;
- void *mb0;
- u32 total = 0, len;
- u16 slot = (*head) & mask;
-
- mb0 = memif_get_buffer (mif, ring, slot);
- ring->desc[slot].flags = 0;
- do
- {
- b0 = vlib_get_buffer (vm, bi);
- len = b0->current_length;
- if (PREDICT_FALSE (ring->desc[slot].buffer_length < (total + len)))
- {
- if (PREDICT_TRUE (total))
- {
- ring->desc[slot].length = total;
- total = 0;
- ring->desc[slot].flags |= MEMIF_DESC_FLAG_NEXT;
- (*head)++;
- slot = (*head) & mask;
- mb0 = memif_get_buffer (mif, ring, slot);
- ring->desc[slot].flags = 0;
- }
- }
- if (PREDICT_TRUE (ring->desc[slot].buffer_length >= (total + len)))
- {
- clib_memcpy (mb0 + total, vlib_buffer_get_current (b0),
- CLIB_CACHE_LINE_BYTES);
- if (len > CLIB_CACHE_LINE_BYTES)
- clib_memcpy (mb0 + CLIB_CACHE_LINE_BYTES + total,
- vlib_buffer_get_current (b0) + CLIB_CACHE_LINE_BYTES,
- len - CLIB_CACHE_LINE_BYTES);
- total += len;
- }
- else
- {
- vlib_error_count (vm, node->node_index, MEMIF_TX_ERROR_TRUNC_PACKET,
- 1);
- break;
- }
- }
- while ((bi = (b0->flags & VLIB_BUFFER_NEXT_PRESENT) ? b0->next_buffer : 0));
-
- if (PREDICT_TRUE (total))
- {
- ring->desc[slot].length = total;
- (*head)++;
- }
+ memif_copy_op_t *co;
+ vec_add2_aligned (ptd->copy_ops, co, 1, CLIB_CACHE_LINE_BYTES);
+ co->data = data;
+ co->data_len = len;
+ co->buffer_offset = buffer_offset;
+ co->buffer_vec_index = buffer_vec_index;
}
static_always_inline uword
@@ -168,13 +105,18 @@ memif_interface_tx_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
memif_ring_t *ring;
u32 *buffers = vlib_frame_args (frame);
u32 n_left = frame->n_vectors;
- u16 ring_size, mask;
- u16 head, tail;
- u16 free_slots;
+ u32 n_copy_op;
+ u16 ring_size, mask, slot, free_slots;
u32 thread_index = vlib_get_thread_index ();
+ memif_per_thread_data_t *ptd = vec_elt_at_index (memif_main.per_thread_data,
+ thread_index);
u8 tx_queues = vec_len (mif->tx_queues);
memif_queue_t *mq;
int n_retries = 5;
+ vlib_buffer_t *b0, *b1, *b2, *b3;
+ memif_copy_op_t *co;
+ memif_region_index_t last_region = ~0;
+ void *last_region_shm = 0;
if (tx_queues < vec_len (vlib_mains))
{
@@ -189,49 +131,158 @@ memif_interface_tx_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
ring = mq->ring;
ring_size = 1 << mq->log2_ring_size;
mask = ring_size - 1;
-retry:
-
- /* free consumed buffers */
- head = ring->head;
- tail = ring->tail;
+retry:
- free_slots = ring_size - head + tail;
+ free_slots = ring->tail - mq->last_tail;
+ mq->last_tail += free_slots;
+ slot = (type == MEMIF_RING_S2M) ? ring->head : ring->tail;
- while (n_left > 5 && free_slots > 1)
- {
- CLIB_PREFETCH (memif_get_buffer (mif, ring, (head + 2) & mask),
- CLIB_CACHE_LINE_BYTES, STORE);
- CLIB_PREFETCH (memif_get_buffer (mif, ring, (head + 3) & mask),
- CLIB_CACHE_LINE_BYTES, STORE);
- CLIB_PREFETCH (&ring->desc[(head + 4) & mask], CLIB_CACHE_LINE_BYTES,
- STORE);
- CLIB_PREFETCH (&ring->desc[(head + 5) & mask], CLIB_CACHE_LINE_BYTES,
- STORE);
- memif_prefetch_buffer_and_data (vm, buffers[2]);
- memif_prefetch_buffer_and_data (vm, buffers[3]);
-
- memif_copy_buffer_to_tx_ring (vm, node, mif, buffers[0], ring, &head,
- mask);
- memif_copy_buffer_to_tx_ring (vm, node, mif, buffers[1], ring, &head,
- mask);
-
- buffers += 2;
- n_left -= 2;
- free_slots -= 2;
- }
+ if (type == MEMIF_RING_S2M)
+ free_slots = ring_size - ring->head + mq->last_tail;
+ else
+ free_slots = ring->head - ring->tail;
while (n_left && free_slots)
{
- memif_copy_buffer_to_tx_ring (vm, node, mif, buffers[0], ring, &head,
- mask);
+ memif_desc_t *d0;
+ void *mb0;
+ i32 src_off;
+ u32 bi0, dst_off, src_left, dst_left, bytes_to_copy;
+ u32 saved_ptd_copy_ops_len = _vec_len (ptd->copy_ops);
+ u32 saved_ptd_buffers_len = _vec_len (ptd->buffers);
+ u16 saved_slot = slot;
+
+ CLIB_PREFETCH (&ring->desc[(slot + 8) & mask], CLIB_CACHE_LINE_BYTES,
+ LOAD);
+
+ d0 = &ring->desc[slot & mask];
+ if (PREDICT_FALSE (last_region != d0->region))
+ {
+ last_region_shm = mif->regions[d0->region].shm;
+ last_region = d0->region;
+ }
+ mb0 = last_region_shm + d0->offset;
+
+ dst_off = 0;
+
+ /* slave is the producer, so it should be able to reset buffer length */
+ dst_left = (type == MEMIF_RING_S2M) ? mif->run.buffer_size : d0->length;
+
+ if (PREDICT_TRUE (n_left >= 4))
+ vlib_prefetch_buffer_header (vlib_get_buffer (vm, buffers[3]), LOAD);
+ bi0 = buffers[0];
+
+ next_in_chain:
+
+ b0 = vlib_get_buffer (vm, bi0);
+ src_off = b0->current_data;
+ src_left = b0->current_length;
+
+ while (src_left)
+ {
+ if (PREDICT_FALSE (dst_left == 0))
+ {
+ if (free_slots)
+ {
+ slot++;
+ free_slots--;
+ d0->flags = MEMIF_DESC_FLAG_NEXT;
+ d0 = &ring->desc[slot & mask];
+ dst_off = 0;
+ dst_left =
+ (type ==
+ MEMIF_RING_S2M) ? mif->run.buffer_size : d0->length;
+
+ if (PREDICT_FALSE (last_region != d0->region))
+ {
+ last_region_shm = mif->regions[d0->region].shm;
+ last_region = d0->region;
+ }
+ mb0 = last_region_shm + d0->offset;
+ }
+ else
+ {
+ /* we need to rollback vectors before bailing out */
+ _vec_len (ptd->buffers) = saved_ptd_buffers_len;
+ _vec_len (ptd->copy_ops) = saved_ptd_copy_ops_len;
+ vlib_error_count (vm, node->node_index,
+ MEMIF_TX_ERROR_ROLLBACK, 1);
+ slot = saved_slot;
+ goto no_free_slots;
+ }
+ }
+ bytes_to_copy = clib_min (src_left, dst_left);
+ memif_add_copy_op (ptd, mb0 + dst_off, bytes_to_copy, src_off,
+ vec_len (ptd->buffers));
+ vec_add1_aligned (ptd->buffers, bi0, CLIB_CACHE_LINE_BYTES);
+ src_off += bytes_to_copy;
+ dst_off += bytes_to_copy;
+ src_left -= bytes_to_copy;
+ dst_left -= bytes_to_copy;
+ }
+
+ if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_NEXT_PRESENT))
+ {
+ bi0 = b0->next_buffer;
+ goto next_in_chain;
+ }
+
+ d0->length = dst_off;
+ d0->flags = 0;
+
+ free_slots -= 1;
+ slot += 1;
+
buffers++;
n_left--;
- free_slots--;
}
+no_free_slots:
+
+ /* copy data */
+ n_copy_op = vec_len (ptd->copy_ops);
+ co = ptd->copy_ops;
+ while (n_copy_op >= 8)
+ {
+ CLIB_PREFETCH (co[4].data, CLIB_CACHE_LINE_BYTES, LOAD);
+ CLIB_PREFETCH (co[5].data, CLIB_CACHE_LINE_BYTES, LOAD);
+ CLIB_PREFETCH (co[6].data, CLIB_CACHE_LINE_BYTES, LOAD);
+ CLIB_PREFETCH (co[7].data, CLIB_CACHE_LINE_BYTES, LOAD);
+
+ b0 = vlib_get_buffer (vm, ptd->buffers[co[0].buffer_vec_index]);
+ b1 = vlib_get_buffer (vm, ptd->buffers[co[1].buffer_vec_index]);
+ b2 = vlib_get_buffer (vm, ptd->buffers[co[2].buffer_vec_index]);
+ b3 = vlib_get_buffer (vm, ptd->buffers[co[3].buffer_vec_index]);
+
+ clib_memcpy (co[0].data, b0->data + co[0].buffer_offset,
+ co[0].data_len);
+ clib_memcpy (co[1].data, b1->data + co[1].buffer_offset,
+ co[1].data_len);
+ clib_memcpy (co[2].data, b2->data + co[2].buffer_offset,
+ co[2].data_len);
+ clib_memcpy (co[3].data, b3->data + co[3].buffer_offset,
+ co[3].data_len);
+
+ co += 4;
+ n_copy_op -= 4;
+ }
+ while (n_copy_op)
+ {
+ b0 = vlib_get_buffer (vm, ptd->buffers[co[0].buffer_vec_index]);
+ clib_memcpy (co[0].data, b0->data + co[0].buffer_offset,
+ co[0].data_len);
+ co += 1;
+ n_copy_op -= 1;
+ }
+
+ vec_reset_length (ptd->copy_ops);
+ vec_reset_length (ptd->buffers);
CLIB_MEMORY_STORE_BARRIER ();
- ring->head = head;
+ if (type == MEMIF_RING_S2M)
+ ring->head = slot;
+ else
+ ring->tail = slot;
if (n_left && n_retries--)
goto retry;
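For readers comparing the two ring disciplines, a hypothetical helper restating the free-slot arithmetic used above (name and signature are illustrative only; the actual code keeps this inline and templated on the ring type):

    static_always_inline u16
    memif_tx_free_slots (memif_ring_type_t type, memif_ring_t * ring,
                         u16 ring_size, u16 last_tail)
    {
      if (type == MEMIF_RING_S2M)
        /* slave produces into head; slots up to the master's consumed tail are free */
        return ring_size - ring->head + last_tail;
      else
        /* M2S: master pre-populates head with empty buffers for the slave to fill */
        return ring->head - ring->tail;
    }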