aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtle_l4p/udp_rxtx.c
diff options
context:
space:
mode:
authorJianfeng Tan <henry.tjf@antfin.com>2019-11-18 06:59:50 +0000
committerJianfeng Tan <henry.tjf@antfin.com>2020-03-05 01:31:33 +0800
commit78c896b3b3127515478090c19447e27dc406427e (patch)
treed6d67d4683e9ca0409f9984a834547a572fb5310 /lib/libtle_l4p/udp_rxtx.c
parente4380f4866091fd92a7a57667dd938a99144f9cd (diff)
Signed-off-by: Jianfeng Tan <henry.tjf@antfin.com> Signed-off-by: Jielong Zhou <jielong.zjl@antfin.com> Signed-off-by: Jian Zhang <wuzai.zj@antfin.com> Signed-off-by: Chen Zhao <winters.zc@antfin.com> Change-Id: I55c39de4c6cd30f991f35631eb507f770230f08e
Diffstat (limited to 'lib/libtle_l4p/udp_rxtx.c')
-rw-r--r--lib/libtle_l4p/udp_rxtx.c186
1 files changed, 140 insertions, 46 deletions
diff --git a/lib/libtle_l4p/udp_rxtx.c b/lib/libtle_l4p/udp_rxtx.c
index 84a13ea..e9539b9 100644
--- a/lib/libtle_l4p/udp_rxtx.c
+++ b/lib/libtle_l4p/udp_rxtx.c
@@ -13,7 +13,6 @@
* limitations under the License.
*/
-#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
@@ -24,14 +23,11 @@
#include "misc.h"
static inline struct tle_udp_stream *
-rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
+rx_stream_obtain_by_tuples(struct stbl *st, const union pkt_info *pi)
{
struct tle_udp_stream *s;
- if (type >= TLE_VNUM || dev->dp[type] == NULL)
- return NULL;
-
- s = (struct tle_udp_stream *)dev->dp[type]->streams[port];
+ s = UDP_STREAM(stbl_find_stream(st, pi));
if (s == NULL)
return NULL;
@@ -41,6 +37,24 @@ rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
return s;
}
+static inline struct tle_udp_stream *
+rx_stream_obtain(struct tle_dev *dev, uint32_t type, const union pkt_info *pi)
+{
+ struct tle_udp_stream *s;
+
+ if (type == TLE_V4)
+ s = bhash_lookup4(dev->ctx->bhash[type],
+ pi->addr4.dst, pi->port.dst, 1);
+ else
+ s = bhash_lookup6(dev->ctx->bhash[type],
+ pi->addr6->dst, pi->port.dst, 1);
+
+ if (s == NULL || rwl_acquire(&s->rx.use) < 0)
+ return NULL;
+
+ return s;
+}
+
static inline uint16_t
get_pkt_type(const struct rte_mbuf *m)
{
@@ -57,8 +71,9 @@ get_pkt_type(const struct rte_mbuf *m)
}
static inline union l4_ports
-pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
- union ipv6_addrs **addr6)
+pkt_info_udp(struct rte_mbuf *m, union l4_ports *ports,
+ union ipv4_addrs *addr4, union ipv6_addrs **addr6,
+ union pkt_info *pi)
{
uint32_t len;
union l4_ports ret, *up;
@@ -71,15 +86,20 @@ pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
len + offsetof(struct ipv4_hdr, src_addr));
addr4->raw = pa4->raw;
+ pi->addr4.raw = pa4->raw;
+ pi->tf.type = TLE_V4;
} else if (ret.src == TLE_V6) {
*addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
len + offsetof(struct ipv6_hdr, src_addr));
+ pi->addr6 = *addr6;
+ pi->tf.type = TLE_V6;
}
len += m->l3_len;
up = rte_pktmbuf_mtod_offset(m, union l4_ports *,
len + offsetof(struct udp_hdr, src_port));
ports->raw = up->raw;
+ pi->port.raw = up->raw;
ret.dst = ports->dst;
return ret;
}
@@ -96,6 +116,11 @@ rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
r = _rte_ring_enqueue_burst(s->rx.q, mb, num);
+ if (unlikely(r != num)) {
+ UDP_ADD_STATS(UDP_MIB_RCVBUFERRORS, num - r);
+ UDP_ADD_STATS(UDP_MIB_INERRORS, num - r);
+ }
+
/* if RX queue was empty invoke user RX notification callback. */
if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
s->rx.cb.func(s->rx.cb.data, &s->s);
@@ -164,28 +189,64 @@ rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
return rx_stream(s, mb, rp + k, rc + k, n);
}
+/*
+ * Consider 2 UDP pkt_info *equal* if their:
+ * - types (IPv4/IPv6)
+ * - TCP src and dst ports
+ * - IP src and dst addresses
+ * are equal.
+ */
+static inline int
+udp_pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
+{
+ uint32_t i;
+
+ i = 1;
+
+ if (pi[0].tf.type == TLE_V4) {
+ while (i != num && pi[i].tf.type == TLE_V4 &&
+ pi[0].port.raw == pi[i].port.raw &&
+ pi[0].addr4.raw == pi[i].addr4.raw)
+ i++;
+ } else if (pi[0].tf.type == TLE_V6) {
+ while (i != num && pi[i].tf.type == TLE_V6 &&
+ pi[0].port.raw == pi[i].port.raw &&
+ ymm_cmp(&pi[0].addr6->raw,
+ &pi[i].addr6->raw) == 0)
+ i++;
+ }
+
+ return i;
+}
+
uint16_t
tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
+ struct stbl *st;
struct tle_udp_stream *s;
- uint32_t i, j, k, n, p, t;
+ uint32_t i, j, k, n, t;
union l4_ports tp[num], port[num];
union ipv4_addrs a4[num];
union ipv6_addrs *pa6[num];
+ union pkt_info pi[num];
+
+ st = CTX_UDP_STLB(dev->ctx);
for (i = 0; i != num; i++)
- tp[i] = pkt_info(pkt[i], &port[i], &a4[i], &pa6[i]);
+ tp[i] = pkt_info_udp(pkt[i], &port[i], &a4[i],
+ &pa6[i], &pi[i]);
k = 0;
for (i = 0; i != num; i = j) {
- for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
- ;
+ j = i + udp_pkt_info_bulk_eq(pi + i, num - i);
t = tp[i].src;
- p = tp[i].dst;
- s = rx_stream_obtain(dev, t, p);
+
+ s = rx_stream_obtain_by_tuples(st, &pi[i]);
+ if (s == NULL)
+ s = rx_stream_obtain(dev, t, &pi[i]);
if (s != NULL) {
if (t == TLE_V4)
@@ -202,6 +263,7 @@ tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
rwl_release(&s->rx.use);
} else {
+ UDP_ADD_STATS(UDP_MIB_NOPORTS, j - i);
for (; i != j; i++) {
rc[k] = ENOENT;
rp[k] = pkt[i];
@@ -262,6 +324,8 @@ tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
stream_drb_release(s, drb + i, j - i);
}
+ UDP_ADD_STATS(UDP_MIB_OUTDATAGRAMS, n);
+
return n;
}
@@ -272,24 +336,18 @@ tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
static inline uint32_t
recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
{
- uint32_t i, k;
- uint64_t flg[num], ofl[num];
-
- for (i = 0; i != num; i++) {
- flg[i] = m[i]->ol_flags;
- ofl[i] = m[i]->tx_offload;
- }
+ uint32_t i, k, offset;
- k = 0;
- for (i = 0; i != num; i++) {
-
- /* drop packets with invalid cksum(s). */
- if (check_pkt_csum(m[i], flg[i], type, IPPROTO_UDP) != 0) {
+ for (i = 0, k = 0; i != num; i++) {
+ if (check_pkt_csum(m[i], type, IPPROTO_UDP) != 0) {
+ UDP_INC_STATS(UDP_MIB_CSUMERRORS);
rte_pktmbuf_free(m[i]);
m[i] = NULL;
k++;
- } else
- rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
+ } else {
+ offset = _tx_offload_l4_offset(m[i]->tx_offload);
+ rte_pktmbuf_adj(m[i], offset);
+ }
}
return k;
@@ -302,9 +360,25 @@ tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
struct tle_udp_stream *s;
s = UDP_STREAM(us);
+ n = 0;
+
+again:
n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
- if (n == 0)
+ if (n == 0) {
+ if (rwl_try_acquire(&s->rx.use) > 0)
+ rte_errno = EAGAIN;
+ else
+ rte_errno = ESHUTDOWN;
+ rwl_release(&s->rx.use);
return 0;
+ }
+
+ k = recv_pkt_process(pkt, n, s->s.type);
+ if (unlikely(k))
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_CSUMERRORS, k);
+ n = compress_pkt_list(pkt, n, k);
+ if (n == 0)
+ goto again;
/*
* if we still have packets to read,
@@ -316,8 +390,8 @@ tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
rwl_release(&s->rx.use);
}
- k = recv_pkt_process(pkt, n, s->s.type);
- return compress_pkt_list(pkt, n, k);
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_INDATAGRAMS, n);
+ return n;
}
static inline int
@@ -413,7 +487,7 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
/* Remove the Ethernet header from the input packet */
rte_pktmbuf_adj(pkt, dst->l2_len);
- mtu = dst->mtu - dst->l2_len;
+ mtu = dst->mtu;
/* fragment packet */
if (type == TLE_V4)
@@ -475,13 +549,22 @@ queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
nb += nbc;
/* no free drbs, can't send anything */
- if (nb == 0)
+ if (unlikely(nb == 0)) {
+ if (all_or_nothing)
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, 1);
+ else
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, nb_pkt);
return 0;
+ }
/* not enough free drbs, reduce number of packets to send. */
else if (nb != nbm) {
- if (all_or_nothing)
+ if (all_or_nothing) {
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, 1);
return 0;
+ }
+
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, nb_pkt - nb * bsz);
nb_pkt = nb * bsz;
}
@@ -509,12 +592,18 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
const struct sockaddr_in *d4;
const struct sockaddr_in6 *d6;
struct tle_udp_stream *s;
- const void *da;
+ const void *sa, *da;
union udph udph;
struct tle_dest dst;
struct tle_drb *drb[num];
+ uint8_t ufo;
s = UDP_STREAM(us);
+ if (rwl_acquire(&s->tx.use) < 0) {
+ rte_errno = EPIPE; /* tx is shutdown */
+ return 0;
+ }
+
type = s->s.type;
/* start filling UDP header. */
@@ -523,7 +612,10 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
/* figure out what destination addr/port to use. */
if (dst_addr != NULL) {
- if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
+ if (dst_addr->sa_family != s->prm.remote_addr.ss_family &&
+ (s->prm.remote_addr.ss_family == AF_INET ||
+ !IN6_IS_ADDR_UNSPECIFIED(&s->s.ipv6.addr.dst))) {
+ rwl_release(&s->tx.use);
rte_errno = EINVAL;
return 0;
}
@@ -531,21 +623,28 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
d4 = (const struct sockaddr_in *)dst_addr;
da = &d4->sin_addr;
udph.ports.dst = d4->sin_port;
+ sa = &s->s.ipv4.addr.dst;
} else {
d6 = (const struct sockaddr_in6 *)dst_addr;
da = &d6->sin6_addr;
udph.ports.dst = d6->sin6_port;
+ sa = &s->s.ipv6.addr.dst;
}
} else {
udph.ports.dst = s->s.port.src;
- if (type == TLE_V4)
+ if (type == TLE_V4) {
da = &s->s.ipv4.addr.src;
- else
+ sa = &s->s.ipv4.addr.dst;
+ }
+ else {
da = &s->s.ipv6.addr.src;
+ sa = &s->s.ipv6.addr.dst;
+ }
}
- di = stream_get_dest(&s->s, da, &dst);
+ di = stream_get_dest(type, &s->s, sa, da, &dst);
if (di < 0) {
+ rwl_release(&s->tx.use);
rte_errno = -di;
return 0;
}
@@ -553,12 +652,7 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
mtu = dst.mtu - dst.l2_len - dst.l3_len;
- /* mark stream as not closable. */
- if (rwl_acquire(&s->tx.use) < 0) {
- rte_errno = EAGAIN;
- return 0;
- }
-
+ ufo = dst.dev->prm.tx_offload & DEV_TX_OFFLOAD_UDP_TSO;
nb = 0;
for (i = 0, k = 0; k != num; k = i) {
@@ -568,7 +662,7 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
ol_flags = dst.dev->tx.ol_flags[type];
while (i != num && frg == 0) {
- frg = pkt[i]->pkt_len > mtu;
+ frg = (!ufo) && pkt[i]->pkt_len > mtu;
if (frg != 0)
ol_flags &= ~PKT_TX_UDP_CKSUM;
rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,