summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDamjan Marion <damarion@cisco.com>2018-05-05 12:30:28 +0200
committerDamjan Marion <dmarion.lists@gmail.com>2018-05-09 09:32:39 +0000
commitee7f0bd9e7ce4106d3b9511b0efede4326bded51 (patch)
treec29b3111ae18ebe49ee3e3f8449857076f3b06d9
parentaf05bc018a465a993e795d7c81c2eb12d5b77e44 (diff)
dpdk: tx code rework
Change-Id: Ifea9c772e8784642433b92091f5769eb9ec06890 Signed-off-by: Damjan Marion <damarion@cisco.com>
-rw-r--r--src/plugins/dpdk/device/device.c395
-rw-r--r--src/plugins/dpdk/device/dpdk.h14
-rwxr-xr-xsrc/plugins/dpdk/device/init.c10
-rw-r--r--src/vppinfra/vector_avx2.h12
4 files changed, 170 insertions, 261 deletions
diff --git a/src/plugins/dpdk/device/device.c b/src/plugins/dpdk/device/device.c
index 9ae3f9cbe6e..044c8728212 100644
--- a/src/plugins/dpdk/device/device.c
+++ b/src/plugins/dpdk/device/device.c
@@ -26,7 +26,6 @@
#define foreach_dpdk_tx_func_error \
_(BAD_RETVAL, "DPDK tx function returned an error") \
- _(RING_FULL, "Tx packet drops (ring full)") \
_(PKT_DROP, "Tx packet drops (dpdk tx failure)") \
_(REPL_FAIL, "Tx packet drops (replication failure)")
@@ -111,10 +110,9 @@ dpdk_replicate_packet_mb (vlib_buffer_t * b)
}
static void
-dpdk_tx_trace_buffer (dpdk_main_t * dm,
- vlib_node_runtime_t * node,
- dpdk_device_t * xd,
- u16 queue_id, u32 buffer_index, vlib_buffer_t * buffer)
+dpdk_tx_trace_buffer (dpdk_main_t * dm, vlib_node_runtime_t * node,
+ dpdk_device_t * xd, u16 queue_id,
+ vlib_buffer_t * buffer)
{
vlib_main_t *vm = vlib_get_main ();
dpdk_tx_trace_t *t0;
@@ -125,7 +123,7 @@ dpdk_tx_trace_buffer (dpdk_main_t * dm,
t0 = vlib_add_trace (vm, node, buffer, sizeof (t0[0]));
t0->queue_index = queue_id;
t0->device_index = xd->device_index;
- t0->buffer_index = buffer_index;
+ t0->buffer_index = vlib_get_buffer_index (vm, buffer);
clib_memcpy (&t0->mb, mb, sizeof (t0->mb));
clib_memcpy (&t0->buffer, buffer,
sizeof (buffer[0]) - sizeof (buffer->pre_data));
@@ -181,58 +179,26 @@ dpdk_validate_rte_mbuf (vlib_main_t * vm, vlib_buffer_t * b,
}
/*
- * This function calls the dpdk's tx_burst function to transmit the packets
- * on the tx_vector. It manages a lock per-device if the device does not
+ * This function calls the dpdk's tx_burst function to transmit the packets.
+ * It manages a lock per-device if the device does not
* support multiple queues. It returns the number of packets untransmitted
- * on the tx_vector. If all packets are transmitted (the normal case), the
- * function returns 0.
- *
- * The function assumes there is at least one packet on the tx_vector.
+ * If all packets are transmitted (the normal case), the function returns 0.
*/
static_always_inline
u32 tx_burst_vector_internal (vlib_main_t * vm,
dpdk_device_t * xd,
- struct rte_mbuf **tx_vector)
+ struct rte_mbuf **mb, u32 n_left)
{
dpdk_main_t *dm = &dpdk_main;
- u32 n_packets;
- u32 tx_head;
- u32 tx_tail;
u32 n_retry;
- int rv;
+ int n_sent = 0;
int queue_id;
- tx_ring_hdr_t *ring;
-
- ring = vec_header (tx_vector, sizeof (*ring));
-
- n_packets = ring->tx_head - ring->tx_tail;
-
- tx_head = ring->tx_head % xd->nb_tx_desc;
-
- /*
- * Ensure rte_eth_tx_burst is not called with 0 packets, which can lead to
- * unpredictable results.
- */
- ASSERT (n_packets > 0);
-
- /*
- * Check for tx_vector overflow. If this fails it is a system configuration
- * error. The ring should be sized big enough to handle the largest un-flowed
- * off burst from a traffic manager. A larger size also helps performance
- * a bit because it decreases the probability of having to issue two tx_burst
- * calls due to a ring wrap.
- */
- ASSERT (n_packets < xd->nb_tx_desc);
- ASSERT (ring->tx_tail == 0);
n_retry = 16;
queue_id = vm->thread_index;
do
{
- /* start the burst at the tail */
- tx_tail = ring->tx_tail % xd->nb_tx_desc;
-
/*
* This device only supports one TX queue,
* and we're running multi-threaded...
@@ -253,30 +219,25 @@ static_always_inline
ASSERT (hqos->swq != NULL);
- dpdk_hqos_metadata_set (hqos,
- &tx_vector[tx_tail], tx_head - tx_tail);
- rv = rte_ring_sp_enqueue_burst (hqos->swq,
- (void **) &tx_vector[tx_tail],
- (uint16_t) (tx_head - tx_tail), 0);
+ dpdk_hqos_metadata_set (hqos, mb, n_left);
+ n_sent = rte_ring_sp_enqueue_burst (hqos->swq, (void **) mb,
+ n_left, 0);
}
else if (PREDICT_TRUE (xd->flags & DPDK_DEVICE_FLAG_PMD))
{
/* no wrap, transmit in one burst */
- rv = rte_eth_tx_burst (xd->device_index,
- (uint16_t) queue_id,
- &tx_vector[tx_tail],
- (uint16_t) (tx_head - tx_tail));
+ n_sent = rte_eth_tx_burst (xd->device_index, queue_id, mb, n_left);
}
else
{
ASSERT (0);
- rv = 0;
+ n_sent = 0;
}
if (PREDICT_FALSE (xd->lockp != 0))
*xd->lockp[queue_id] = 0;
- if (PREDICT_FALSE (rv < 0))
+ if (PREDICT_FALSE (n_sent < 0))
{
// emit non-fatal message, bump counter
vnet_main_t *vnm = dm->vnet_main;
@@ -288,24 +249,21 @@ static_always_inline
vlib_error_count (vm, node_index, DPDK_TX_FUNC_ERROR_BAD_RETVAL, 1);
clib_warning ("rte_eth_tx_burst[%d]: error %d", xd->device_index,
- rv);
- return n_packets; // untransmitted packets
+ n_sent);
+ return n_left; // untransmitted packets
}
- ring->tx_tail += (u16) rv;
- n_packets -= (uint16_t) rv;
+ n_left -= n_sent;
+ mb += n_sent;
}
- while (rv && n_packets && (n_retry > 0));
+ while (n_sent && n_left && (n_retry > 0));
- return n_packets;
+ return n_left;
}
static_always_inline void
-dpdk_prefetch_buffer_by_index (vlib_main_t * vm, u32 bi)
+dpdk_prefetch_buffer (vlib_main_t * vm, struct rte_mbuf *mb)
{
- vlib_buffer_t *b;
- struct rte_mbuf *mb;
- b = vlib_get_buffer (vm, bi);
- mb = rte_mbuf_from_vlib_buffer (b);
+ vlib_buffer_t *b = vlib_buffer_from_rte_mbuf (mb);
CLIB_PREFETCH (mb, 2 * CLIB_CACHE_LINE_BYTES, STORE);
CLIB_PREFETCH (b, CLIB_CACHE_LINE_BYTES, LOAD);
}
@@ -315,7 +273,6 @@ dpdk_buffer_recycle (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_buffer_t * b, u32 bi, struct rte_mbuf **mbp)
{
dpdk_main_t *dm = &dpdk_main;
- u32 my_cpu = vm->thread_index;
struct rte_mbuf *mb_new;
if (PREDICT_FALSE (b->flags & VLIB_BUFFER_RECYCLE) == 0)
@@ -331,7 +288,7 @@ dpdk_buffer_recycle (vlib_main_t * vm, vlib_node_runtime_t * node,
else
*mbp = mb_new;
- vec_add1 (dm->recycle[my_cpu], bi);
+ vec_add1 (dm->recycle[vm->thread_index], bi);
}
static_always_inline void
@@ -367,9 +324,8 @@ dpdk_buffer_tx_offload (dpdk_device_t * xd, vlib_buffer_t * b,
/*
* Transmits the packets on the frame to the interface associated with the
- * node. It first copies packets on the frame to a tx_vector containing the
- * rte_mbuf pointers. It then passes this vector to tx_burst_vector_internal
- * which calls the dpdk tx_burst function.
+ * node. It first copies packets on the frame to a per-thread arrays
+ * containing the rte_mbuf pointers.
*/
uword
CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
@@ -382,46 +338,25 @@ CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
u32 n_packets = f->n_vectors;
u32 n_left;
u32 *from;
- struct rte_mbuf **tx_vector;
- u16 i;
- u16 nb_tx_desc = xd->nb_tx_desc;
- int queue_id;
- u32 my_cpu;
- u32 tx_pkts = 0;
- tx_ring_hdr_t *ring;
- u32 n_on_ring;
-
- my_cpu = vm->thread_index;
-
- queue_id = my_cpu;
-
- tx_vector = xd->tx_vectors[queue_id];
- ring = vec_header (tx_vector, sizeof (*ring));
+ u32 thread_index = vm->thread_index;
+ int queue_id = thread_index;
+ u32 tx_pkts = 0, all_or_flags = 0;
+ dpdk_per_thread_data_t *ptd = vec_elt_at_index (dm->per_thread_data,
+ thread_index);
+ struct rte_mbuf **mb;
+ vlib_buffer_t *b[4];
+#ifdef CLIB_HAVE_VEC256
+ u64x4 off4 = u64x4_splat (buffer_main.buffer_mem_start -
+ sizeof (struct rte_mbuf));
+ u32x8 permute_mask = { 0, 4, 1, 5, 2, 6, 3, 7 };
+ u32x8 zero = { 0 };
+#endif
- n_on_ring = ring->tx_head - ring->tx_tail;
from = vlib_frame_vector_args (f);
ASSERT (n_packets <= VLIB_FRAME_SIZE);
- if (PREDICT_FALSE (n_on_ring + n_packets > nb_tx_desc))
- {
- /*
- * Overflowing the ring should never happen.
- * If it does then drop the whole frame.
- */
- vlib_error_count (vm, node->node_index, DPDK_TX_FUNC_ERROR_RING_FULL,
- n_packets);
-
- while (n_packets--)
- {
- u32 bi0 = from[n_packets];
- vlib_buffer_t *b0 = vlib_get_buffer (vm, bi0);
- struct rte_mbuf *mb0 = rte_mbuf_from_vlib_buffer (b0);
- rte_pktmbuf_free (mb0);
- }
- return n_on_ring;
- }
-
+ /* TX PCAP tracing */
if (PREDICT_FALSE (dm->tx_pcap_enable))
{
n_left = n_packets;
@@ -437,170 +372,162 @@ CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
}
}
+ /* calculate rte_mbuf pointers out of buffer indices */
+ from = vlib_frame_vector_args (f);
+ n_left = n_packets;
+ mb = ptd->mbufs;
+ while (n_left >= 8)
+ {
+#ifdef CLIB_HAVE_VEC256
+ u32x8 bi0, bi1;
+ u64x4 mb0, mb1;
+ /* load 4 bufer indices into lower part of 256-bit register */
+ bi0 = u32x8_insert_lo (zero, u32x4_load_unaligned (from));
+ bi1 = u32x8_insert_lo (zero, u32x4_load_unaligned (from + 4));
+ /* permute 256-bit register so each buffer index is in own u64 */
+ mb0 = (u64x4) u32x8_permute (bi0, permute_mask);
+ mb1 = (u64x4) u32x8_permute (bi1, permute_mask);
+ /* shift and add to get rte_mbuf pointer */
+ mb0 <<= CLIB_LOG2_CACHE_LINE_BYTES;
+ mb1 <<= CLIB_LOG2_CACHE_LINE_BYTES;
+ u64x4_store_unaligned (mb0 + off4, mb);
+ u64x4_store_unaligned (mb1 + off4, mb + 4);
+#else
+ mb[0] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[0]));
+ mb[1] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[1]));
+ mb[2] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[2]));
+ mb[3] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[3]));
+ mb[4] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[4]));
+ mb[5] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[5]));
+ mb[6] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[6]));
+ mb[7] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[7]));
+#endif
+ from += 8;
+ mb += 8;
+ n_left -= 8;
+ }
+ while (n_left)
+ {
+ mb[0] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[0]));
+ from++;
+ mb++;
+ n_left--;
+ }
from = vlib_frame_vector_args (f);
n_left = n_packets;
- i = ring->tx_head % nb_tx_desc;
+ mb = ptd->mbufs;
while (n_left >= 8)
{
- u32 bi0, bi1, bi2, bi3;
- struct rte_mbuf *mb0, *mb1, *mb2, *mb3;
- vlib_buffer_t *b0, *b1, *b2, *b3;
u32 or_flags;
- dpdk_prefetch_buffer_by_index (vm, from[4]);
- dpdk_prefetch_buffer_by_index (vm, from[5]);
- dpdk_prefetch_buffer_by_index (vm, from[6]);
- dpdk_prefetch_buffer_by_index (vm, from[7]);
+ dpdk_prefetch_buffer (vm, mb[4]);
+ dpdk_prefetch_buffer (vm, mb[5]);
+ dpdk_prefetch_buffer (vm, mb[6]);
+ dpdk_prefetch_buffer (vm, mb[7]);
- bi0 = from[0];
- bi1 = from[1];
- bi2 = from[2];
- bi3 = from[3];
- from += 4;
+ b[0] = vlib_buffer_from_rte_mbuf (mb[0]);
+ b[1] = vlib_buffer_from_rte_mbuf (mb[1]);
+ b[2] = vlib_buffer_from_rte_mbuf (mb[2]);
+ b[3] = vlib_buffer_from_rte_mbuf (mb[3]);
- b0 = vlib_get_buffer (vm, bi0);
- b1 = vlib_get_buffer (vm, bi1);
- b2 = vlib_get_buffer (vm, bi2);
- b3 = vlib_get_buffer (vm, bi3);
+ or_flags = b[0]->flags | b[1]->flags | b[2]->flags | b[3]->flags;
+ all_or_flags |= or_flags;
- or_flags = b0->flags | b1->flags | b2->flags | b3->flags;
-
- VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
- VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1);
- VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b2);
- VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b3);
+ VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[0]);
+ VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[1]);
+ VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[2]);
+ VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[3]);
if (or_flags & VLIB_BUFFER_NEXT_PRESENT)
{
- dpdk_validate_rte_mbuf (vm, b0, 1);
- dpdk_validate_rte_mbuf (vm, b1, 1);
- dpdk_validate_rte_mbuf (vm, b2, 1);
- dpdk_validate_rte_mbuf (vm, b3, 1);
+ dpdk_validate_rte_mbuf (vm, b[0], 1);
+ dpdk_validate_rte_mbuf (vm, b[1], 1);
+ dpdk_validate_rte_mbuf (vm, b[2], 1);
+ dpdk_validate_rte_mbuf (vm, b[3], 1);
}
else
{
- dpdk_validate_rte_mbuf (vm, b0, 0);
- dpdk_validate_rte_mbuf (vm, b1, 0);
- dpdk_validate_rte_mbuf (vm, b2, 0);
- dpdk_validate_rte_mbuf (vm, b3, 0);
+ dpdk_validate_rte_mbuf (vm, b[0], 0);
+ dpdk_validate_rte_mbuf (vm, b[1], 0);
+ dpdk_validate_rte_mbuf (vm, b[2], 0);
+ dpdk_validate_rte_mbuf (vm, b[3], 0);
}
- mb0 = rte_mbuf_from_vlib_buffer (b0);
- mb1 = rte_mbuf_from_vlib_buffer (b1);
- mb2 = rte_mbuf_from_vlib_buffer (b2);
- mb3 = rte_mbuf_from_vlib_buffer (b3);
-
if (PREDICT_FALSE ((xd->flags & DPDK_DEVICE_FLAG_TX_OFFLOAD) &&
(or_flags &
(VNET_BUFFER_F_OFFLOAD_TCP_CKSUM
| VNET_BUFFER_F_OFFLOAD_IP_CKSUM
| VNET_BUFFER_F_OFFLOAD_UDP_CKSUM))))
{
- dpdk_buffer_tx_offload (xd, b0, mb0);
- dpdk_buffer_tx_offload (xd, b1, mb1);
- dpdk_buffer_tx_offload (xd, b2, mb2);
- dpdk_buffer_tx_offload (xd, b3, mb3);
- }
-
- if (PREDICT_FALSE (or_flags & VLIB_BUFFER_RECYCLE))
- {
- dpdk_buffer_recycle (vm, node, b0, bi0, &mb0);
- dpdk_buffer_recycle (vm, node, b1, bi1, &mb1);
- dpdk_buffer_recycle (vm, node, b2, bi2, &mb2);
- dpdk_buffer_recycle (vm, node, b3, bi3, &mb3);
-
- /* dont enqueue packets if replication failed as they must
- be sent back to recycle */
- if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_REPL_FAIL) == 0))
- tx_vector[i++ % nb_tx_desc] = mb0;
- if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_REPL_FAIL) == 0))
- tx_vector[i++ % nb_tx_desc] = mb1;
- if (PREDICT_TRUE ((b2->flags & VLIB_BUFFER_REPL_FAIL) == 0))
- tx_vector[i++ % nb_tx_desc] = mb2;
- if (PREDICT_TRUE ((b3->flags & VLIB_BUFFER_REPL_FAIL) == 0))
- tx_vector[i++ % nb_tx_desc] = mb3;
- }
- else
- {
- if (PREDICT_FALSE (i + 3 >= nb_tx_desc))
- {
- tx_vector[i++ % nb_tx_desc] = mb0;
- tx_vector[i++ % nb_tx_desc] = mb1;
- tx_vector[i++ % nb_tx_desc] = mb2;
- tx_vector[i++ % nb_tx_desc] = mb3;
- i %= nb_tx_desc;
- }
- else
- {
- tx_vector[i++] = mb0;
- tx_vector[i++] = mb1;
- tx_vector[i++] = mb2;
- tx_vector[i++] = mb3;
- }
+ dpdk_buffer_tx_offload (xd, b[0], mb[0]);
+ dpdk_buffer_tx_offload (xd, b[1], mb[1]);
+ dpdk_buffer_tx_offload (xd, b[2], mb[2]);
+ dpdk_buffer_tx_offload (xd, b[3], mb[3]);
}
-
if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE))
{
- if (b0->flags & VLIB_BUFFER_IS_TRACED)
- dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi0, b0);
- if (b1->flags & VLIB_BUFFER_IS_TRACED)
- dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi1, b1);
- if (b2->flags & VLIB_BUFFER_IS_TRACED)
- dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi2, b2);
- if (b3->flags & VLIB_BUFFER_IS_TRACED)
- dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi3, b3);
+ if (b[0]->flags & VLIB_BUFFER_IS_TRACED)
+ dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[0]);
+ if (b[1]->flags & VLIB_BUFFER_IS_TRACED)
+ dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[1]);
+ if (b[2]->flags & VLIB_BUFFER_IS_TRACED)
+ dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[2]);
+ if (b[3]->flags & VLIB_BUFFER_IS_TRACED)
+ dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[3]);
}
+ mb += 4;
n_left -= 4;
}
while (n_left > 0)
{
- u32 bi0;
- struct rte_mbuf *mb0;
- vlib_buffer_t *b0;
+ b[0] = vlib_buffer_from_rte_mbuf (mb[0]);
+ all_or_flags |= b[0]->flags;
+ VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[0]);
- bi0 = from[0];
- from++;
+ dpdk_validate_rte_mbuf (vm, b[0], 1);
+ dpdk_buffer_tx_offload (xd, b[0], mb[0]);
- b0 = vlib_get_buffer (vm, bi0);
- VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
+ if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE))
+ if (b[0]->flags & VLIB_BUFFER_IS_TRACED)
+ dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[0]);
- dpdk_validate_rte_mbuf (vm, b0, 1);
+ mb++;
+ n_left--;
+ }
- mb0 = rte_mbuf_from_vlib_buffer (b0);
- dpdk_buffer_tx_offload (xd, b0, mb0);
- dpdk_buffer_recycle (vm, node, b0, bi0, &mb0);
+ /* run inly if we have buffers to recycle */
+ if (PREDICT_FALSE (all_or_flags & VLIB_BUFFER_RECYCLE))
+ {
+ struct rte_mbuf **mb_old;
+ from = vlib_frame_vector_args (f);
+ n_left = n_packets;
+ mb_old = mb = ptd->mbufs;
+ while (n_left > 0)
+ {
+ b[0] = vlib_buffer_from_rte_mbuf (mb[0]);
+ dpdk_buffer_recycle (vm, node, b[0], from[0], &mb_old[0]);
- if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE))
- if (b0->flags & VLIB_BUFFER_IS_TRACED)
- dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi0, b0);
+ /* in case of REPL_FAIL we need to shift data */
+ mb[0] = mb_old[0];
- if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_REPL_FAIL) == 0))
- {
- tx_vector[i % nb_tx_desc] = mb0;
- i++;
+ if (PREDICT_TRUE ((b[0]->flags & VLIB_BUFFER_REPL_FAIL) == 0))
+ mb++;
+ mb_old++;
+ from++;
+ n_left--;
}
- n_left--;
}
- /* account for additional packets in the ring */
- ring->tx_head += n_packets;
- n_on_ring = ring->tx_head - ring->tx_tail;
-
/* transmit as many packets as possible */
- n_packets = tx_burst_vector_internal (vm, xd, tx_vector);
-
- /*
- * tx_pkts is the number of packets successfully transmitted
- * This is the number originally on ring minus the number remaining on ring
- */
- tx_pkts = n_on_ring - n_packets;
+ n_packets = mb - ptd->mbufs;
+ n_left = tx_burst_vector_internal (vm, xd, ptd->mbufs, n_packets);
{
/* If there is no callback then drop any non-transmitted packets */
- if (PREDICT_FALSE (n_packets))
+ if (PREDICT_FALSE (n_left))
{
vlib_simple_counter_main_t *cm;
vnet_main_t *vnm = vnet_get_main ();
@@ -608,31 +535,25 @@ CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
cm = vec_elt_at_index (vnm->interface_main.sw_if_counters,
VNET_INTERFACE_COUNTER_TX_ERROR);
- vlib_increment_simple_counter (cm, my_cpu, xd->sw_if_index,
- n_packets);
+ vlib_increment_simple_counter (cm, thread_index, xd->sw_if_index,
+ n_left);
vlib_error_count (vm, node->node_index, DPDK_TX_FUNC_ERROR_PKT_DROP,
- n_packets);
+ n_left);
- while (n_packets--)
- rte_pktmbuf_free (tx_vector[ring->tx_tail + n_packets]);
+ while (n_left--)
+ rte_pktmbuf_free (ptd->mbufs[n_packets - n_left]);
}
-
- /* Reset head/tail to avoid unnecessary wrap */
- ring->tx_head = 0;
- ring->tx_tail = 0;
}
/* Recycle replicated buffers */
- if (PREDICT_FALSE (vec_len (dm->recycle[my_cpu])))
+ if (PREDICT_FALSE (vec_len (dm->recycle[thread_index])))
{
- vlib_buffer_free (vm, dm->recycle[my_cpu],
- vec_len (dm->recycle[my_cpu]));
- _vec_len (dm->recycle[my_cpu]) = 0;
+ vlib_buffer_free (vm, dm->recycle[thread_index],
+ vec_len (dm->recycle[thread_index]));
+ _vec_len (dm->recycle[thread_index]) = 0;
}
- ASSERT (ring->tx_head >= ring->tx_tail);
-
return tx_pkts;
}
diff --git a/src/plugins/dpdk/device/dpdk.h b/src/plugins/dpdk/device/dpdk.h
index f02e718dc9c..0778659db55 100644
--- a/src/plugins/dpdk/device/dpdk.h
+++ b/src/plugins/dpdk/device/dpdk.h
@@ -110,17 +110,6 @@ typedef enum
VNET_DPDK_PORT_TYPE_UNKNOWN,
} dpdk_port_type_t;
-/*
- * The header for the tx_vector in dpdk_device_t.
- * Head and tail are indexes into the tx_vector and are of type
- * u64 so they never overflow.
- */
-typedef struct
-{
- u64 tx_head;
- u64 tx_tail;
-} tx_ring_hdr_t;
-
typedef uint16_t dpdk_portid_t;
typedef struct
@@ -191,9 +180,6 @@ typedef struct
/* next node index if we decide to steal the rx graph arc */
u32 per_interface_next_index;
- /* dpdk rte_mbuf rx and tx vectors, VLIB_FRAME_SIZE */
- struct rte_mbuf ***tx_vectors; /* one per worker thread */
-
dpdk_pmd_t pmd:8;
i8 cpu_socket;
diff --git a/src/plugins/dpdk/device/init.c b/src/plugins/dpdk/device/init.c
index 9ed3efdd49d..83d26ce13e5 100755
--- a/src/plugins/dpdk/device/init.c
+++ b/src/plugins/dpdk/device/init.c
@@ -256,7 +256,6 @@ dpdk_lib_init (dpdk_main_t * dm)
{
u8 addr[6];
u8 vlan_strip = 0;
- int j;
struct rte_eth_dev_info dev_info;
struct rte_eth_link l;
dpdk_device_config_t *devconf = 0;
@@ -537,15 +536,6 @@ dpdk_lib_init (dpdk_main_t * dm)
dq->queue_id = 0;
}
- vec_validate_aligned (xd->tx_vectors, tm->n_vlib_mains,
- CLIB_CACHE_LINE_BYTES);
- for (j = 0; j < tm->n_vlib_mains; j++)
- {
- vec_validate_ha (xd->tx_vectors[j], xd->nb_tx_desc,
- sizeof (tx_ring_hdr_t), CLIB_CACHE_LINE_BYTES);
- vec_reset_length (xd->tx_vectors[j]);
- }
-
/* count the number of descriptors used for this device */
nb_desc += xd->nb_rx_desc + xd->nb_tx_desc * xd->tx_q_used;
diff --git a/src/vppinfra/vector_avx2.h b/src/vppinfra/vector_avx2.h
index ad7e7d4dea8..9c1ce4700c5 100644
--- a/src/vppinfra/vector_avx2.h
+++ b/src/vppinfra/vector_avx2.h
@@ -69,6 +69,18 @@ u32x8_extract_hi (u32x8 v)
return (u32x4) _mm256_extracti128_si256 ((__m256i) v, 1);
}
+always_inline u32x8
+u32x8_insert_lo (u32x8 v1, u32x4 v2)
+{
+ return (u32x8) _mm256_inserti128_si256 ((__m256i) v1, (__m128i) v2, 0);
+}
+
+always_inline u32x8
+u32x8_insert_hi (u32x8 v1, u32x4 v2)
+{
+ return (u32x8) _mm256_inserti128_si256 ((__m256i) v1, (__m128i) v2, 1);
+}
+
#endif /* included_vector_avx2_h */
/*