aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtle_udp/udp_rxtx.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libtle_udp/udp_rxtx.c')
-rw-r--r--lib/libtle_udp/udp_rxtx.c235
1 files changed, 103 insertions, 132 deletions
diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c
index a8ab3fb..7136714 100644
--- a/lib/libtle_udp/udp_rxtx.c
+++ b/lib/libtle_udp/udp_rxtx.c
@@ -217,13 +217,13 @@ tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
}
static inline void
-tx_cage_release(struct buf_cage *bc)
+stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[],
+ uint32_t nb_drb)
{
- struct tle_udp_stream *s;
uint32_t n;
- s = bcg_get_udata(bc);
- n = bcg_free(bc);
+ n = rte_ring_count(s->tx.drb.r);
+ rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
/* If stream is still open, then mark it as avaialble for writing. */
if (rwl_try_acquire(&s->tx.use) > 0) {
@@ -232,7 +232,7 @@ tx_cage_release(struct buf_cage *bc)
tle_event_raise(s->tx.ev);
/* if stream send buffer was full invoke TX callback */
- else if (s->tx.cb.func != NULL && n == 1)
+ else if (s->tx.cb.func != NULL && n == 0)
s->tx.cb.func(s->tx.cb.data, s);
}
@@ -240,53 +240,32 @@ tx_cage_release(struct buf_cage *bc)
rwl_release(&s->tx.use);
}
-static inline void
-tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc)
-{
- struct tle_udp_stream *s;
- struct tle_udp_ctx *ctx;
- uint32_t idx;
-
- ctx = dev->ctx;
- s = bcg_get_udata(bc);
- idx = dev - ctx->dev;
-
- /* mark cage as closed to the stream. */
- rte_spinlock_lock(&s->tx.lock);
- if (bc == s->tx.cg[idx])
- s->tx.cg[idx] = NULL;
- rte_spinlock_unlock(&s->tx.lock);
-}
-
uint16_t
tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
- struct buf_cage *bc;
- uint32_t i, n;
+ uint32_t i, j, k, n;
+ struct tle_drb *drb[num];
+ struct tle_udp_stream *s;
- for (i = 0; i != num; i += n) {
+ /* extract packets from device TX queue. */
- bc = dev->tx.bc;
- if (bc == NULL) {
- if (dev->tx.beq.num == 0)
- bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
- bc = __bcg_dequeue_head(&dev->tx.beq);
- if (bc == NULL)
- break;
- tx_cage_update(dev, bc);
- dev->tx.bc = bc;
- }
+ k = num;
+ n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
+ num, drb, &k);
- n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i);
+ if (n == 0)
+ return 0;
- /* cage is empty, need to free it and notify related stream. */
- if (bcg_fill_count(bc) == 0) {
- tx_cage_release(bc);
- dev->tx.bc = NULL;
- }
+ /* free empty drbs and notify related streams. */
+
+ for (i = 0; i != k; i = j) {
+ s = drb[i]->udata;
+ for (j = i + 1; j != k && s == drb[i]->udata; j++)
+ ;
+ stream_drb_release(s, drb + i, j - i);
}
- return i;
+ return n;
}
static int
@@ -359,30 +338,41 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
* helper function, do the necessary pre-processing for the received packets
* before handiing them to the strem_recv caller.
*/
-static inline struct rte_mbuf *
-recv_pkt_process(struct rte_mbuf *m, uint32_t type)
+static inline uint32_t
+recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
{
- uint64_t f;
-
- f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
- if (f != 0) {
- if (check_pkt_csum(m, type) == 0)
- m->ol_flags ^= f;
- else {
- rte_pktmbuf_free(m);
- return NULL;
+ uint32_t i, k;
+ uint64_t f, flg[num], ofl[num];
+
+ for (i = 0; i != num; i++) {
+ flg[i] = m[i]->ol_flags;
+ ofl[i] = m[i]->tx_offload;
+ }
+
+ k = 0;
+ for (i = 0; i != num; i++) {
+
+ f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
+
+ /* drop packets with invalid cksum(s). */
+ if (f != 0 && check_pkt_csum(m[i], type) != 0) {
+ rte_pktmbuf_free(m[i]);
+ m[i] = NULL;
+ k++;
}
+
+ m[i]->ol_flags ^= f;
+ rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
}
- rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len);
- return m;
+ return k;
}
uint16_t
tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
uint16_t num)
{
- uint32_t i, k, n;
+ uint32_t k, n;
n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
if (n == 0)
@@ -398,28 +388,7 @@ tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
rwl_release(&s->rx.use);
}
- k = 0;
- for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) {
- pkt[i] = recv_pkt_process(pkt[i], s->type);
- pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
- pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
- pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type);
- k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) +
- (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL);
- }
-
- switch (n % 4) {
- case 3:
- pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
- k += (pkt[i + 2] == NULL);
- case 2:
- pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
- k += (pkt[i + 1] == NULL);
- case 1:
- pkt[i] = recv_pkt_process(pkt[i], s->type);
- k += (pkt[i] == NULL);
- }
-
+ k = recv_pkt_process(pkt, n, s->type);
return compress_pkt_list(pkt, n, k);
}
@@ -586,65 +555,59 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
return frag_num;
}
+static inline void
+stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
+ uint32_t nb_drb)
+{
+ rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
+static inline uint32_t
+stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
+ uint32_t nb_drb)
+{
+ return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
/* enqueue up to num packets to the destination device queue. */
static inline uint16_t
-queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
- const void *pkt[], uint16_t num)
+queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev,
+ const void *pkt[], uint16_t nb_pkt,
+ struct tle_drb *drbs[], uint32_t *nb_drb)
{
- struct buf_cage *bc;
- uint32_t i, n;
+ uint32_t bsz, i, n, nb, nbc, nbm;
- rte_spinlock_lock(&s->tx.lock);
- bc = s->tx.cg[di];
+ bsz = s->tx.drb.nb_elem;
- for (i = 0; i != num; i += n) {
- if (bc == NULL) {
- bc = bcg_alloc(s->tx.st);
- if (bc == NULL)
- break;
- n = bcg_put(bc, pkt + i, num - i);
- bcg_enqueue_tail(bq, bc);
- } else
- n = bcg_put(bc, pkt + i, num - i);
+ /* calulate how many drbs are needed.*/
+ nbc = *nb_drb;
+ nbm = (nb_pkt + bsz - 1) / bsz;
+ nb = RTE_MAX(nbm, nbc) - nbc;
- if (n != num - i)
- bc = NULL;
- }
+ /* allocate required drbs */
+ if (nb != 0)
+ nb = stream_drb_alloc(s, drbs + nbc, nb);
- s->tx.cg[di] = bc;
- rte_spinlock_unlock(&s->tx.lock);
- return i;
-}
+ nb += nbc;
-/*
- * etiher enqueue all num packets or none.
- * assumes that all number of input packets not exceed size of buf_cage.
- */
-static inline uint16_t
-queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
- const void *pkt[], uint16_t num)
-{
- struct buf_cage *bc, *bcp;
- uint32_t n;
+ /* no free drbs, can't send anything */
+ if (nb == 0)
+ return 0;
- rte_spinlock_lock(&s->tx.lock);
- bc = s->tx.cg[di];
+ /* not enough free drbs, reduce number of packets to send. */
+ else if (nb != nbm)
+ nb_pkt = nb * bsz;
- n = 0;
- if (bc == NULL || bcg_free_count(bc) < num) {
- bcp = bc;
- bc = bcg_alloc(s->tx.st);
- if (bc != NULL) {
- if (bcp != NULL)
- n = bcg_put(bcp, pkt, num);
- n += bcg_put(bc, pkt, num - n);
- bcg_enqueue_tail(bq, bc);
- }
- } else
- n = bcg_put(bc, pkt, num);
+ /* enqueue packets to the destination device. */
+ nbc = nb;
+ n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);
- s->tx.cg[di] = bc;
- rte_spinlock_unlock(&s->tx.lock);
+ /* if not all available drbs were consumed, move them to the start. */
+ nbc -= nb;
+ for (i = 0; i != nb; i++)
+ drbs[i] = drbs[nbc + i];
+
+ *nb_drb = nb;
return n;
}
@@ -654,13 +617,14 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
{
int32_t di, frg, rc;
uint64_t ol_flags;
- uint32_t i, k, n;
+ uint32_t i, k, n, nb;
uint32_t mtu, pid, type;
const struct sockaddr_in *d4;
const struct sockaddr_in6 *d6;
const void *da;
union udph udph;
struct tle_udp_dest dst;
+ struct tle_drb *drb[num];
type = s->type;
@@ -704,6 +668,7 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
if (rwl_acquire(&s->tx.use) < 0)
return 0;
+ nb = 0;
for (i = 0, k = 0; k != num; k = i) {
/* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
@@ -726,8 +691,9 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
/* enqueue non-fragment packets to the destination device. */
if (k != i) {
- k += queue_pkt_out(s, &dst.dev->tx.feq, di,
- (const void **)(uintptr_t)&pkt[k], i - k);
+ k += queue_pkt_out(s, dst.dev,
+ (const void **)(uintptr_t)&pkt[k], i - k,
+ drb, &nb);
/* stream TX queue is full. */
if (k != i)
@@ -746,8 +712,8 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
break;
}
- n = queue_frg_out(s, &dst.dev->tx.feq, di,
- (const void **)(uintptr_t)frag, rc);
+ n = queue_pkt_out(s, dst.dev,
+ (const void **)(uintptr_t)frag, rc, drb, &nb);
if (n == 0) {
while (rc-- != 0)
rte_pktmbuf_free(frag[rc]);
@@ -765,6 +731,11 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
tle_event_raise(s->tx.ev);
out:
+ /* free unused drbs. */
+ if (nb != 0)
+ stream_drb_free(s, drb, nb);
+
+ /* stream can be closed. */
rwl_release(&s->tx.use);
/*