diff options
Diffstat (limited to 'lib/libtle_udp/udp_rxtx.c')
-rw-r--r-- | lib/libtle_udp/udp_rxtx.c | 767 |
1 file changed, 767 insertions, 0 deletions
diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c
new file mode 100644
index 0000000..d5d248e
--- /dev/null
+++ b/lib/libtle_udp/udp_rxtx.c
@@ -0,0 +1,767 @@
+
+#include <rte_malloc.h>
+#include <rte_errno.h>
+#include <rte_ethdev.h>
+#include <rte_ip.h>
+#include <rte_ip_frag.h>
+#include <rte_udp.h>
+
+#include "udp_impl.h"
+#include "misc.h"
+
+/*
+ * Look up the stream bound to (traffic type, destination port) on the
+ * given device and mark its RX side as in-use via rwl_acquire().
+ * Returns NULL when the type is invalid, no stream is bound to the port,
+ * or the stream's RX side could not be acquired (stream being closed).
+ * On success the caller must balance with rwl_release(&s->rx.use).
+ */
+static inline struct tle_udp_stream *
+rx_stream_obtain(struct tle_udp_dev *dev, uint32_t type, uint32_t port)
+{
+	struct tle_udp_stream *s;
+
+	if (type >= TLE_UDP_VNUM || dev->dp[type] == NULL)
+		return NULL;
+
+	s = dev->dp[type]->streams[port];
+	if (s == NULL)
+		return NULL;
+
+	/* failed to mark the RX side busy: stream is shutting down. */
+	if (rwl_acquire(&s->rx.use) < 0)
+		return NULL;
+
+	return s;
+}
+
+/*
+ * Classify an mbuf from its HW/driver-parsed packet_type field:
+ * TLE_UDP_V4 for UDP-over-IPv4, TLE_UDP_V6 for UDP-over-IPv6,
+ * TLE_UDP_VNUM for anything else (not handled by this library).
+ */
+static inline uint16_t
+get_pkt_type(const struct rte_mbuf *m)
+{
+	uint32_t v;
+
+	v = m->packet_type &
+		(RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
+	if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
+		return TLE_UDP_V4;
+	else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
+		return TLE_UDP_V6;
+	else
+		return TLE_UDP_VNUM;
+}
+
+/*
+ * Extract addressing info from a received packet:
+ * - *ports gets the packet's src/dst UDP ports;
+ * - for IPv4, both IP addresses are copied into *addr4;
+ * - for IPv6, *addr6 is pointed at the addresses inside the packet;
+ * - the device's per-type RX offload flags are OR-ed into the mbuf.
+ * Returns a key with .src = packet type and .dst = destination port,
+ * which tle_udp_rx_bulk() uses to group packets per stream.
+ * NOTE(review): assumes m->l2_len/m->l3_len were set by the driver —
+ * verify against the RX configuration.
+ */
+static inline union udp_ports
+pkt_info(const struct tle_udp_dev *dev, struct rte_mbuf *m,
+	union udp_ports *ports, union ipv4_addrs *addr4,
+	union ipv6_addrs **addr6)
+{
+	uint32_t len;
+	union udp_ports ret, *up;
+	union ipv4_addrs *pa4;
+
+	ret.src = get_pkt_type(m);
+
+	len = m->l2_len;
+	if (ret.src == TLE_UDP_V4) {
+		pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
+			len + offsetof(struct ipv4_hdr, src_addr));
+		addr4->raw = pa4->raw;
+		m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V4];
+	} else if (ret.src == TLE_UDP_V6) {
+		*addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
+			len + offsetof(struct ipv6_hdr, src_addr));
+		m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V6];
+	}
+
+	len += m->l3_len;
+	up = rte_pktmbuf_mtod_offset(m, union udp_ports *,
+		len + offsetof(struct udp_hdr, src_port));
+	ports->raw = up->raw;
+	ret.dst = ports->dst;
+	return ret;
+}
+
+/*
+ * Helper routine, enqueues packets to the 
stream and calls RX
+ * notification callback, if needed.
+ * Packets that do not fit into the stream RX ring are returned to the
+ * caller through rp[]/rc[] with rc set to ENOBUFS.
+ * Returns the number of packets actually enqueued.
+ */
+static inline uint16_t
+rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
+	int32_t rc[], uint32_t num)
+{
+	uint32_t i, k, r;
+
+	r = rte_ring_enqueue_burst(s->rx.q, mb, num);
+
+	/* if RX queue was empty invoke user RX notification callback. */
+	if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
+		s->rx.cb.func(s->rx.cb.data, s);
+
+	/* report the packets that did not fit into the ring. */
+	for (i = r, k = 0; i != num; i++, k++) {
+		rc[k] = ENOBUFS;
+		rp[k] = mb[i];
+	}
+
+	return r;
+}
+
+/*
+ * IPv6 variant of per-stream RX filtering: packets whose (masked) ports
+ * or (masked) IPv6 addresses do not match the stream are returned via
+ * rp[]/rc[] with ENOENT; matching packets are handed to rx_stream().
+ * Returns the number of packets enqueued to the stream.
+ */
+static inline uint16_t
+rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	union ipv6_addrs *addr[], union udp_ports port[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+	uint32_t i, k, n;
+	void *mb[num];
+
+	k = 0;
+	n = 0;
+
+	for (i = 0; i != num; i++) {
+
+		if ((port[i].raw & s->pmsk.raw) != s->port.raw ||
+				ymm_mask_cmp(&addr[i]->raw, &s->ipv6.addr.raw,
+				&s->ipv6.mask.raw) != 0) {
+			rc[k] = ENOENT;
+			rp[k] = pkt[i];
+			k++;
+		} else {
+			mb[n] = pkt[i];
+			n++;
+		}
+	}
+
+	return rx_stream(s, mb, rp + k, rc + k, n);
+}
+
+/*
+ * IPv4 variant of per-stream RX filtering; same contract as rx_stream6().
+ */
+static inline uint16_t
+rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	union ipv4_addrs addr[], union udp_ports port[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+	uint32_t i, k, n;
+	void *mb[num];
+
+	k = 0;
+	n = 0;
+
+	for (i = 0; i != num; i++) {
+
+		if ((addr[i].raw & s->ipv4.mask.raw) != s->ipv4.addr.raw ||
+				(port[i].raw & s->pmsk.raw) !=
+				s->port.raw) {
+			rc[k] = ENOENT;
+			rp[k] = pkt[i];
+			k++;
+		} else {
+			mb[n] = pkt[i];
+			n++;
+		}
+	}
+
+	return rx_stream(s, mb, rp + k, rc + k, n);
+}
+
+/*
+ * Demultiplex a device RX burst to the owning streams.
+ * Packets are keyed by (type, dst port) via pkt_info(); runs of packets
+ * with the same key are delivered to one stream per run.
+ * Undeliverable packets are returned through rp[]/rc[];
+ * returns the number of successfully delivered packets.
+ */
+uint16_t
+tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+	struct tle_udp_stream *s;
+	uint32_t i, j, k, n, p, t;
+	union udp_ports tp[num], port[num];
+	union ipv4_addrs a4[num];
+	union ipv6_addrs *pa6[num];
+
+	/* gather (type, port) keys and address info for the whole burst. */
+	for (i = 0; i != num; i++)
+		tp[i] = pkt_info(dev, pkt[i], &port[i], 
&a4[i], &pa6[i]);
+
+	k = 0;
+	for (i = 0; i != num; i = j) {
+
+		/* find the end of the run of packets with the same key. */
+		for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
+			;
+
+		t = tp[i].src;
+		p = tp[i].dst;
+		s = rx_stream_obtain(dev, t, p);
+		if (s != NULL) {
+
+			if (t == TLE_UDP_V4)
+				n = rx_stream4(s, pkt + i, a4 + i,
+					port + i, rp + k, rc + k, j - i);
+			else
+				n = rx_stream6(s, pkt + i, pa6 + i, port + i,
+					rp + k, rc + k, j - i);
+
+			/* account for packets rejected by the stream. */
+			k += j - i - n;
+
+			if (s->rx.ev != NULL)
+				tle_event_raise(s->rx.ev);
+			rwl_release(&s->rx.use);
+
+		} else {
+			/* no stream for this (type, port): reject the run. */
+			for (; i != j; i++) {
+				rc[k] = ENOENT;
+				rp[k] = pkt[i];
+				k++;
+			}
+		}
+	}
+
+	return num - k;
+}
+
+/*
+ * Free an exhausted buffer cage back to its stream and, if the stream is
+ * still open, notify it that TX space is available again (event or
+ * callback).
+ * NOTE(review): rwl_release() below is unconditional — this assumes
+ * rwl_try_acquire() increments the use counter even when it reports
+ * failure; verify against the rwl_* helpers in misc.h.
+ */
+static inline void
+tx_cage_release(struct buf_cage *bc)
+{
+	struct tle_udp_stream *s;
+	uint32_t n;
+
+	s = bcg_get_udata(bc);
+	n = bcg_free(bc);
+
+	/* If stream is still open, then mark it as available for writing. */
+	if (rwl_try_acquire(&s->tx.use) > 0) {
+
+		if (s->tx.ev != NULL)
+			tle_event_raise(s->tx.ev);
+
+		/* if stream send buffer was full invoke TX callback */
+		else if (s->tx.cb.func != NULL && n == 1)
+			s->tx.cb.func(s->tx.cb.data, s);
+
+	}
+
+	rwl_release(&s->tx.use);
+}
+
+/*
+ * Detach the cage from its stream's per-device slot so the stream stops
+ * appending to it while the device drains it.
+ */
+static inline void
+tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc)
+{
+	struct tle_udp_stream *s;
+	struct tle_udp_ctx *ctx;
+	uint32_t idx;
+
+	ctx = dev->ctx;
+	s = bcg_get_udata(bc);
+	idx = dev - ctx->dev;
+
+	/* mark cage as closed to the stream. 
*/
+	rte_spinlock_lock(&s->tx.lock);
+	if (bc == s->tx.cg[idx])
+		s->tx.cg[idx] = NULL;
+	rte_spinlock_unlock(&s->tx.lock);
+}
+
+/*
+ * Collect up to num mbufs queued for transmission on this device.
+ * Drains the device's current buffer cage; when it runs dry, refills the
+ * backlog queue (beq) from the queue streams feed into (feq) and takes
+ * the next cage. Empty cages are released back to their streams.
+ * Returns the number of packets written into pkt[].
+ */
+uint16_t
+tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
+{
+	struct buf_cage *bc;
+	uint32_t i, n;
+
+	for (i = 0; i != num; i += n) {
+
+		bc = dev->tx.bc;
+		if (bc == NULL) {
+			if (dev->tx.beq.num == 0)
+				bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
+			bc = __bcg_dequeue_head(&dev->tx.beq);
+			if (bc == NULL)
+				break;
+			tx_cage_update(dev, bc);
+			dev->tx.bc = bc;
+		}
+
+		n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i);
+
+		/* cage is empty, need to free it and notify related stream. */
+		if (bcg_fill_count(bc) == 0) {
+			tx_cage_release(bc);
+			dev->tx.bc = NULL;
+		}
+	}
+
+	return i;
+}
+
+/*
+ * Software re-verification of checksums the NIC reported as bad.
+ * Returns 0 when the checksums are actually valid (false positive from
+ * the HW), non-zero when the packet is really corrupt.
+ */
+static int
+check_pkt_csum(const struct rte_mbuf *m, uint32_t type)
+{
+	const struct ipv4_hdr *l3h4;
+	const struct ipv6_hdr *l3h6;
+	const struct udp_hdr *l4h;
+	int32_t ret;
+	uint16_t csum;
+
+	ret = 0;
+	l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len);
+	l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len);
+
+	if ((m->ol_flags & PKT_RX_IP_CKSUM_BAD) != 0) {
+		csum = _ipv4x_cksum(l3h4, m->l3_len);
+		ret = (csum != UINT16_MAX);
+	}
+
+	if (ret == 0 && (m->ol_flags & PKT_RX_L4_CKSUM_BAD) != 0) {
+
+		/*
+		 * for IPv4 it is allowed to have zero UDP cksum,
+		 * for IPv6 valid UDP cksum is mandatory.
+		 */
+		if (type == TLE_UDP_V4) {
+			l4h = (const struct udp_hdr *)((uintptr_t)l3h4 +
+				m->l3_len);
+			csum = (l4h->dgram_cksum == 0) ? UINT16_MAX :
+				_ipv4_udptcp_mbuf_cksum(m,
+					m->l2_len + m->l3_len, l3h4);
+		} else
+			csum = _ipv6_udptcp_mbuf_cksum(m,
+				m->l2_len + m->l3_len, l3h6);
+
+		ret = (csum != UINT16_MAX);
+	}
+
+	return ret;
+}
+
+/* exclude NULLs from the final list of packets. 
*/
+/*
+ * Compact pkt[] in place by closing the nb_zero NULL holes left by
+ * recv_pkt_process(); scans backwards so each hole is filled once.
+ * Returns the new (reduced) number of packets.
+ */
+static inline uint32_t
+compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
+{
+	uint32_t i, j, k, l;
+
+	for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) {
+
+		/* found a hole. */
+		if (pkt[j] == NULL) {
+
+			/* find how big is it. */
+			for (i = j; i-- != 0 && pkt[i] == NULL; )
+				;
+			/* fill the hole. */
+			for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++)
+				pkt[l] = pkt[k];
+
+			nb_pkt -= j - i;
+			nb_zero -= j - i;
+		}
+	}
+
+	return nb_pkt;
+}
+
+/*
+ * helper function, do the necessary pre-processing for the received packets
+ * before handing them to the stream_recv caller:
+ * - re-verify HW-flagged checksums in SW; clear the BAD flags if the
+ *   packet turns out to be valid, free it and return NULL otherwise;
+ * - strip the L2/L3/L4 headers so the caller sees payload only.
+ */
+static inline struct rte_mbuf *
+recv_pkt_process(struct rte_mbuf *m, uint32_t type)
+{
+	uint64_t f;
+
+	f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
+	if (f != 0) {
+		/* XOR clears exactly the BAD bits collected into f. */
+		if (check_pkt_csum(m, type) == 0)
+			m->ol_flags ^= f;
+		else {
+			rte_pktmbuf_free(m);
+			return NULL;
+		}
+	}
+
+	rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len);
+	return m;
+}
+
+/*
+ * Dequeue up to num received packets for the stream.
+ * Invalid packets are dropped and the list compacted; returns the number
+ * of valid payload-only mbufs left in pkt[].
+ */
+uint16_t
+tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	uint16_t num)
+{
+	uint32_t i, k, n;
+
+	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
+	if (n == 0)
+		return 0;
+
+	/*
+	 * if we still have packets to read,
+	 * then rearm stream RX event. 
+	 */
+	if (n == num && rte_ring_count(s->rx.q) != 0) {
+		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+			tle_event_raise(s->rx.ev);
+		rwl_release(&s->rx.use);
+	}
+
+	/* process the burst 4 packets at a time; k counts dropped ones. */
+	k = 0;
+	for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) {
+		pkt[i] = recv_pkt_process(pkt[i], s->type);
+		pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
+		pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
+		pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type);
+		k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) +
+			(pkt[i + 2] == NULL) + (pkt[i + 3] == NULL);
+	}
+
+	/* handle the 0-3 leftover packets. */
+	switch (n % 4) {
+	case 3:
+		pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
+		k += (pkt[i + 2] == NULL);
+		/* fallthrough */
+	case 2:
+		pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
+		k += (pkt[i + 1] == NULL);
+		/* fallthrough */
+	case 1:
+		pkt[i] = recv_pkt_process(pkt[i], s->type);
+		k += (pkt[i] == NULL);
+	}
+
+	return compress_pkt_list(pkt, n, k);
+}
+
+/*
+ * Resolve the output route for dst_addr via the context's lookup4/lookup6
+ * callbacks, then write the source/destination L3 addresses into the
+ * destination's header template.
+ * Returns the index of the output device within the context on success,
+ * negative errno on failure.
+ */
+static int32_t
+udp_get_dest(struct tle_udp_stream *s, const void *dst_addr,
+	struct tle_udp_dest *dst)
+{
+	int32_t rc;
+	const struct in_addr *d4;
+	const struct in6_addr *d6;
+	struct tle_udp_ctx *ctx;
+	struct tle_udp_dev *dev;
+
+	ctx = s->ctx;
+
+	/* it is here just to keep gcc happy. 
*/
+	d4 = NULL;
+
+	if (s->type == TLE_UDP_V4) {
+		d4 = dst_addr;
+		rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst);
+	} else if (s->type == TLE_UDP_V6) {
+		d6 = dst_addr;
+		rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst);
+	} else
+		rc = -ENOENT;
+
+	/* route must resolve to a device that belongs to this context. */
+	if (rc < 0 || dst->dev == NULL || dst->dev->ctx != ctx)
+		return -ENOENT;
+
+	/* fill the L3 addresses in the destination's header template. */
+	dev = dst->dev;
+	if (s->type == TLE_UDP_V4) {
+		struct ipv4_hdr *l3h;
+		l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
+		l3h->src_addr = dev->prm.local_addr4.s_addr;
+		l3h->dst_addr = d4->s_addr;
+	} else {
+		struct ipv6_hdr *l3h;
+		l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len);
+		rte_memcpy(l3h->src_addr, &dev->prm.local_addr6,
+			sizeof(l3h->src_addr));
+		rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr));
+	}
+
+	return dev - ctx->dev;
+}
+
+/*
+ * Prepend the L2/L3/UDP headers from the destination template onto the
+ * payload-only mbuf and fill in the length, id and checksum fields.
+ * Returns 0 on success, -ENOBUFS when the mbuf has no headroom.
+ */
+static inline int
+udp_fill_mbuf(struct rte_mbuf *m,
+	uint32_t type, uint64_t ol_flags, uint32_t pid,
+	union udph udph, const struct tle_udp_dest *dst)
+{
+	uint32_t len, plen;
+	char *l2h;
+	union udph *l4h;
+
+	len = dst->l2_len + dst->l3_len;
+	plen = m->pkt_len;
+
+	/* copy to mbuf L2/L3 header template. */
+
+	l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
+	if (l2h == NULL)
+		return -ENOBUFS;
+
+	/* copy L2/L3 header */
+	rte_memcpy(l2h, dst->hdr, len);
+
+	/* copy UDP header */
+	l4h = (union udph *)(l2h + len);
+	l4h->raw = udph.raw;
+
+	/* setup mbuf TX offload related fields. */
+	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
+		sizeof(*l4h), 0, 0, 0);
+	m->ol_flags |= ol_flags;
+
+	l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));
+
+	/* update proto specific fields. 
*/
+
+	if (type == TLE_UDP_V4) {
+		struct ipv4_hdr *l3h;
+		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
+		l3h->packet_id = rte_cpu_to_be_16(pid);
+		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
+			sizeof(*l4h));
+
+		/* HW UDP csum offload needs only the pseudo-header csum;
+		 * otherwise compute the full csum in SW. */
+		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
+			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
+				ol_flags);
+		else
+			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
+
+		/* no HW IP csum offload: compute the header csum in SW. */
+		if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
+			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
+	} else {
+		struct ipv6_hdr *l3h;
+		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
+		l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
+		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
+			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
+		else
+			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
+	}
+
+	return 0;
+}
+
+/* ???
+ * probably this function should be there -
+ * rte_ipv[4,6]_fragment_packet should do that.
+ */
+/*
+ * Propagate the source packet's offload flags/metadata to a fragment and
+ * recompute the fragment's IPv4 header checksum when there is no HW
+ * IP-csum offload (fragmentation rewrote the header).
+ */
+static inline void
+frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
+{
+	struct ipv4_hdr *l3h;
+
+	mf->ol_flags = ms->ol_flags;
+	mf->tx_offload = ms->tx_offload;
+
+	if (type == TLE_UDP_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
+		l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
+		l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
+	}
+}
+
+/*
+ * Returns negative for failure to fragment or actual number of fragments. 
+ * NOTE(review): on fragmentation failure the input packet is left with
+ * its L2 header already stripped — confirm the caller's unwind path
+ * accounts for that.
+ */
+static inline int
+fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
+	uint32_t type, const struct tle_udp_dest *dst)
+{
+	int32_t frag_num, i;
+	uint16_t mtu;
+	void *eth_hdr;
+
+	/* Remove the Ethernet header from the input packet */
+	rte_pktmbuf_adj(pkt, dst->l2_len);
+	mtu = dst->mtu - dst->l2_len;
+
+	/* fragment packet */
+	if (type == TLE_UDP_V4)
+		frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
+			dst->head_mp, dst->head_mp);
+	else
+		frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
+			dst->head_mp, dst->head_mp);
+
+	if (frag_num > 0) {
+		for (i = 0; i != frag_num; i++) {
+
+			frag_fixup(pkt, frag[i], type);
+
+			/* Move data_off to include l2 header first */
+			eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);
+
+			/* copy l2 header into fragment */
+			rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
+		}
+	}
+
+	return frag_num;
+}
+
+/* enqueue up to num packets to the destination device queue. */
+static inline uint16_t
+queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
+	const void *pkt[], uint16_t num)
+{
+	struct buf_cage *bc;
+	uint32_t i, n;
+
+	rte_spinlock_lock(&s->tx.lock);
+	bc = s->tx.cg[di];
+
+	for (i = 0; i != num; i += n) {
+		/* no open cage for this device: allocate and publish one. */
+		if (bc == NULL) {
+			bc = bcg_alloc(s->tx.st);
+			if (bc == NULL)
+				break;
+			n = bcg_put(bc, pkt + i, num - i);
+			bcg_enqueue_tail(bq, bc);
+		} else
+			n = bcg_put(bc, pkt + i, num - i);
+
+		/* cage filled up: force allocation of a fresh one. */
+		if (n != num - i)
+			bc = NULL;
+	}
+
+	s->tx.cg[di] = bc;
+	rte_spinlock_unlock(&s->tx.lock);
+	return i;
+}
+
+/*
+ * either enqueue all num packets or none.
+ * assumes that the number of input packets does not exceed the size of
+ * a buf_cage. 
+ */
+static inline uint16_t
+queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
+	const void *pkt[], uint16_t num)
+{
+	struct buf_cage *bc, *bcp;
+	uint32_t n;
+
+	rte_spinlock_lock(&s->tx.lock);
+	bc = s->tx.cg[di];
+
+	n = 0;
+	if (bc == NULL || bcg_free_count(bc) < num) {
+		/* current cage can't hold all fragments: top it up, then
+		 * put the remainder into a freshly allocated cage. */
+		bcp = bc;
+		bc = bcg_alloc(s->tx.st);
+		if (bc != NULL) {
+			if (bcp != NULL)
+				n = bcg_put(bcp, pkt, num);
+			n += bcg_put(bc, pkt, num - n);
+			bcg_enqueue_tail(bq, bc);
+		}
+	} else
+		n = bcg_put(bc, pkt, num);
+
+	s->tx.cg[di] = bc;
+	rte_spinlock_unlock(&s->tx.lock);
+	return n;
+}
+
+/*
+ * Send a burst of payload mbufs on the stream.
+ * dst_addr overrides the stream's remote address when non-NULL.
+ * Headers are prepended per packet; packets larger than the path MTU are
+ * fragmented. Returns the number of packets accepted for transmission;
+ * unsent packets are returned to the caller with headers removed again.
+ */
+uint16_t
+tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	uint16_t num, const struct sockaddr *dst_addr)
+{
+	int32_t di, frg, rc;
+	uint64_t ol_flags;
+	uint32_t i, k, n;
+	uint32_t mtu, pid, type;
+	const struct sockaddr_in *d4;
+	const struct sockaddr_in6 *d6;
+	const void *da;
+	union udph udph;
+	struct tle_udp_dest dst;
+
+	type = s->type;
+
+	/* start filling UDP header. */
+	/* NOTE(review): the stream stores its ports in received-packet
+	 * orientation, so the local port appears in s->port.dst — verify. */
+	udph.raw = 0;
+	udph.ports.src = s->port.dst;
+
+	/* figure out what destination addr/port to use. */
+	if (dst_addr != NULL) {
+		if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
+			rte_errno = EINVAL;
+			return 0;
+		}
+		if (type == TLE_UDP_V4) {
+			d4 = (const struct sockaddr_in *)dst_addr;
+			da = &d4->sin_addr;
+			udph.ports.dst = d4->sin_port;
+		} else {
+			d6 = (const struct sockaddr_in6 *)dst_addr;
+			da = &d6->sin6_addr;
+			udph.ports.dst = d6->sin6_port;
+		}
+	} else {
+		udph.ports.dst = s->port.src;
+		if (type == TLE_UDP_V4)
+			da = &s->ipv4.addr.src;
+		else
+			da = &s->ipv6.addr.src;
+	}
+
+	di = udp_get_dest(s, da, &dst);
+	if (di < 0) {
+		rte_errno = -di;
+		return 0;
+	}
+
+	/* reserve a contiguous range of IP packet ids for the burst. */
+	pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
+	mtu = dst.mtu - dst.l2_len - dst.l3_len;
+
+	/* mark stream as not closable. 
*/
+	if (rwl_acquire(&s->tx.use) < 0)
+		return 0;
+
+	/* i: next packet to prepare, k: first prepared-but-unqueued one. */
+	for (i = 0, k = 0; k != num; k = i) {
+
+		/* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
+
+		frg = 0;
+		ol_flags = dst.dev->tx.ol_flags[type];
+
+		/* prepare packets until an oversized one is met; HW UDP csum
+		 * offload can't be used on a packet that will be fragmented. */
+		while (i != num && frg == 0) {
+			frg = pkt[i]->pkt_len > mtu;
+			if (frg != 0)
+				ol_flags &= ~PKT_TX_UDP_CKSUM;
+			rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
+				udph, &dst);
+			if (rc != 0) {
+				rte_errno = -rc;
+				goto out;
+			}
+			i += (frg == 0);
+		}
+
+		/* enqueue non-fragment packets to the destination device. */
+		if (k != i) {
+			k += queue_pkt_out(s, &dst.dev->tx.feq, di,
+				(const void **)(uintptr_t)&pkt[k], i - k);
+
+			/* stream TX queue is full. */
+			if (k != i)
+				break;
+		}
+
+		/* enqueue packet that need to be fragmented */
+		if (i != num) {
+
+			struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];
+
+			/* fragment the packet. */
+			rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
+			if (rc < 0) {
+				rte_errno = -rc;
+				break;
+			}
+
+			n = queue_frg_out(s, &dst.dev->tx.feq, di,
+				(const void **)(uintptr_t)frag, rc);
+			if (n == 0) {
+				while (rc-- != 0)
+					rte_pktmbuf_free(frag[rc]);
+				break;
+			}
+
+			/* all fragments enqueued, free the original packet. */
+			rte_pktmbuf_free(pkt[i]);
+			i++;
+		}
+	}
+
+	/* if possible, rearm socket write event. */
+	if (k == num && s->tx.ev != NULL)
+		tle_event_raise(s->tx.ev);
+
+out:
+	rwl_release(&s->tx.use);
+
+	/*
+	 * remove pkt l2/l3 headers, restore ol_flags for unsent, but
+	 * already modified packets.
+	 */
+	ol_flags = ~dst.dev->tx.ol_flags[type];
+	for (n = k; n != i; n++) {
+		rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
+		pkt[n]->ol_flags &= ol_flags;
+	}
+
+	return k;
+}
|