Diffstat (limited to 'lib/libtle_udp/udp_rxtx.c')
 -rw-r--r--  lib/libtle_udp/udp_rxtx.c | 767
 1 file changed, 767 insertions, 0 deletions
diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c
new file mode 100644
index 0000000..d5d248e
--- /dev/null
+++ b/lib/libtle_udp/udp_rxtx.c
@@ -0,0 +1,767 @@
+
+#include <rte_malloc.h>
+#include <rte_errno.h>
+#include <rte_ethdev.h>
+#include <rte_ip.h>
+#include <rte_ip_frag.h>
+#include <rte_udp.h>
+
+#include "udp_impl.h"
+#include "misc.h"
+
+static inline struct tle_udp_stream *
+rx_stream_obtain(struct tle_udp_dev *dev, uint32_t type, uint32_t port)
+{
+ struct tle_udp_stream *s;
+
+ if (type >= TLE_UDP_VNUM || dev->dp[type] == NULL)
+ return NULL;
+
+ s = dev->dp[type]->streams[port];
+ if (s == NULL)
+ return NULL;
+
+ if (rwl_acquire(&s->rx.use) < 0)
+ return NULL;
+
+ return s;
+}
+
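+/* classify the mbuf as IPv4/UDP, IPv6/UDP or unknown, based on its packet_type. */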
+static inline uint16_t
+get_pkt_type(const struct rte_mbuf *m)
+{
+ uint32_t v;
+
+ v = m->packet_type &
+ (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
+ if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
+ return TLE_UDP_V4;
+ else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
+ return TLE_UDP_V6;
+ else
+ return TLE_UDP_VNUM;
+}
+
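+/*
+ * retrieve L3 addresses and L4 ports needed for stream lookup;
+ * returns the packet type in .src and the destination port in .dst.
+ */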
+static inline union udp_ports
+pkt_info(const struct tle_udp_dev *dev, struct rte_mbuf *m,
+ union udp_ports *ports, union ipv4_addrs *addr4,
+ union ipv6_addrs **addr6)
+{
+ uint32_t len;
+ union udp_ports ret, *up;
+ union ipv4_addrs *pa4;
+
+ ret.src = get_pkt_type(m);
+
+ len = m->l2_len;
+ if (ret.src == TLE_UDP_V4) {
+ pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
+ len + offsetof(struct ipv4_hdr, src_addr));
+ addr4->raw = pa4->raw;
+ m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V4];
+ } else if (ret.src == TLE_UDP_V6) {
+ *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
+ len + offsetof(struct ipv6_hdr, src_addr));
+ m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V6];
+ }
+
+ len += m->l3_len;
+ up = rte_pktmbuf_mtod_offset(m, union udp_ports *,
+ len + offsetof(struct udp_hdr, src_port));
+ ports->raw = up->raw;
+ ret.dst = ports->dst;
+ return ret;
+}
+
+/*
+ * Helper routine, enqueues packets to the stream and calls RX
+ * notification callback, if needed.
+ */
+static inline uint16_t
+rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
+ int32_t rc[], uint32_t num)
+{
+ uint32_t i, k, r;
+
+ r = rte_ring_enqueue_burst(s->rx.q, mb, num);
+
+ /* if RX queue was empty invoke user RX notification callback. */
+ if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
+ s->rx.cb.func(s->rx.cb.data, s);
+
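+ /* packets that didn't fit into the ring are handed back to the caller with ENOBUFS. */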
+ for (i = r, k = 0; i != num; i++, k++) {
+ rc[k] = ENOBUFS;
+ rp[k] = mb[i];
+ }
+
+ return r;
+}
+
+static inline uint16_t
+rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+ union ipv6_addrs *addr[], union udp_ports port[],
+ struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+ uint32_t i, k, n;
+ void *mb[num];
+
+ k = 0;
+ n = 0;
+
+ for (i = 0; i != num; i++) {
+
+ if ((port[i].raw & s->pmsk.raw) != s->port.raw ||
+ ymm_mask_cmp(&addr[i]->raw, &s->ipv6.addr.raw,
+ &s->ipv6.mask.raw) != 0) {
+ rc[k] = ENOENT;
+ rp[k] = pkt[i];
+ k++;
+ } else {
+ mb[n] = pkt[i];
+ n++;
+ }
+ }
+
+ return rx_stream(s, mb, rp + k, rc + k, n);
+}
+
+static inline uint16_t
+rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+ union ipv4_addrs addr[], union udp_ports port[],
+ struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+ uint32_t i, k, n;
+ void *mb[num];
+
+ k = 0;
+ n = 0;
+
+ for (i = 0; i != num; i++) {
+
+ if ((addr[i].raw & s->ipv4.mask.raw) != s->ipv4.addr.raw ||
+ (port[i].raw & s->pmsk.raw) !=
+ s->port.raw) {
+ rc[k] = ENOENT;
+ rp[k] = pkt[i];
+ k++;
+ } else {
+ mb[n] = pkt[i];
+ n++;
+ }
+ }
+
+ return rx_stream(s, mb, rp + k, rc + k, n);
+}
+
+uint16_t
+tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
+ struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+ struct tle_udp_stream *s;
+ uint32_t i, j, k, n, p, t;
+ union udp_ports tp[num], port[num];
+ union ipv4_addrs a4[num];
+ union ipv6_addrs *pa6[num];
+
+ for (i = 0; i != num; i++)
+ tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]);
+
+ k = 0;
+ for (i = 0; i != num; i = j) {
+
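+ /* group consecutive packets with the same type and destination port. */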
+ for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
+ ;
+
+ t = tp[i].src;
+ p = tp[i].dst;
+ s = rx_stream_obtain(dev, t, p);
+ if (s != NULL) {
+
+ if (t == TLE_UDP_V4)
+ n = rx_stream4(s, pkt + i, a4 + i,
+ port + i, rp + k, rc + k, j - i);
+ else
+ n = rx_stream6(s, pkt + i, pa6 + i, port + i,
+ rp + k, rc + k, j - i);
+
+ k += j - i - n;
+
+ if (s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ rwl_release(&s->rx.use);
+
+ } else {
+ for (; i != j; i++) {
+ rc[k] = ENOENT;
+ rp[k] = pkt[i];
+ k++;
+ }
+ }
+ }
+
+ return num - k;
+}
+
+static inline void
+tx_cage_release(struct buf_cage *bc)
+{
+ struct tle_udp_stream *s;
+ uint32_t n;
+
+ s = bcg_get_udata(bc);
+ n = bcg_free(bc);
+
+ /* If stream is still open, then mark it as available for writing. */
+ if (rwl_try_acquire(&s->tx.use) > 0) {
+
+ if (s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+
+ /* if stream send buffer was full invoke TX callback */
+ else if (s->tx.cb.func != NULL && n == 1)
+ s->tx.cb.func(s->tx.cb.data, s);
+
+ }
+
+ rwl_release(&s->tx.use);
+}
+
+static inline void
+tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc)
+{
+ struct tle_udp_stream *s;
+ struct tle_udp_ctx *ctx;
+ uint32_t idx;
+
+ ctx = dev->ctx;
+ s = bcg_get_udata(bc);
+ idx = dev - ctx->dev;
+
+ /* mark cage as closed to the stream. */
+ rte_spinlock_lock(&s->tx.lock);
+ if (bc == s->tx.cg[idx])
+ s->tx.cg[idx] = NULL;
+ rte_spinlock_unlock(&s->tx.lock);
+}
+
+uint16_t
+tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
+{
+ struct buf_cage *bc;
+ uint32_t i, n;
+
+ for (i = 0; i != num; i += n) {
+
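+ /* no cage to drain: refill beq from feq (filled by the streams), if needed, and take the next cage. */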
+ bc = dev->tx.bc;
+ if (bc == NULL) {
+ if (dev->tx.beq.num == 0)
+ bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
+ bc = __bcg_dequeue_head(&dev->tx.beq);
+ if (bc == NULL)
+ break;
+ tx_cage_update(dev, bc);
+ dev->tx.bc = bc;
+ }
+
+ n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i);
+
+ /* cage is empty, need to free it and notify related stream. */
+ if (bcg_fill_count(bc) == 0) {
+ tx_cage_release(bc);
+ dev->tx.bc = NULL;
+ }
+ }
+
+ return i;
+}
+
+static int
+check_pkt_csum(const struct rte_mbuf *m, uint32_t type)
+{
+ const struct ipv4_hdr *l3h4;
+ const struct ipv6_hdr *l3h6;
+ const struct udp_hdr *l4h;
+ int32_t ret;
+ uint16_t csum;
+
+ ret = 0;
+ l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len);
+ l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len);
+
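+ /* HW reported the IPv4 header cksum as bad, re-verify it in SW. */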
+ if ((m->ol_flags & PKT_RX_IP_CKSUM_BAD) != 0) {
+ csum = _ipv4x_cksum(l3h4, m->l3_len);
+ ret = (csum != UINT16_MAX);
+ }
+
+ if (ret == 0 && (m->ol_flags & PKT_RX_L4_CKSUM_BAD) != 0) {
+
+ /*
+ * for IPv4 a zero UDP cksum is allowed,
+ * for IPv6 a valid UDP cksum is mandatory.
+ */
+ if (type == TLE_UDP_V4) {
+ l4h = (const struct udp_hdr *)((uintptr_t)l3h4 +
+ m->l3_len);
+ csum = (l4h->dgram_cksum == 0) ? UINT16_MAX :
+ _ipv4_udptcp_mbuf_cksum(m,
+ m->l2_len + m->l3_len, l3h4);
+ } else
+ csum = _ipv6_udptcp_mbuf_cksum(m,
+ m->l2_len + m->l3_len, l3h6);
+
+ ret = (csum != UINT16_MAX);
+ }
+
+ return ret;
+}
+
+/* exclude NULLs from the final list of packets. */
+static inline uint32_t
+compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
+{
+ uint32_t i, j, k, l;
+
+ for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) {
+
+ /* found a hole. */
+ if (pkt[j] == NULL) {
+
+ /* find how big is it. */
+ for (i = j; i-- != 0 && pkt[i] == NULL; )
+ ;
+ /* fill the hole. */
+ for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++)
+ pkt[l] = pkt[k];
+
+ nb_pkt -= j - i;
+ nb_zero -= j - i;
+ }
+ }
+
+ return nb_pkt;
+}
+
+/*
+ * helper function, does the necessary pre-processing for the received packets
+ * before handing them to the stream_recv caller.
+ */
+static inline struct rte_mbuf *
+recv_pkt_process(struct rte_mbuf *m, uint32_t type)
+{
+ uint64_t f;
+
+ f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
+ if (f != 0) {
+ if (check_pkt_csum(m, type) == 0)
+ m->ol_flags ^= f;
+ else {
+ rte_pktmbuf_free(m);
+ return NULL;
+ }
+ }
+
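+ /* strip L2/L3/L4 headers, so the caller gets just the UDP payload. */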
+ rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len);
+ return m;
+}
+
+uint16_t
+tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+ uint16_t num)
+{
+ uint32_t i, k, n;
+
+ n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
+ if (n == 0)
+ return 0;
+
+ /*
+ * if we still have packets to read,
+ * then rearm stream RX event.
+ */
+ if (n == num && rte_ring_count(s->rx.q) != 0) {
+ if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ rwl_release(&s->rx.use);
+ }
+
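+ /* process packets in groups of 4 (manually unrolled loop), count packets dropped due to bad cksum. */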
+ k = 0;
+ for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) {
+ pkt[i] = recv_pkt_process(pkt[i], s->type);
+ pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
+ pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
+ pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type);
+ k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) +
+ (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL);
+ }
+
+ switch (n % 4) {
+ case 3:
+ pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
+ k += (pkt[i + 2] == NULL);
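+ /* fallthrough */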
+ case 2:
+ pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
+ k += (pkt[i + 1] == NULL);
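+ /* fallthrough */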
+ case 1:
+ pkt[i] = recv_pkt_process(pkt[i], s->type);
+ k += (pkt[i] == NULL);
+ }
+
+ return compress_pkt_list(pkt, n, k);
+}
+
+static int32_t
+udp_get_dest(struct tle_udp_stream *s, const void *dst_addr,
+ struct tle_udp_dest *dst)
+{
+ int32_t rc;
+ const struct in_addr *d4;
+ const struct in6_addr *d6;
+ struct tle_udp_ctx *ctx;
+ struct tle_udp_dev *dev;
+
+ ctx = s->ctx;
+
+ /* it is here just to keep gcc happy. */
+ d4 = NULL;
+
+ if (s->type == TLE_UDP_V4) {
+ d4 = dst_addr;
+ rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst);
+ } else if (s->type == TLE_UDP_V6) {
+ d6 = dst_addr;
+ rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst);
+ } else
+ rc = -ENOENT;
+
+ if (rc < 0 || dst->dev == NULL || dst->dev->ctx != ctx)
+ return -ENOENT;
+
+ dev = dst->dev;
+ if (s->type == TLE_UDP_V4) {
+ struct ipv4_hdr *l3h;
+ l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
+ l3h->src_addr = dev->prm.local_addr4.s_addr;
+ l3h->dst_addr = d4->s_addr;
+ } else {
+ struct ipv6_hdr *l3h;
+ l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len);
+ rte_memcpy(l3h->src_addr, &dev->prm.local_addr6,
+ sizeof(l3h->src_addr));
+ rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr));
+ }
+
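+ /* return the index of the output device within the context. */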
+ return dev - ctx->dev;
+}
+
+static inline int
+udp_fill_mbuf(struct rte_mbuf *m,
+ uint32_t type, uint64_t ol_flags, uint32_t pid,
+ union udph udph, const struct tle_udp_dest *dst)
+{
+ uint32_t len, plen;
+ char *l2h;
+ union udph *l4h;
+
+ len = dst->l2_len + dst->l3_len;
+ plen = m->pkt_len;
+
+ /* copy the L2/L3 header template into the mbuf. */
+
+ l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
+ if (l2h == NULL)
+ return -ENOBUFS;
+
+ /* copy L2/L3 header */
+ rte_memcpy(l2h, dst->hdr, len);
+
+ /* copy UDP header */
+ l4h = (union udph *)(l2h + len);
+ l4h->raw = udph.raw;
+
+ /* setup mbuf TX offload related fields. */
+ m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
+ sizeof(*l4h), 0, 0, 0);
+ m->ol_flags |= ol_flags;
+
+ l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));
+
+ /* update proto specific fields. */
+
+ if (type == TLE_UDP_V4) {
+ struct ipv4_hdr *l3h;
+ l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
+ l3h->packet_id = rte_cpu_to_be_16(pid);
+ l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
+ sizeof(*l4h));
+
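+ /* with HW UDP cksum offload only the pseudo-header cksum is needed, otherwise compute the full cksum in SW. */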
+ if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
+ l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
+ ol_flags);
+ else
+ l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
+
+ if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
+ l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
+ } else {
+ struct ipv6_hdr *l3h;
+ l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
+ l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
+ if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
+ l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
+ else
+ l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
+ }
+
+ return 0;
+}
+
+/* ???
+ * probably this fixup should not be needed here -
+ * rte_ipv[4,6]_fragment_packet should do that itself.
+ */
+static inline void
+frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
+{
+ struct ipv4_hdr *l3h;
+
+ mf->ol_flags = ms->ol_flags;
+ mf->tx_offload = ms->tx_offload;
+
+ if (type == TLE_UDP_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
+ l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
+ l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
+ }
+}
+
+/*
+ * Returns negative for failure to fragment or actual number of fragments.
+ */
+static inline int
+fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
+ uint32_t type, const struct tle_udp_dest *dst)
+{
+ int32_t frag_num, i;
+ uint16_t mtu;
+ void *eth_hdr;
+
+ /* Remove the Ethernet header from the input packet */
+ rte_pktmbuf_adj(pkt, dst->l2_len);
+ mtu = dst->mtu - dst->l2_len;
+
+ /* fragment packet */
+ if (type == TLE_UDP_V4)
+ frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
+ dst->head_mp, dst->head_mp);
+ else
+ frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
+ dst->head_mp, dst->head_mp);
+
+ if (frag_num > 0) {
+ for (i = 0; i != frag_num; i++) {
+
+ frag_fixup(pkt, frag[i], type);
+
+ /* Move data_off to include l2 header first */
+ eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);
+
+ /* copy l2 header into fragment */
+ rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
+ }
+ }
+
+ return frag_num;
+}
+
+/* enqueue up to num packets to the destination device queue. */
+static inline uint16_t
+queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
+ const void *pkt[], uint16_t num)
+{
+ struct buf_cage *bc;
+ uint32_t i, n;
+
+ rte_spinlock_lock(&s->tx.lock);
+ bc = s->tx.cg[di];
+
+ for (i = 0; i != num; i += n) {
+ if (bc == NULL) {
+ bc = bcg_alloc(s->tx.st);
+ if (bc == NULL)
+ break;
+ n = bcg_put(bc, pkt + i, num - i);
+ bcg_enqueue_tail(bq, bc);
+ } else
+ n = bcg_put(bc, pkt + i, num - i);
+
+ if (n != num - i)
+ bc = NULL;
+ }
+
+ s->tx.cg[di] = bc;
+ rte_spinlock_unlock(&s->tx.lock);
+ return i;
+}
+
+/*
+ * either enqueue all num packets or none.
+ * assumes that the number of input packets does not exceed the size of a buf_cage.
+ */
+static inline uint16_t
+queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
+ const void *pkt[], uint16_t num)
+{
+ struct buf_cage *bc, *bcp;
+ uint32_t n;
+
+ rte_spinlock_lock(&s->tx.lock);
+ bc = s->tx.cg[di];
+
+ n = 0;
+ if (bc == NULL || bcg_free_count(bc) < num) {
+ bcp = bc;
+ bc = bcg_alloc(s->tx.st);
+ if (bc != NULL) {
+ if (bcp != NULL)
+ n = bcg_put(bcp, pkt, num);
+ n += bcg_put(bc, pkt, num - n);
+ bcg_enqueue_tail(bq, bc);
+ }
+ } else
+ n = bcg_put(bc, pkt, num);
+
+ s->tx.cg[di] = bc;
+ rte_spinlock_unlock(&s->tx.lock);
+ return n;
+}
+
+uint16_t
+tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+ uint16_t num, const struct sockaddr *dst_addr)
+{
+ int32_t di, frg, rc;
+ uint64_t ol_flags;
+ uint32_t i, k, n;
+ uint32_t mtu, pid, type;
+ const struct sockaddr_in *d4;
+ const struct sockaddr_in6 *d6;
+ const void *da;
+ union udph udph;
+ struct tle_udp_dest dst;
+
+ type = s->type;
+
+ /* start filling UDP header. */
+ udph.raw = 0;
+ udph.ports.src = s->port.dst;
+
+ /* figure out what destination addr/port to use. */
+ if (dst_addr != NULL) {
+ if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
+ rte_errno = EINVAL;
+ return 0;
+ }
+ if (type == TLE_UDP_V4) {
+ d4 = (const struct sockaddr_in *)dst_addr;
+ da = &d4->sin_addr;
+ udph.ports.dst = d4->sin_port;
+ } else {
+ d6 = (const struct sockaddr_in6 *)dst_addr;
+ da = &d6->sin6_addr;
+ udph.ports.dst = d6->sin6_port;
+ }
+ } else {
+ udph.ports.dst = s->port.src;
+ if (type == TLE_UDP_V4)
+ da = &s->ipv4.addr.src;
+ else
+ da = &s->ipv6.addr.src;
+ }
+
+ di = udp_get_dest(s, da, &dst);
+ if (di < 0) {
+ rte_errno = -di;
+ return 0;
+ }
+
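+ /* reserve a contiguous block of packet IDs for this burst. */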
+ pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
+ mtu = dst.mtu - dst.l2_len - dst.l3_len;
+
+ /* mark stream as not closable. */
+ if (rwl_acquire(&s->tx.use) < 0)
+ return 0;
+
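+ /* process input packets in runs: batch the ones that fit into the MTU, fragment and enqueue each oversized one separately. */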
+ for (i = 0, k = 0; k != num; k = i) {
+
+ /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
+
+ frg = 0;
+ ol_flags = dst.dev->tx.ol_flags[type];
+
+ while (i != num && frg == 0) {
+ frg = pkt[i]->pkt_len > mtu;
+ if (frg != 0)
+ ol_flags &= ~PKT_TX_UDP_CKSUM;
+ rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
+ udph, &dst);
+ if (rc != 0) {
+ rte_errno = -rc;
+ goto out;
+ }
+ i += (frg == 0);
+ }
+
+ /* enqueue non-fragmented packets to the destination device. */
+ if (k != i) {
+ k += queue_pkt_out(s, &dst.dev->tx.feq, di,
+ (const void **)(uintptr_t)&pkt[k], i - k);
+
+ /* stream TX queue is full. */
+ if (k != i)
+ break;
+ }
+
+ /* enqueue the packet that needs to be fragmented */
+ if (i != num) {
+
+ struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];
+
+ /* fragment the packet. */
+ rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
+ if (rc < 0) {
+ rte_errno = -rc;
+ break;
+ }
+
+ n = queue_frg_out(s, &dst.dev->tx.feq, di,
+ (const void **)(uintptr_t)frag, rc);
+ if (n == 0) {
+ while (rc-- != 0)
+ rte_pktmbuf_free(frag[rc]);
+ break;
+ }
+
+ /* all fragments enqueued, free the original packet. */
+ rte_pktmbuf_free(pkt[i]);
+ i++;
+ }
+ }
+
+ /* if possible, rearm socket write event. */
+ if (k == num && s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+
+out:
+ rwl_release(&s->tx.use);
+
+ /*
+ * for packets that were not sent, but already modified:
+ * remove the prepended l2/l3/udp headers and restore their ol_flags.
+ */
+ ol_flags = ~dst.dev->tx.ol_flags[type];
+ for (n = k; n != i; n++) {
+ rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
+ pkt[n]->ol_flags &= ol_flags;
+ }
+
+ return k;
+}