diff options
Diffstat (limited to 'lib/libtle_udp/udp_rxtx.c')
-rw-r--r-- | lib/libtle_udp/udp_rxtx.c | 235 |
1 files changed, 103 insertions, 132 deletions
diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c index a8ab3fb..7136714 100644 --- a/lib/libtle_udp/udp_rxtx.c +++ b/lib/libtle_udp/udp_rxtx.c @@ -217,13 +217,13 @@ tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], } static inline void -tx_cage_release(struct buf_cage *bc) +stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[], + uint32_t nb_drb) { - struct tle_udp_stream *s; uint32_t n; - s = bcg_get_udata(bc); - n = bcg_free(bc); + n = rte_ring_count(s->tx.drb.r); + rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb); /* If stream is still open, then mark it as avaialble for writing. */ if (rwl_try_acquire(&s->tx.use) > 0) { @@ -232,7 +232,7 @@ tx_cage_release(struct buf_cage *bc) tle_event_raise(s->tx.ev); /* if stream send buffer was full invoke TX callback */ - else if (s->tx.cb.func != NULL && n == 1) + else if (s->tx.cb.func != NULL && n == 0) s->tx.cb.func(s->tx.cb.data, s); } @@ -240,53 +240,32 @@ tx_cage_release(struct buf_cage *bc) rwl_release(&s->tx.use); } -static inline void -tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc) -{ - struct tle_udp_stream *s; - struct tle_udp_ctx *ctx; - uint32_t idx; - - ctx = dev->ctx; - s = bcg_get_udata(bc); - idx = dev - ctx->dev; - - /* mark cage as closed to the stream. */ - rte_spinlock_lock(&s->tx.lock); - if (bc == s->tx.cg[idx]) - s->tx.cg[idx] = NULL; - rte_spinlock_unlock(&s->tx.lock); -} - uint16_t tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num) { - struct buf_cage *bc; - uint32_t i, n; + uint32_t i, j, k, n; + struct tle_drb *drb[num]; + struct tle_udp_stream *s; - for (i = 0; i != num; i += n) { + /* extract packets from device TX queue. */ - bc = dev->tx.bc; - if (bc == NULL) { - if (dev->tx.beq.num == 0) - bcg_queue_append(&dev->tx.beq, &dev->tx.feq); - bc = __bcg_dequeue_head(&dev->tx.beq); - if (bc == NULL) - break; - tx_cage_update(dev, bc); - dev->tx.bc = bc; - } + k = num; + n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt, + num, drb, &k); - n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i); + if (n == 0) + return 0; - /* cage is empty, need to free it and notify related stream. */ - if (bcg_fill_count(bc) == 0) { - tx_cage_release(bc); - dev->tx.bc = NULL; - } + /* free empty drbs and notify related streams. */ + + for (i = 0; i != k; i = j) { + s = drb[i]->udata; + for (j = i + 1; j != k && s == drb[i]->udata; j++) + ; + stream_drb_release(s, drb + i, j - i); } - return i; + return n; } static int @@ -359,30 +338,41 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero) * helper function, do the necessary pre-processing for the received packets * before handiing them to the strem_recv caller. */ -static inline struct rte_mbuf * -recv_pkt_process(struct rte_mbuf *m, uint32_t type) +static inline uint32_t +recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type) { - uint64_t f; - - f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); - if (f != 0) { - if (check_pkt_csum(m, type) == 0) - m->ol_flags ^= f; - else { - rte_pktmbuf_free(m); - return NULL; + uint32_t i, k; + uint64_t f, flg[num], ofl[num]; + + for (i = 0; i != num; i++) { + flg[i] = m[i]->ol_flags; + ofl[i] = m[i]->tx_offload; + } + + k = 0; + for (i = 0; i != num; i++) { + + f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); + + /* drop packets with invalid cksum(s). */ + if (f != 0 && check_pkt_csum(m[i], type) != 0) { + rte_pktmbuf_free(m[i]); + m[i] = NULL; + k++; } + + m[i]->ol_flags ^= f; + rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i])); } - rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len); - return m; + return k; } uint16_t tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[], uint16_t num) { - uint32_t i, k, n; + uint32_t k, n; n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); if (n == 0) @@ -398,28 +388,7 @@ tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[], rwl_release(&s->rx.use); } - k = 0; - for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) { - pkt[i] = recv_pkt_process(pkt[i], s->type); - pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type); - pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type); - pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type); - k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) + - (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL); - } - - switch (n % 4) { - case 3: - pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type); - k += (pkt[i + 2] == NULL); - case 2: - pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type); - k += (pkt[i + 1] == NULL); - case 1: - pkt[i] = recv_pkt_process(pkt[i], s->type); - k += (pkt[i] == NULL); - } - + k = recv_pkt_process(pkt, n, s->type); return compress_pkt_list(pkt, n, k); } @@ -586,65 +555,59 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num, return frag_num; } +static inline void +stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +static inline uint32_t +stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + /* enqueue up to num packets to the destination device queue. */ static inline uint16_t -queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di, - const void *pkt[], uint16_t num) +queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev, + const void *pkt[], uint16_t nb_pkt, + struct tle_drb *drbs[], uint32_t *nb_drb) { - struct buf_cage *bc; - uint32_t i, n; + uint32_t bsz, i, n, nb, nbc, nbm; - rte_spinlock_lock(&s->tx.lock); - bc = s->tx.cg[di]; + bsz = s->tx.drb.nb_elem; - for (i = 0; i != num; i += n) { - if (bc == NULL) { - bc = bcg_alloc(s->tx.st); - if (bc == NULL) - break; - n = bcg_put(bc, pkt + i, num - i); - bcg_enqueue_tail(bq, bc); - } else - n = bcg_put(bc, pkt + i, num - i); + /* calulate how many drbs are needed.*/ + nbc = *nb_drb; + nbm = (nb_pkt + bsz - 1) / bsz; + nb = RTE_MAX(nbm, nbc) - nbc; - if (n != num - i) - bc = NULL; - } + /* allocate required drbs */ + if (nb != 0) + nb = stream_drb_alloc(s, drbs + nbc, nb); - s->tx.cg[di] = bc; - rte_spinlock_unlock(&s->tx.lock); - return i; -} + nb += nbc; -/* - * etiher enqueue all num packets or none. - * assumes that all number of input packets not exceed size of buf_cage. - */ -static inline uint16_t -queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di, - const void *pkt[], uint16_t num) -{ - struct buf_cage *bc, *bcp; - uint32_t n; + /* no free drbs, can't send anything */ + if (nb == 0) + return 0; - rte_spinlock_lock(&s->tx.lock); - bc = s->tx.cg[di]; + /* not enough free drbs, reduce number of packets to send. */ + else if (nb != nbm) + nb_pkt = nb * bsz; - n = 0; - if (bc == NULL || bcg_free_count(bc) < num) { - bcp = bc; - bc = bcg_alloc(s->tx.st); - if (bc != NULL) { - if (bcp != NULL) - n = bcg_put(bcp, pkt, num); - n += bcg_put(bc, pkt, num - n); - bcg_enqueue_tail(bq, bc); - } - } else - n = bcg_put(bc, pkt, num); + /* enqueue packets to the destination device. */ + nbc = nb; + n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb); - s->tx.cg[di] = bc; - rte_spinlock_unlock(&s->tx.lock); + /* if not all available drbs were consumed, move them to the start. */ + nbc -= nb; + for (i = 0; i != nb; i++) + drbs[i] = drbs[nbc + i]; + + *nb_drb = nb; return n; } @@ -654,13 +617,14 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], { int32_t di, frg, rc; uint64_t ol_flags; - uint32_t i, k, n; + uint32_t i, k, n, nb; uint32_t mtu, pid, type; const struct sockaddr_in *d4; const struct sockaddr_in6 *d6; const void *da; union udph udph; struct tle_udp_dest dst; + struct tle_drb *drb[num]; type = s->type; @@ -704,6 +668,7 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], if (rwl_acquire(&s->tx.use) < 0) return 0; + nb = 0; for (i = 0, k = 0; k != num; k = i) { /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */ @@ -726,8 +691,9 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], /* enqueue non-fragment packets to the destination device. */ if (k != i) { - k += queue_pkt_out(s, &dst.dev->tx.feq, di, - (const void **)(uintptr_t)&pkt[k], i - k); + k += queue_pkt_out(s, dst.dev, + (const void **)(uintptr_t)&pkt[k], i - k, + drb, &nb); /* stream TX queue is full. */ if (k != i) @@ -746,8 +712,8 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], break; } - n = queue_frg_out(s, &dst.dev->tx.feq, di, - (const void **)(uintptr_t)frag, rc); + n = queue_pkt_out(s, dst.dev, + (const void **)(uintptr_t)frag, rc, drb, &nb); if (n == 0) { while (rc-- != 0) rte_pktmbuf_free(frag[rc]); @@ -765,6 +731,11 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], tle_event_raise(s->tx.ev); out: + /* free unused drbs. */ + if (nb != 0) + stream_drb_free(s, drb, nb); + + /* stream can be closed. */ rwl_release(&s->tx.use); /* |