Diffstat (limited to 'lib/libtle_l4p')
30 files changed, 3138 insertions, 1242 deletions
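The changeset drops the per-device port bitmap (tle_pbm / port_bitmap.h) in favour of a context-wide port status map (tle_psm, new file port_statmap.h): each port slot counts bound streams in its low 7 bits and keeps an SO_REUSEPORT flag in the high bit, which is what lets stream_fill_ctx() refuse a conflicting bind with EADDRINUSE instead of silently sharing the port. A minimal usage sketch of that API follows; it assumes the port_statmap.h added by this patch is on the include path and only exercises the semantics visible in the diff.

	#include <stdint.h>
	#include <stdio.h>
	#include <string.h>
	#include "port_statmap.h"	/* tle_psm_* helpers introduced by this patch */

	int main(void)
	{
		static struct tle_psm psm;	/* ~64 KB of per-port state, keep it off the stack */
		uint16_t p;

		tle_psm_init(&psm);		/* allocation starts at ALLOC_PORT_START (0x8000) */

		p = tle_psm_alloc_port(&psm);	/* find a free ephemeral port */
		if (p == 0)
			return 1;

		/* first bind: marks the port in use, high bit records SO_REUSEPORT */
		if (tle_psm_set(&psm, p, 1) != 0)
			return 1;

		/* a second SO_REUSEPORT bind on the same port is accepted... */
		if (tle_psm_set(&psm, p, 1) != 0)
			return 1;

		/* ...but a non-reuseport bind is refused
		 * (stream_fill_ctx() maps this to EADDRINUSE) */
		if (tle_psm_set(&psm, p, 0) == 0)
			return 1;

		/* each clear drops one reference; the slot resets once the count hits zero */
		tle_psm_clear(&psm, p);
		tle_psm_clear(&psm, p);
		printf("port %u, status after release: %u\n", p, tle_psm_check(&psm, p));
		return 0;
	}
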
diff --git a/lib/libtle_l4p/Makefile b/lib/libtle_l4p/Makefile index e1357d1..ee81d4a 100644 --- a/lib/libtle_l4p/Makefile +++ b/lib/libtle_l4p/Makefile @@ -45,6 +45,7 @@ SYMLINK-y-include += tle_ctx.h SYMLINK-y-include += tle_event.h SYMLINK-y-include += tle_tcp.h SYMLINK-y-include += tle_udp.h +SYMLINK-y-include += tle_stats.h # this lib dependencies DEPDIRS-y += lib/libtle_misc diff --git a/lib/libtle_l4p/ctx.c b/lib/libtle_l4p/ctx.c index b8067f0..d6bde48 100644 --- a/lib/libtle_l4p/ctx.c +++ b/lib/libtle_l4p/ctx.c @@ -21,9 +21,14 @@ #include <rte_ip.h> #include "stream.h" +#include "stream_table.h" #include "misc.h" #include <halfsiphash.h> +struct tle_mib default_mib; + +RTE_DEFINE_PER_LCORE(struct tle_mib *, mib) = &default_mib; + #define LPORT_START 0x8000 #define LPORT_END MAX_PORT_NUM @@ -103,6 +108,16 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm) ctx->prm = *ctx_prm; + rc = bhash_init(ctx); + if (rc != 0) { + UDP_LOG(ERR, "create bhash table (ctx=%p, proto=%u) failed " + "with error code: %d;\n", + ctx, ctx_prm->proto, rc); + tle_ctx_destroy(ctx); + rte_errno = -rc; + return NULL; + } + rc = tle_stream_ops[ctx_prm->proto].init_streams(ctx); if (rc != 0) { UDP_LOG(ERR, "init_streams(ctx=%p, proto=%u) failed " @@ -114,9 +129,10 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm) } for (i = 0; i != RTE_DIM(ctx->use); i++) - tle_pbm_init(ctx->use + i, LPORT_START_BLK); + tle_psm_init(ctx->use + i); - ctx->streams.nb_free = ctx->prm.max_streams; + ctx->streams.nb_free = ctx->prm.min_streams; + ctx->streams.nb_cur = ctx->prm.min_streams; /* Initialization of siphash state is done here to speed up the * fastpath processing. @@ -124,6 +140,11 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm) if (ctx->prm.hash_alg == TLE_SIPHASH) siphash_initialization(&ctx->prm.secret_key, &ctx->prm.secret_key); + + rte_spinlock_init(&ctx->dev_lock); + rte_spinlock_init(&ctx->bhash_lock[TLE_V4]); + rte_spinlock_init(&ctx->bhash_lock[TLE_V6]); + return ctx; } @@ -137,6 +158,8 @@ tle_ctx_destroy(struct tle_ctx *ctx) return; } + bhash_fini(ctx); + for (i = 0; i != RTE_DIM(ctx->dev); i++) tle_del_dev(ctx->dev + i); @@ -150,37 +173,6 @@ tle_ctx_invalidate(struct tle_ctx *ctx) RTE_SET_USED(ctx); } -static void -fill_pbm(struct tle_pbm *pbm, const struct tle_bl_port *blp) -{ - uint32_t i; - - for (i = 0; i != blp->nb_port; i++) - tle_pbm_set(pbm, blp->port[i]); -} - -static int -init_dev_proto(struct tle_dev *dev, uint32_t idx, int32_t socket_id, - const struct tle_bl_port *blp) -{ - size_t sz; - - sz = sizeof(*dev->dp[idx]); - dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, - socket_id); - - if (dev->dp[idx] == NULL) { - UDP_LOG(ERR, "allocation of %zu bytes on " - "socket %d for %u-th device failed\n", - sz, socket_id, idx); - return ENOMEM; - } - - tle_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK); - fill_pbm(&dev->dp[idx]->use, blp); - return 0; -} - static struct tle_dev * find_free_dev(struct tle_ctx *ctx) { @@ -214,27 +206,8 @@ tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm) return NULL; rc = 0; - /* device can handle IPv4 traffic */ - if (dev_prm->local_addr4.s_addr != INADDR_ANY) { - rc = init_dev_proto(dev, TLE_V4, ctx->prm.socket_id, - &dev_prm->bl4); - if (rc == 0) - fill_pbm(&ctx->use[TLE_V4], &dev_prm->bl4); - } - - /* device can handle IPv6 traffic */ - if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_ipv6_any, - sizeof(tle_ipv6_any)) != 0) { - rc = init_dev_proto(dev, TLE_V6, ctx->prm.socket_id, - &dev_prm->bl6); - if (rc == 0) - 
fill_pbm(&ctx->use[TLE_V6], &dev_prm->bl6); - } - if (rc != 0) { /* cleanup and return an error. */ - rte_free(dev->dp[TLE_V4]); - rte_free(dev->dp[TLE_V6]); rte_errno = rc; return NULL; } @@ -246,16 +219,19 @@ tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm) if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0 && ctx->prm.proto == TLE_PROTO_UDP) { - dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM; - dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6 | PKT_TX_UDP_CKSUM; + dev->tx.ol_flags[TLE_V4] |= PKT_TX_UDP_CKSUM; + dev->tx.ol_flags[TLE_V6] |= PKT_TX_UDP_CKSUM; } else if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_TCP_CKSUM) != 0 && ctx->prm.proto == TLE_PROTO_TCP) { - dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_TCP_CKSUM; - dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6 | PKT_TX_TCP_CKSUM; + dev->tx.ol_flags[TLE_V4] |= PKT_TX_TCP_CKSUM; + dev->tx.ol_flags[TLE_V6] |= PKT_TX_TCP_CKSUM; } if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_IPV4_CKSUM) != 0) - dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM; + dev->tx.ol_flags[TLE_V4] |= PKT_TX_IP_CKSUM; + + dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4; + dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6; dev->prm = *dev_prm; dev->ctx = ctx; @@ -300,220 +276,97 @@ tle_del_dev(struct tle_dev *dev) ctx = dev->ctx; p = dev - ctx->dev; - if (p >= RTE_DIM(ctx->dev) || - (dev->dp[TLE_V4] == NULL && - dev->dp[TLE_V6] == NULL)) + if (p >= RTE_DIM(ctx->dev)) return -EINVAL; /* emtpy TX queues. */ empty_dring(&dev->tx.dr, ctx->prm.proto); - rte_free(dev->dp[TLE_V4]); - rte_free(dev->dp[TLE_V6]); memset(dev, 0, sizeof(*dev)); ctx->nb_dev--; return 0; } -static struct tle_dev * -find_ipv4_dev(struct tle_ctx *ctx, const struct in_addr *addr) -{ - uint32_t i; - - for (i = 0; i != RTE_DIM(ctx->dev); i++) { - if (ctx->dev[i].prm.local_addr4.s_addr == addr->s_addr && - ctx->dev[i].dp[TLE_V4] != NULL) - return ctx->dev + i; - } - - return NULL; -} - -static struct tle_dev * -find_ipv6_dev(struct tle_ctx *ctx, const struct in6_addr *addr) +int +stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s, + const struct sockaddr *laddr, const struct sockaddr *raddr) { - uint32_t i; + struct sockaddr_storage addr; + int32_t rc = 0; - for (i = 0; i != RTE_DIM(ctx->dev); i++) { - if (memcmp(&ctx->dev[i].prm.local_addr6, addr, - sizeof(*addr)) == 0 && - ctx->dev[i].dp[TLE_V6] != NULL) - return ctx->dev + i; + if (laddr->sa_family == AF_INET) { + s->type = TLE_V4; + } else if (laddr->sa_family == AF_INET6) { + s->type = TLE_V6; } - return NULL; -} - -static int -stream_fill_dev(struct tle_ctx *ctx, struct tle_stream *s, - const struct sockaddr *addr) -{ - struct tle_dev *dev; - struct tle_pbm *pbm; - const struct sockaddr_in *lin4; - const struct sockaddr_in6 *lin6; - uint32_t i, p, sp, t; - - if (addr->sa_family == AF_INET) { - lin4 = (const struct sockaddr_in *)addr; - t = TLE_V4; - p = lin4->sin_port; - } else if (addr->sa_family == AF_INET6) { - lin6 = (const struct sockaddr_in6 *)addr; - t = TLE_V6; - p = lin6->sin6_port; - } else - return EINVAL; - + uint16_t p = ((const struct sockaddr_in *)laddr)->sin_port; p = ntohs(p); - - /* if local address is not wildcard, find device it belongs to. 
*/ - if (t == TLE_V4 && lin4->sin_addr.s_addr != INADDR_ANY) { - dev = find_ipv4_dev(ctx, &lin4->sin_addr); - if (dev == NULL) - return ENODEV; - } else if (t == TLE_V6 && memcmp(&tle_ipv6_any, &lin6->sin6_addr, - sizeof(tle_ipv6_any)) != 0) { - dev = find_ipv6_dev(ctx, &lin6->sin6_addr); - if (dev == NULL) - return ENODEV; - } else - dev = NULL; - - if (dev != NULL) - pbm = &dev->dp[t]->use; - else - pbm = &ctx->use[t]; - + struct tle_psm *psm = &ctx->use[s->type]; /* try to acquire local port number. */ + rte_spinlock_lock(&ctx->dev_lock); if (p == 0) { - p = tle_pbm_find_range(pbm, pbm->blk, LPORT_END_BLK); - if (p == 0 && pbm->blk > LPORT_START_BLK) - p = tle_pbm_find_range(pbm, LPORT_START_BLK, pbm->blk); - } else if (tle_pbm_check(pbm, p) != 0) - return EEXIST; - - if (p == 0) - return ENFILE; - - /* fill socket's dst port and type */ - - sp = htons(p); - s->type = t; - s->port.dst = sp; - - /* mark port as in-use */ - - tle_pbm_set(&ctx->use[t], p); - if (dev != NULL) { - tle_pbm_set(pbm, p); - dev->dp[t]->streams[sp] = s; - } else { - for (i = 0; i != RTE_DIM(ctx->dev); i++) { - if (ctx->dev[i].dp[t] != NULL) { - tle_pbm_set(&ctx->dev[i].dp[t]->use, p); - ctx->dev[i].dp[t]->streams[sp] = s; - } + if (s->type == TLE_V6 && is_empty_addr(laddr) && !s->option.ipv6only) + p = tle_psm_alloc_dual_port(&ctx->use[TLE_V4], psm); + else + p = tle_psm_alloc_port(psm); + if (p == 0) { + rte_spinlock_unlock(&ctx->dev_lock); + return ENFILE; } + rte_memcpy(&addr, laddr, sizeof(struct sockaddr_storage)); + ((struct sockaddr_in *)&addr)->sin_port = htons(p); + laddr = (const struct sockaddr*)&addr; } - return 0; -} + if (tle_psm_set(psm, p, s->option.reuseport) != 0) { + rte_spinlock_unlock(&ctx->dev_lock); + return EADDRINUSE; + } -static int -stream_clear_dev(struct tle_ctx *ctx, const struct tle_stream *s) -{ - struct tle_dev *dev; - uint32_t i, p, sp, t; - - t = s->type; - sp = s->port.dst; - p = ntohs(sp); - - /* if local address is not wildcard, find device it belongs to. */ - if (t == TLE_V4 && s->ipv4.addr.dst != INADDR_ANY) { - dev = find_ipv4_dev(ctx, - (const struct in_addr *)&s->ipv4.addr.dst); - if (dev == NULL) - return ENODEV; - } else if (t == TLE_V6 && memcmp(&tle_ipv6_any, &s->ipv6.addr.dst, - sizeof(tle_ipv6_any)) != 0) { - dev = find_ipv6_dev(ctx, - (const struct in6_addr *)&s->ipv6.addr.dst); - if (dev == NULL) - return ENODEV; - } else - dev = NULL; - - tle_pbm_clear(&ctx->use[t], p); - if (dev != NULL) { - if (dev->dp[t]->streams[sp] == s) { - tle_pbm_clear(&dev->dp[t]->use, p); - dev->dp[t]->streams[sp] = NULL; - } - } else { - for (i = 0; i != RTE_DIM(ctx->dev); i++) { - if (ctx->dev[i].dp[t] != NULL && - ctx->dev[i].dp[t]->streams[sp] == s) { - tle_pbm_clear(&ctx->dev[i].dp[t]->use, p); - ctx->dev[i].dp[t]->streams[sp] = NULL; + if (is_empty_addr(laddr)) { + if (s->type == TLE_V6 && !s->option.ipv6only) { + rc = tle_psm_set(&ctx->use[TLE_V4], p, s->option.reuseport); + if (rc != 0) { + tle_psm_clear(psm, p); + rte_spinlock_unlock(&ctx->dev_lock); + return EADDRINUSE; } } } - return 0; -} - -static void -fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask) -{ - *addr = in->sin_addr.s_addr; - *mask = (*addr == INADDR_ANY) ? 
INADDR_ANY : INADDR_NONE; -} + if (is_empty_addr(raddr)) + rc = bhash_add_entry(ctx, laddr, s); -static void -fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask) -{ - const struct in6_addr *pm; - - memcpy(addr, &in->sin6_addr, sizeof(*addr)); - if (memcmp(&tle_ipv6_any, addr, sizeof(*addr)) == 0) - pm = &tle_ipv6_any; - else - pm = &tle_ipv6_none; - - memcpy(mask, pm, sizeof(*mask)); -} + if (rc) { + tle_psm_clear(psm, p); + } -int -stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s, - const struct sockaddr *laddr, const struct sockaddr *raddr) -{ - const struct sockaddr_in *rin; - int32_t rc; + rte_spinlock_unlock(&ctx->dev_lock); + /* fill socket's dst (src actually) port */ + s->port.dst = htons(p); - /* setup ports and port mask fields (except dst port). */ - rin = (const struct sockaddr_in *)raddr; - s->port.src = rin->sin_port; - s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX; - s->pmsk.dst = UINT16_MAX; + if (rc) + return rc; - /* setup src and dst addresses. */ + /* setup src, dst addresses, and src port. */ if (laddr->sa_family == AF_INET) { fill_ipv4_am((const struct sockaddr_in *)laddr, &s->ipv4.addr.dst, &s->ipv4.mask.dst); fill_ipv4_am((const struct sockaddr_in *)raddr, &s->ipv4.addr.src, &s->ipv4.mask.src); + s->port.src = ((const struct sockaddr_in *)raddr)->sin_port; } else if (laddr->sa_family == AF_INET6) { fill_ipv6_am((const struct sockaddr_in6 *)laddr, &s->ipv6.addr.dst, &s->ipv6.mask.dst); fill_ipv6_am((const struct sockaddr_in6 *)raddr, &s->ipv6.addr.src, &s->ipv6.mask.src); + s->port.src = ((const struct sockaddr_in6 *)raddr)->sin6_port; } - rte_spinlock_lock(&ctx->dev_lock); - rc = stream_fill_dev(ctx, s, laddr); - rte_spinlock_unlock(&ctx->dev_lock); + /* setup port mask fields. */ + s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX; + s->pmsk.dst = UINT16_MAX; return rc; } @@ -522,11 +375,41 @@ stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s, int stream_clear_ctx(struct tle_ctx *ctx, struct tle_stream *s) { - int32_t rc; + bool is_any = false; + struct sockaddr_storage addr; + struct sockaddr_in *addr4; + struct sockaddr_in6 *addr6; + + if (s->type == TLE_V4) { + if (s->ipv4.addr.src == INADDR_ANY) { + is_any = true; + addr4 = (struct sockaddr_in *)&addr; + addr4->sin_addr.s_addr = s->ipv4.addr.dst; + addr4->sin_port = s->port.dst; + addr.ss_family = AF_INET; + bhash_del_entry(ctx, s, (struct sockaddr*)&addr); + } + } else { + if (IN6_IS_ADDR_UNSPECIFIED(&s->ipv6.addr.src)) { + is_any = true; + addr6 = (struct sockaddr_in6 *)&addr; + memcpy(&addr6->sin6_addr, &s->ipv6.addr.dst, + sizeof(tle_ipv6_any)); + addr6->sin6_port = s->port.dst; + addr.ss_family = AF_INET6; + bhash_del_entry(ctx, s, (struct sockaddr*)&addr); + } + } rte_spinlock_lock(&ctx->dev_lock); - rc = stream_clear_dev(ctx, s); + /* strange behaviour to match linux stack */ + if (is_any) { + if (s->type == TLE_V6 && !s->option.ipv6only) + tle_psm_clear(&ctx->use[TLE_V4], ntohs(s->port.dst)); + } + + tle_psm_clear(&ctx->use[s->type], ntohs(s->port.dst)); rte_spinlock_unlock(&ctx->dev_lock); - return rc; + return 0; } diff --git a/lib/libtle_l4p/ctx.h b/lib/libtle_l4p/ctx.h index f18060b..9483976 100644 --- a/lib/libtle_l4p/ctx.h +++ b/lib/libtle_l4p/ctx.h @@ -21,7 +21,7 @@ #include <tle_dring.h> #include <tle_ctx.h> -#include "port_bitmap.h" +#include "port_statmap.h" #include "osdep.h" #include "net_misc.h" @@ -29,11 +29,6 @@ extern "C" { #endif -struct tle_dport { - struct tle_pbm use; /* ports in use. 
*/ - struct tle_stream *streams[MAX_PORT_NUM]; /* port to stream. */ -}; - struct tle_dev { struct tle_ctx *ctx; struct { @@ -45,7 +40,6 @@ struct tle_dev { struct tle_dring dr; } tx; struct tle_dev_param prm; /* copy of device parameters. */ - struct tle_dport *dp[TLE_VNUM]; /* device L4 ports */ }; struct tle_ctx { @@ -54,18 +48,23 @@ struct tle_ctx { struct { rte_spinlock_t lock; uint32_t nb_free; /* number of free streams. */ + uint32_t nb_cur; /* number of allocated streams. */ STAILQ_HEAD(, tle_stream) free; void *buf; /* space allocated for streams */ } streams; - rte_spinlock_t dev_lock; + rte_spinlock_t bhash_lock[TLE_VNUM]; + struct rte_hash *bhash[TLE_VNUM]; /* bind and listen hash table */ + uint32_t nb_dev; - struct tle_pbm use[TLE_VNUM]; /* all ports in use. */ + rte_spinlock_t dev_lock; + struct tle_psm use[TLE_VNUM]; /* all ports in use. */ struct tle_dev dev[RTE_MAX_ETHPORTS]; }; struct stream_ops { int (*init_streams)(struct tle_ctx *); + uint32_t (*more_streams)(struct tle_ctx *); void (*fini_streams)(struct tle_ctx *); void (*free_drbs)(struct tle_stream *, struct tle_drb *[], uint32_t); }; @@ -77,6 +76,27 @@ int stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s, int stream_clear_ctx(struct tle_ctx *ctx, struct tle_stream *s); +static inline void +fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask) +{ + *addr = in->sin_addr.s_addr; + *mask = (*addr == INADDR_ANY) ? INADDR_ANY : INADDR_NONE; +} + +static inline void +fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask) +{ + const struct in6_addr *pm; + + memcpy(addr, &in->sin6_addr, sizeof(*addr)); + if (IN6_IS_ADDR_UNSPECIFIED(addr)) + pm = &tle_ipv6_any; + else + pm = &tle_ipv6_none; + + memcpy(mask, pm, sizeof(*mask)); +} + #ifdef __cplusplus } #endif diff --git a/lib/libtle_l4p/misc.h b/lib/libtle_l4p/misc.h index 327296f..d39e5a1 100644 --- a/lib/libtle_l4p/misc.h +++ b/lib/libtle_l4p/misc.h @@ -16,12 +16,34 @@ #ifndef _MISC_H_ #define _MISC_H_ +#include <tle_stats.h> #include <tle_dpdk_wrapper.h> #ifdef __cplusplus extern "C" { #endif +union typflg { + uint16_t raw; + struct { + uint8_t type; /* TLE_V4/TLE_V6 */ + uint8_t flags; /* TCP header flags */ + }; +}; + +union pkt_info { + rte_xmm_t raw; + struct { + union typflg tf; + uint16_t csf; /* checksum flags */ + union l4_ports port; + union { + union ipv4_addrs addr4; + const union ipv6_addrs *addr6; + }; + }; +}; + static inline int xmm_cmp(const rte_xmm_t *da, const rte_xmm_t *sa) { @@ -286,43 +308,41 @@ _ipv4x_cksum(const void *iph, size_t len) return (cksum == 0xffff) ? cksum : ~cksum; } -/* - * helper function to check csum. - */ static inline int -check_pkt_csum(const struct rte_mbuf *m, uint64_t ol_flags, uint32_t type, - uint32_t proto) +check_pkt_csum(const struct rte_mbuf *m, uint32_t type, uint32_t proto) { const struct ipv4_hdr *l3h4; const struct ipv6_hdr *l3h6; const struct udp_hdr *l4h; - uint64_t fl3, fl4; - uint16_t csum; int32_t ret; - - fl4 = ol_flags & PKT_RX_L4_CKSUM_MASK; - fl3 = (type == TLE_V4) ? 
- (ol_flags & PKT_RX_IP_CKSUM_MASK) : PKT_RX_IP_CKSUM_GOOD; + uint16_t csum; + uint64_t ol_flags = m->ol_flags; /* case 0: both ip and l4 cksum is verified or data is valid */ - if ((fl3 | fl4) == (PKT_RX_IP_CKSUM_GOOD | PKT_RX_L4_CKSUM_GOOD)) + if ((ol_flags & PKT_RX_IP_CKSUM_GOOD) && + (ol_flags & PKT_RX_L4_CKSUM_GOOD)) return 0; /* case 1: either ip or l4 cksum bad */ - if (fl3 == PKT_RX_IP_CKSUM_BAD || fl4 == PKT_RX_L4_CKSUM_BAD) + if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) + return 1; + + if ((ol_flags & PKT_RX_L4_CKSUM_MASK) == PKT_RX_L4_CKSUM_BAD) return 1; /* case 2: either ip or l4 or both cksum is unknown */ + ret = 0; l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len); l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len); - ret = 0; - if (fl3 == PKT_RX_IP_CKSUM_UNKNOWN && l3h4->hdr_checksum != 0) { + if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_UNKNOWN && + l3h4->hdr_checksum != 0) { csum = _ipv4x_cksum(l3h4, m->l3_len); ret = (csum != UINT16_MAX); } - if (ret == 0 && fl4 == PKT_RX_L4_CKSUM_UNKNOWN) { + if (ret == 0 && (ol_flags & PKT_RX_L4_CKSUM_MASK) == + PKT_RX_L4_CKSUM_UNKNOWN) { /* * for IPv4 it is allowed to have zero UDP cksum, @@ -376,8 +396,20 @@ rwl_acquire(rte_atomic32_t *p) static inline void rwl_down(rte_atomic32_t *p) { - while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0) + while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0) + rte_pause(); +} + +static inline int +rwl_try_down(rte_atomic32_t *p) +{ + while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0) { + /* Already down */ + if (rte_atomic32_read(p) == INT32_MIN) + return -1; rte_pause(); + } + return 0; } static inline void diff --git a/lib/libtle_l4p/net_misc.h b/lib/libtle_l4p/net_misc.h index 2d8dac2..c1d946b 100644 --- a/lib/libtle_l4p/net_misc.h +++ b/lib/libtle_l4p/net_misc.h @@ -16,6 +16,7 @@ #ifndef _NET_MISC_H_ #define _NET_MISC_H_ +#include <stdbool.h> #include <rte_ip.h> #include <rte_udp.h> #include "osdep.h" @@ -71,6 +72,26 @@ union ip_addrs { union ipv6_addrs v6; }; +static inline bool +is_empty_addr(const struct sockaddr *addr) +{ + bool any = false; + const struct sockaddr_in *in4; + const struct sockaddr_in6 *in6; + + if (addr->sa_family == AF_INET) { + in4 = (const struct sockaddr_in *)addr; + if (in4->sin_addr.s_addr == INADDR_ANY) + any = true; + } else if (addr->sa_family == AF_INET6) { + in6 = (const struct sockaddr_in6 *)addr; + if (IN6_IS_ADDR_UNSPECIFIED(&in6->sin6_addr)) + any = true; + } + + return any; +} + #ifdef __cplusplus } #endif diff --git a/lib/libtle_l4p/port_statmap.h b/lib/libtle_l4p/port_statmap.h new file mode 100644 index 0000000..8bbb0ba --- /dev/null +++ b/lib/libtle_l4p/port_statmap.h @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2019 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _PORT_STATMAP_H_ +#define _PORT_STATMAP_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define MAX_PORT_NUM (UINT16_MAX + 1) +#define ALLOC_PORT_START 0x8000 + +struct tle_psm { + uint32_t nb_used; /* Number of ports already in use. */ + uint32_t next_alloc; /* Next port to try allocate. */ + uint8_t stat[MAX_PORT_NUM]; /* Status of the port: + * 1) the most significant bit indicates + * if SO_REUSEPORT is allowed; + * 2) lowest 7 bits indicate # of streams + * using the port. + */ +}; + +static inline void +tle_psm_init(struct tle_psm *psm) +{ + memset(psm, 0, sizeof(struct tle_psm)); + psm->next_alloc = ALLOC_PORT_START; +} + +static inline int +tle_psm_set(struct tle_psm *psm, uint16_t port, uint8_t reuseport) +{ + if (psm->stat[port] == 0) { + /* port has not been used */ + psm->stat[port]++; + if (reuseport) + psm->stat[port] |= 0x80; + } else { + /* port is used by some socket */ + if (reuseport && (psm->stat[port] & 0x80)) { + /* all sockets set reuseport */ + psm->stat[port]++; + } else + return -1; + } + + return 0; +} + +static inline void +tle_psm_clear(struct tle_psm *psm, uint16_t port) +{ + psm->stat[port]--; + if ((psm->stat[port] & 0x7f) == 0) + psm->stat[port] = 0; +} + + +static inline uint8_t +tle_psm_check(const struct tle_psm *psm, uint16_t port) +{ + return psm->stat[port]; +} + +static inline uint16_t +tle_psm_alloc_port(struct tle_psm *psm) +{ + uint32_t i = psm->next_alloc; + + for (; i < MAX_PORT_NUM; i++) { + if (psm->stat[i] == 0) { + psm->next_alloc = i + 1; + return (uint16_t)i; + } + } + + for (i = ALLOC_PORT_START; i < psm->next_alloc; i++) { + if (psm->stat[i] == 0) { + psm->next_alloc = i + 1; + return (uint16_t)i; + } + } + + return 0; +} + +static inline uint16_t +tle_psm_alloc_dual_port(struct tle_psm *psm4, struct tle_psm *psm6) +{ + uint32_t i = psm6->next_alloc; + + for (; i < MAX_PORT_NUM; i++) { + if (psm6->stat[i] == 0 && psm4->stat[i] == 0) { + psm6->next_alloc = i + 1; + return (uint16_t)i; + } + } + + for (i = ALLOC_PORT_START; i < psm6->next_alloc; i++) { + if (psm6->stat[i] == 0 && psm4->stat[i] == 0) { + psm6->next_alloc = i + 1; + return (uint16_t)i; + } + } + + return 0; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _PORT_STATMAP_H_ */ diff --git a/lib/libtle_l4p/stream.h b/lib/libtle_l4p/stream.h index 49a2809..9f2bbc1 100644 --- a/lib/libtle_l4p/stream.h +++ b/lib/libtle_l4p/stream.h @@ -31,7 +31,11 @@ struct tle_stream { STAILQ_ENTRY(tle_stream) link; struct tle_ctx *ctx; - uint8_t type; /* TLE_V4 or TLE_V6 */ + tle_stream_options_t option; + unsigned long timestamp; + uint16_t reuseport_seed; + uint8_t type; /* TLE_V4 or TLE_V6 */ + uint8_t padding; /* Stream address information. 
*/ union l4_ports port; @@ -53,15 +57,25 @@ static inline uint32_t get_streams(struct tle_ctx *ctx, struct tle_stream *s[], uint32_t num) { struct tle_stream *p; - uint32_t i, n; + uint32_t i, n, inc; rte_spinlock_lock(&ctx->streams.lock); - n = RTE_MIN(ctx->streams.nb_free, num); - for (i = 0, p = STAILQ_FIRST(&ctx->streams.free); - i != n; - i++, p = STAILQ_NEXT(p, link)) + n = ctx->streams.nb_free; + if (n < num) { + inc = tle_stream_ops[ctx->prm.proto].more_streams(ctx); + ctx->streams.nb_free += inc; + ctx->streams.nb_cur += inc; + n = ctx->streams.nb_free; + } + n = RTE_MIN(n, num); + + for (i = 0, p = STAILQ_FIRST(&ctx->streams.free); i != n; ) { s[i] = p; + p = STAILQ_NEXT(p, link); + s[i]->link.stqe_next = NULL; + i++; + } if (p == NULL) /* we retrieved all free entries */ @@ -80,9 +94,6 @@ get_stream(struct tle_ctx *ctx) struct tle_stream *s; s = NULL; - if (ctx->streams.nb_free == 0) - return s; - get_streams(ctx, &s, 1); return s; } @@ -120,8 +131,8 @@ drb_nb_elem(const struct tle_ctx *ctx) } static inline int32_t -stream_get_dest(struct tle_stream *s, const void *dst_addr, - struct tle_dest *dst) +stream_get_dest(uint8_t type, struct tle_stream *s, const void *src_addr, + const void *dst_addr, struct tle_dest *dst) { int32_t rc; const struct in_addr *d4; @@ -133,12 +144,13 @@ stream_get_dest(struct tle_stream *s, const void *dst_addr, /* it is here just to keep gcc happy. */ d4 = NULL; + /* it is here just to keep gcc happy. */ d6 = NULL; - if (s->type == TLE_V4) { + if (type == TLE_V4) { d4 = dst_addr; rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst); - } else if (s->type == TLE_V6) { + } else if (type == TLE_V6) { d6 = dst_addr; rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst); } else @@ -148,18 +160,25 @@ stream_get_dest(struct tle_stream *s, const void *dst_addr, return -ENOENT; dev = dst->dev; - dst->ol_flags = dev->tx.ol_flags[s->type]; + dst->ol_flags = dev->tx.ol_flags[type]; - if (s->type == TLE_V4) { + if (type == TLE_V4) { struct ipv4_hdr *l3h; l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len); - l3h->src_addr = dev->prm.local_addr4.s_addr; + if (((const struct in_addr*)src_addr)->s_addr != INADDR_ANY) + l3h->src_addr = ((const struct in_addr*)src_addr)->s_addr; + else + l3h->src_addr = dev->prm.local_addr4.s_addr; l3h->dst_addr = d4->s_addr; } else { struct ipv6_hdr *l3h; l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len); - rte_memcpy(l3h->src_addr, &dev->prm.local_addr6, - sizeof(l3h->src_addr)); + if (!IN6_IS_ADDR_UNSPECIFIED(src_addr)) + rte_memcpy(l3h->src_addr, src_addr, + sizeof(l3h->src_addr)); + else + rte_memcpy(l3h->src_addr, &dev->prm.local_addr6, + sizeof(l3h->src_addr)); rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr)); } diff --git a/lib/libtle_l4p/stream_table.c b/lib/libtle_l4p/stream_table.c index 5a89553..e029306 100644 --- a/lib/libtle_l4p/stream_table.c +++ b/lib/libtle_l4p/stream_table.c @@ -13,68 +13,47 @@ * limitations under the License. 
*/ #include <string.h> -#include <rte_malloc.h> #include <rte_errno.h> #include "stream_table.h" void -stbl_fini(struct stbl *st) +bhash_fini(struct tle_ctx *ctx) { uint32_t i; - for (i = 0; i != RTE_DIM(st->ht); i++) { - rte_hash_free(st->ht[i].t); - rte_free(st->ht[i].ent); - } - - memset(st, 0, sizeof(*st)); + for (i = 0; i != RTE_DIM(ctx->bhash); i++) + rte_hash_free(ctx->bhash[i]); } int -stbl_init(struct stbl *st, uint32_t num, int32_t socket) +bhash_init(struct tle_ctx *ctx) { - int32_t rc; - size_t i, sz; - struct rte_hash_parameters hprm; + int rc = 0; + struct rte_hash_parameters hprm = {0}; + bool ipv6 = ctx->prm.lookup6 != NULL; char buf[RTE_HASH_NAMESIZE]; - num = RTE_MAX(5 * num / 4, 0x10U); - - memset(&hprm, 0, sizeof(hprm)); hprm.name = buf; - hprm.entries = num; - hprm.socket_id = socket; - - rc = 0; - - snprintf(buf, sizeof(buf), "stbl4@%p", st); - hprm.key_len = sizeof(struct stbl4_key); - st->ht[TLE_V4].t = rte_hash_create(&hprm); - if (st->ht[TLE_V4].t == NULL) + hprm.entries = 4096; + hprm.extra_flag = RTE_HASH_EXTRA_FLAGS_EXT_TABLE; + hprm.socket_id = ctx->prm.socket_id; + + snprintf(buf, sizeof(buf), "bhash4@%p", ctx); + hprm.key_len = sizeof(struct bhash4_key); + ctx->bhash[TLE_V4] = rte_hash_create(&hprm); + if (ctx->bhash[TLE_V4] == NULL) rc = (rte_errno != 0) ? -rte_errno : -ENOMEM; - if (rc == 0) { - snprintf(buf, sizeof(buf), "stbl6@%p", st); - hprm.key_len = sizeof(struct stbl6_key); - st->ht[TLE_V6].t = rte_hash_create(&hprm); - if (st->ht[TLE_V6].t == NULL) + if (rc == 0 && ipv6) { + snprintf(buf, sizeof(buf), "bhash6@%p", ctx); + hprm.key_len = sizeof(struct bhash6_key); + ctx->bhash[TLE_V6] = rte_hash_create(&hprm); + if (ctx->bhash[TLE_V6] == NULL) { + rte_hash_free(ctx->bhash[TLE_V4]); rc = (rte_errno != 0) ? -rte_errno : -ENOMEM; + } } - for (i = 0; i != RTE_DIM(st->ht) && rc == 0; i++) { - - sz = sizeof(*st->ht[i].ent) * num; - st->ht[i].ent = rte_zmalloc_socket(NULL, sz, - RTE_CACHE_LINE_SIZE, socket); - if (st->ht[i].ent == NULL) - rc = -ENOMEM; - else - st->ht[i].nb_ent = num; - } - - if (rc != 0) - stbl_fini(st); - return rc; } diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h index 033c306..ba8d165 100644 --- a/lib/libtle_l4p/stream_table.h +++ b/lib/libtle_l4p/stream_table.h @@ -16,199 +16,415 @@ #ifndef _STREAM_TABLE_H_ #define _STREAM_TABLE_H_ +#include <string.h> #include <rte_hash.h> -#include "tcp_misc.h" +#include "stream.h" +#include "misc.h" #ifdef __cplusplus extern "C" { #endif +#define HASH_SIZE_32K 32771 +#define HASH_SIZE_64K 65537 +#define HASH_SIZE_128K 131071 + +#define HASH_SIZE HASH_SIZE_64K + struct stbl_entry { void *data; }; -struct shtbl { - uint32_t nb_ent; /* max number of entries in the table. 
*/ - rte_spinlock_t l; /* lock to protect the hash table */ - struct rte_hash *t; - struct stbl_entry *ent; +struct stbl { + rte_spinlock_t l; + uint32_t need_lock; + struct stbl_entry head[HASH_SIZE]; } __rte_cache_aligned; -struct stbl { - struct shtbl ht[TLE_VNUM]; -}; +static inline int +stbl_init(struct stbl *st, uint32_t lock) +{ + st->need_lock = lock; + return 0; +} -struct stbl4_key { - union l4_ports port; - union ipv4_addrs addr; -} __attribute__((__packed__)); +static inline int +stbl_fini(struct stbl *st) +{ + st->need_lock = 0; + return 0; +} -struct stbl6_key { - union l4_ports port; - union ipv6_addrs addr; -} __attribute__((__packed__)); +static inline uint8_t +compare_pkt(const struct tle_stream *s, const union pkt_info *pi) +{ + if (s->type != pi->tf.type) + return -1; -struct stbl_key { - union l4_ports port; - union { - union ipv4_addrs addr4; - union ipv6_addrs addr6; - }; -} __attribute__((__packed__)); + if (s->port.raw != pi->port.raw) + return -1; -extern void stbl_fini(struct stbl *st); + if (s->type == TLE_V4) { + if (s->ipv4.addr.raw != pi->addr4.raw) + return -1; + } else { + if (memcmp(&s->ipv6.addr, pi->addr6, sizeof(union ipv6_addrs))) + return -1; + } -extern int stbl_init(struct stbl *st, uint32_t num, int32_t socket); + return 0; +} -static inline void -stbl_pkt_fill_key(struct stbl_key *k, const union pkt_info *pi, uint32_t type) +static inline uint32_t +stbl_hash_stream(const struct tle_stream *s) { - static const struct stbl_key zero = { - .port.raw = 0, - }; - - k->port = pi->port; - if (type == TLE_V4) - k->addr4 = pi->addr4; - else if (type == TLE_V6) - k->addr6 = *pi->addr6; - else - *k = zero; + int i; + unsigned int hash; + + if (s->type == TLE_V4) { + hash = s->ipv4.addr.src ^ s->ipv4.addr.dst + ^ s->port.src ^ s->port.dst; + } else { + hash = s->port.src ^ s->port.dst; + for (i = 0; i < 4; i++) { + hash ^= s->ipv6.addr.src.u32[i]; + hash ^= s->ipv6.addr.dst.u32[i]; + } + } + + return hash % HASH_SIZE; } -static inline void -stbl_lock(struct stbl *st, uint32_t type) +static inline uint32_t +stbl_hash_pkt(const union pkt_info* pi) { - rte_spinlock_lock(&st->ht[type].l); + int i; + unsigned int hash; + + if (pi->tf.type == TLE_V4) { + hash = pi->addr4.src ^ pi->addr4.dst ^ pi->port.src ^ pi->port.dst; + } else { + hash = pi->port.src ^ pi->port.dst; + for (i = 0; i < 4; i++) { + hash ^= pi->addr6->src.u32[i]; + hash ^= pi->addr6->dst.u32[i]; + } + } + + return hash % HASH_SIZE; } -static inline void -stbl_unlock(struct stbl *st, uint32_t type) +static inline struct stbl_entry* +stbl_add_stream(struct stbl *st, struct tle_stream *s) { - rte_spinlock_unlock(&st->ht[type].l); + struct stbl_entry* entry; + + if (st->need_lock) + rte_spinlock_lock(&st->l); + entry = &st->head[stbl_hash_stream(s)]; + s->link.stqe_next = (struct tle_stream*)entry->data; + entry->data = s; + if (st->need_lock) + rte_spinlock_unlock(&st->l); + + return entry; } -static inline struct stbl_entry * -stbl_add_entry(struct stbl *st, const union pkt_info *pi) +static inline struct tle_stream * +stbl_find_stream(struct stbl *st, const union pkt_info *pi) { - int32_t rc; - uint32_t type; - struct shtbl *ht; - struct stbl_key k; - - type = pi->tf.type; - stbl_pkt_fill_key(&k, pi, type); - ht = st->ht + type; - - rc = rte_hash_add_key(ht->t, &k); - if ((uint32_t)rc >= ht->nb_ent) - return NULL; - return ht->ent + rc; + struct tle_stream* head; + + if (st->need_lock) + rte_spinlock_lock(&st->l); + head = (struct tle_stream*)st->head[stbl_hash_pkt(pi)].data; + while (head != NULL) { 
+ if (compare_pkt(head, pi) == 0) + break; + + head = head->link.stqe_next; + } + if (st->need_lock) + rte_spinlock_unlock(&st->l); + return head; } -static inline struct stbl_entry * -stbl_add_stream(struct stbl *st, const union pkt_info *pi, const void *s) +static inline void +stbl_del_stream(struct stbl *st, struct stbl_entry *se, + struct tle_stream *s) { - struct stbl_entry *se; + struct tle_stream *prev, *current; - se = stbl_add_entry(st, pi); - if (se != NULL) - se->data = (void *)(uintptr_t)s; - return se; + if (st->need_lock) + rte_spinlock_lock(&st->l); + if (se == NULL) + se = &st->head[stbl_hash_stream(s)]; + prev = NULL; + current = (struct tle_stream*)se->data; + while (current != NULL) { + if (current != s) { + prev = current; + current = current->link.stqe_next; + continue; + } + + if (prev) + prev->link.stqe_next = current->link.stqe_next; + else + se->data = current->link.stqe_next; + break; + } + if (st->need_lock) + rte_spinlock_unlock(&st->l); + + s->link.stqe_next = NULL; } -static inline struct stbl_entry * -stbl_find_entry(struct stbl *st, const union pkt_info *pi) +struct bhash4_key { + uint16_t port; + uint32_t addr; +} __attribute__((__packed__)); + +struct bhash6_key { + uint16_t port; + rte_xmm_t addr; +} __attribute__((__packed__)); + +struct bhash_key { + uint16_t port; + union { + uint32_t addr4; + rte_xmm_t addr6; + }; +} __attribute__((__packed__)); + +void bhash_fini(struct tle_ctx *ctx); + +int bhash_init(struct tle_ctx *ctx); + +static inline int +bhash_sockaddr2key(const struct sockaddr *addr, struct bhash_key *key) { - int32_t rc; - uint32_t type; - struct shtbl *ht; - struct stbl_key k; - - type = pi->tf.type; - stbl_pkt_fill_key(&k, pi, type); - ht = st->ht + type; - - rc = rte_hash_lookup(ht->t, &k); - if ((uint32_t)rc >= ht->nb_ent) - return NULL; - return ht->ent + rc; + int t; + const struct sockaddr_in *lin4; + const struct sockaddr_in6 *lin6; + + if (addr->sa_family == AF_INET) { + lin4 = (const struct sockaddr_in *)addr; + key->port = lin4->sin_port; + key->addr4 = lin4->sin_addr.s_addr; + t = TLE_V4; + } else { + lin6 = (const struct sockaddr_in6 *)addr; + memcpy(&key->addr6, &lin6->sin6_addr, sizeof(key->addr6)); + key->port = lin6->sin6_port; + t = TLE_V6; + } + + return t; } -static inline void * -stbl_find_data(struct stbl *st, const union pkt_info *pi) +/* Return 0 on success; + * Return errno on failure. + */ +static inline int +bhash_add_entry(struct tle_ctx *ctx, const struct sockaddr *addr, + struct tle_stream *s) { - struct stbl_entry *ent; - - ent = stbl_find_entry(st, pi); - return (ent == NULL) ? NULL : ent->data; + int t; + int rc; + int is_first; + struct bhash_key key; + struct rte_hash *bhash; + struct tle_stream *old, *tmp; + + is_first = 0; + t = bhash_sockaddr2key(addr, &key); + + rte_spinlock_lock(&ctx->bhash_lock[t]); + bhash = ctx->bhash[t]; + rc = rte_hash_lookup_data(bhash, &key, (void **)&old); + if (rc == -ENOENT) { + is_first = 1; + s->link.stqe_next = NULL; /* just to avoid follow */ + rc = rte_hash_add_key_data(bhash, &key, s); + } else if (rc >= 0) { + if (t == TLE_V4 && old->type == TLE_V6) { + /* V6 stream may listen V4 address, assure V4 stream + * is ahead of V6 stream in the list + */ + s->link.stqe_next = old; + rte_hash_add_key_data(bhash, &key, s); + } else { + tmp = old->link.stqe_next; + old->link.stqe_next = s; + s->link.stqe_next = tmp; + } + } + rte_spinlock_unlock(&ctx->bhash_lock[t]); + + /* IPv6 socket with unspecified address could receive IPv4 packets. 
+ * So the stream should also be recorded in IPv4 table. + * Only the first stream need be inserted into V4 list, otherwise + * the V6 list is already following V4 list. + */ + if (t == TLE_V6 && !s->option.ipv6only && is_first && + IN6_IS_ADDR_UNSPECIFIED(&key.addr6)) { + t = TLE_V4; + rte_spinlock_lock(&ctx->bhash_lock[t]); + bhash = ctx->bhash[t]; + rc = rte_hash_lookup_data(bhash, &key, (void **)&old); + if (rc == -ENOENT) + rc = rte_hash_add_key_data(bhash, &key, s); + else if (rc >= 0) { + while(old->link.stqe_next != NULL) + old = old->link.stqe_next; + old->link.stqe_next = s; + s->link.stqe_next = NULL; + } + rte_spinlock_unlock(&ctx->bhash_lock[t]); + } + + return (rc >= 0) ? 0 : (-rc); } -#include "tcp_stream.h" - static inline void -stbl_stream_fill_key(struct stbl_key *k, const struct tle_stream *s, - uint32_t type) +bhash_del_entry(struct tle_ctx *ctx, struct tle_stream *s, + const struct sockaddr *addr) { - static const struct stbl_key zero = { - .port.raw = 0, - }; + int t; + int rc; + struct bhash_key key; + struct tle_stream *f, *cur, *pre = NULL; + + t = bhash_sockaddr2key(addr, &key); + + rte_spinlock_lock(&ctx->bhash_lock[t]); + rc = rte_hash_lookup_data(ctx->bhash[t], &key, (void **)&f); + if (rc >= 0) { + cur = f; + pre = NULL; + while (cur != s) { + pre = cur; + cur = cur->link.stqe_next; + } + + if (pre == NULL) { + cur = cur->link.stqe_next; + if (cur == NULL) + rte_hash_del_key(ctx->bhash[t], &key); + else /* change data */ + rte_hash_add_key_data(ctx->bhash[t], &key, cur); + } else + pre->link.stqe_next = cur->link.stqe_next; + } + + rte_spinlock_unlock(&ctx->bhash_lock[t]); + + if (rc < 0) + return; + + s->link.stqe_next = NULL; + + /* IPv6 socket with unspecified address could receive IPv4 packets. + * So the stream should also be recorded in IPv4 table*/ + if (t == TLE_V6 && !s->option.ipv6only && pre == NULL && + IN6_IS_ADDR_UNSPECIFIED(&key.addr6)) { + t = TLE_V4; + rte_spinlock_lock(&ctx->bhash_lock[t]); + rc = rte_hash_lookup_data(ctx->bhash[t], &key, (void **)&f); + if (rc >= 0) { + cur = f; + pre = NULL; + while (cur != s) { + pre = cur; + cur = cur->link.stqe_next; + } + + if (pre == NULL) { + cur = cur->link.stqe_next; + if (cur == NULL) + rte_hash_del_key(ctx->bhash[t], &key); + else /* change data */ + rte_hash_add_key_data(ctx->bhash[t], &key, cur); + } else + pre->link.stqe_next = cur->link.stqe_next; + } + + rte_spinlock_unlock(&ctx->bhash_lock[t]); + } - k->port = s->port; - if (type == TLE_V4) - k->addr4 = s->ipv4.addr; - else if (type == TLE_V6) - k->addr6 = s->ipv6.addr; - else - *k = zero; } -static inline struct stbl_entry * -stbl_add_stream_lock(struct stbl *st, const struct tle_tcp_stream *s) +static inline void * +bhash_reuseport_get_stream(struct tle_stream *s) { - uint32_t type; - struct stbl_key k; - struct stbl_entry *se; - struct shtbl *ht; - int32_t rc; - - type = s->s.type; - stbl_stream_fill_key(&k, &s->s, type); - ht = st->ht + type; + int n = 0; + struct tle_stream *e, *all[32]; + + e = s; + while(e && n < 32) { + all[n++] = e; + e = e->link.stqe_next; + } + + /* for each connection, this function will be called twice + * 1st time for the first handshake: SYN + * 2nd time for the third handshake: ACK + */ + return all[(s->reuseport_seed++) % n]; +} - stbl_lock(st, type); - rc = rte_hash_add_key(ht->t, &k); - stbl_unlock(st, type); +static inline void * +bhash_lookup4(struct rte_hash *t, uint32_t addr, uint16_t port, uint8_t reuse) +{ + int rc; + void *s = NULL; + struct bhash_key key = { + .port = port, + .addr4 = addr, + }; - 
if ((uint32_t)rc >= ht->nb_ent) - return NULL; + rc = rte_hash_lookup_data(t, &key, &s); + if (rc == -ENOENT) { + key.addr4 = INADDR_ANY; + rc = rte_hash_lookup_data(t, &key, &s); + } - se = ht->ent + rc; - if (se != NULL) - se->data = (void *)(uintptr_t)s; + if (rc >= 0) { + if (reuse) + return bhash_reuseport_get_stream(s); + else + return s; + } - return se; + return NULL; } -static inline void -stbl_del_stream(struct stbl *st, struct stbl_entry *se, - const struct tle_tcp_stream *s, uint32_t lock) +static inline void * +bhash_lookup6(struct rte_hash *t, rte_xmm_t addr, uint16_t port, uint8_t reuse) { - uint32_t type; - struct stbl_key k; + int rc; + void *s = NULL; + struct bhash_key key = { + .port = port, + .addr6 = addr, + }; - if (se == NULL) - return; + rc = rte_hash_lookup_data(t, &key, &s); + if (rc == -ENOENT) { + memcpy(&key.addr6, &tle_ipv6_any, sizeof(key.addr6)); + rc = rte_hash_lookup_data(t, &key, &s); + } - se->data = NULL; + if (rc >= 0) { + if (reuse) + return bhash_reuseport_get_stream(s); + else + return s; + } - type = s->s.type; - stbl_stream_fill_key(&k, &s->s, type); - if (lock != 0) - stbl_lock(st, type); - rte_hash_del_key(st->ht[type].t, &k); - if (lock != 0) - stbl_unlock(st, type); + return NULL; } #ifdef __cplusplus diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h index 61bfce4..bf01e78 100644 --- a/lib/libtle_l4p/syncookie.h +++ b/lib/libtle_l4p/syncookie.h @@ -182,9 +182,12 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si, const union tsopt *to) { uint32_t ack, mss, seq, wscale; + tcb->err = 0; + seq = si->seq; tcb->rcv.nxt = seq; + tcb->rcv.cpy = seq; tcb->rcv.irs = seq - 1; tcb->snd.wu.wl1 = seq; @@ -202,6 +205,7 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si, const union tsopt *to) tcb->so.mss = mss; tcb->snd.ts = to->ecr; + tcb->snd.cork_ts = 0; tcb->rcv.ts = to->val; tcb->so.ts.raw = to->raw; diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h index bec1e76..3196470 100644 --- a/lib/libtle_l4p/tcp_ctl.h +++ b/lib/libtle_l4p/tcp_ctl.h @@ -22,6 +22,7 @@ #include "tcp_stream.h" #include "tcp_ofo.h" +#include "tcp_timer.h" #ifdef __cplusplus extern "C" { @@ -97,10 +98,10 @@ calc_rx_wnd(const struct tle_tcp_stream *s, uint32_t scale) /* peer doesn't support WSCALE option, wnd size is limited to 64K */ if (scale == TCP_WSCALE_NONE) { - wnd = _rte_ring_get_mask(s->rx.q) << TCP_WSCALE_DEFAULT; + wnd = rte_ring_free_count(s->rx.q) << TCP_WSCALE_DEFAULT; return RTE_MIN(wnd, (uint32_t)UINT16_MAX); } else - return _rte_ring_get_mask(s->rx.q) << scale; + return rte_ring_free_count(s->rx.q) << scale; } /* empty stream's send queue */ @@ -144,31 +145,34 @@ static inline void tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s) { struct stbl *st; - uint16_t uop; + uint16_t state; + uint8_t i; st = CTX_TCP_STLB(ctx); - /* reset TX armed */ - rte_atomic32_set(&s->tx.arm, 0); + for (i = 0; i < TIMER_NUM; i++) + timer_stop(s, i); /* reset TCB */ - uop = s->tcb.uop & ~TCP_OP_CLOSE; + state = s->tcb.state; memset(&s->tcb, 0, sizeof(s->tcb)); /* reset cached destination */ memset(&s->tx.dst, 0, sizeof(s->tx.dst)); - if (uop != TCP_OP_ACCEPT) { + /* state could be ESTABLISHED, CLOSED or LISTEN + * stream in CLOSED state has already been cleared by stream_term + * stream in ESTABLISHED state is accepted stream, and doesn't need clear + */ + if (state == TCP_ST_LISTEN) { /* free stream's destination port */ stream_clear_ctx(ctx, &s->s); - if (uop == TCP_OP_LISTEN) - empty_lq(s); + empty_lq(s); } if (s->ste != 
NULL) { /* remove entry from RX streams table */ - stbl_del_stream(st, s->ste, s, - (s->flags & TLE_CTX_FLAG_ST) == 0); + stbl_del_stream(st, s->ste, &s->s); s->ste = NULL; empty_rq(s); } @@ -184,6 +188,48 @@ tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s) put_stream(ctx, &s->s, TCP_STREAM_TX_FINISHED(s)); } +static inline void +stream_term(struct tle_tcp_stream *s) +{ + struct sdr *dr; + + /* 1) recv a RST packet; 2) keepalive timeout */ + if (s->tcb.state == TCP_ST_ESTABLISHED) { + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + TCP_INC_STATS(TCP_MIB_ESTABRESETS); + } + + s->tcb.state = TCP_ST_CLOSED; + rte_smp_wmb(); + + /* close() was already invoked, schedule final cleanup */ + if ((s->tcb.uop & TCP_OP_CLOSE) != 0) { + if ((s->tcb.uop & TCP_OP_ACCEPT) == 0) { + /* free stream's destination port */ + stream_clear_ctx(s->s.ctx, &s->s); + if ((s->tcb.uop & TCP_OP_LISTEN) != 0) + empty_lq(s); + } + + if (s->ste != NULL) { + /* remove entry from RX streams table */ + stbl_del_stream(CTX_TCP_STLB(s->s.ctx), s->ste, &s->s); + s->ste = NULL; + empty_rq(s); + } + + dr = CTX_TCP_SDR(s->s.ctx); + rte_spinlock_lock(&dr->lock); + STAILQ_INSERT_TAIL(&dr->be, &s->s, link); + rte_spinlock_unlock(&dr->lock); + + /* notify user that stream need to be closed */ + } else if (s->err.ev != NULL) + tle_event_raise(s->err.ev); + else if (s->err.cb.func != NULL) + s->err.cb.func(s->err.cb.data, &s->s); +} + #ifdef __cplusplus } #endif diff --git a/lib/libtle_l4p/tcp_misc.h b/lib/libtle_l4p/tcp_misc.h index 0cef8b2..1f7974e 100644 --- a/lib/libtle_l4p/tcp_misc.h +++ b/lib/libtle_l4p/tcp_misc.h @@ -30,7 +30,7 @@ extern "C" { * of protocol related data. */ -#define TCP_WSCALE_DEFAULT 7 +#define TCP_WSCALE_DEFAULT 10 #define TCP_WSCALE_NONE 0 #define TCP_TX_HDR_MAX (sizeof(struct tcp_hdr) + TCP_TX_OPT_LEN_MAX) @@ -71,27 +71,6 @@ extern "C" { /* TCP flags mask. 
*/ #define TCP_FLAG_MASK UINT8_MAX -union typflg { - uint16_t raw; - struct { - uint8_t type; /* TLE_V4/TLE_V6 */ - uint8_t flags; /* TCP header flags */ - }; -}; - -union pkt_info { - rte_xmm_t raw; - struct { - union typflg tf; - uint16_t csf; /* checksum flags */ - union l4_ports port; - union { - union ipv4_addrs addr4; - const union ipv6_addrs *addr6; - }; - }; -}; - union seg_info { rte_xmm_t raw; struct { @@ -226,7 +205,7 @@ struct dack_info { }; /* get current timestamp in ms */ -static inline uint32_t +static inline uint64_t tcp_get_tms(uint32_t mshift) { uint64_t ts; @@ -344,7 +323,9 @@ fill_syn_opts(void *p, const struct syn_opts *so) opt = (struct tcpopt *)to; } - to[0] = TCP_OPT_KIND_EOL; + to[0] = TCP_OPT_KIND_NOP; + to[1] = TCP_OPT_KIND_NOP; + to[2] = TCP_OPT_KIND_NOP; } /* @@ -390,6 +371,8 @@ get_tms_opts(uintptr_t p, uint32_t len) else if (kind == TCP_OPT_KIND_NOP) i += sizeof(to->kl.kind); else { + if (to->kl.len == 0) + break; i += to->kl.len; if (i <= len && to->kl.raw == TCP_OPT_KL_TMS) { ts.val = rte_be_to_cpu_32(to->ts.val); @@ -449,7 +432,6 @@ get_pkt_info(const struct rte_mbuf *m, union pkt_info *pi, union seg_info *si) ((uintptr_t)tcph + offsetof(struct tcp_hdr, src_port)); pi->tf.flags = tcph->tcp_flags; pi->tf.type = type; - pi->csf = m->ol_flags & (PKT_RX_IP_CKSUM_MASK | PKT_RX_L4_CKSUM_MASK); pi->port.raw = prt->raw; get_seg_info(tcph, si); @@ -462,7 +444,7 @@ tcp_mbuf_seq_free(struct rte_mbuf *mb[], uint32_t num) len = 0; for (i = 0; i != num; i++) { - len += mb[i]->pkt_len; + len += PKT_L4_PLEN(mb[i]); rte_pktmbuf_free(mb[i]); } diff --git a/lib/libtle_l4p/tcp_ofo.c b/lib/libtle_l4p/tcp_ofo.c index 1565445..b31f2b5 100644 --- a/lib/libtle_l4p/tcp_ofo.c +++ b/lib/libtle_l4p/tcp_ofo.c @@ -12,7 +12,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include <rte_malloc.h> #include <rte_errno.h> #include "tcp_stream.h" @@ -28,12 +27,6 @@ #define OFO_OBJ_MAX (OFODB_OBJ_MAX * OFO_DB_MAX) void -tcp_ofo_free(struct ofo *ofo) -{ - rte_free(ofo); -} - -static void calc_ofo_elems(uint32_t nbufs, uint32_t *nobj, uint32_t *ndb) { uint32_t n, nd, no; @@ -51,35 +44,3 @@ calc_ofo_elems(uint32_t nbufs, uint32_t *nobj, uint32_t *ndb) *nobj = no; *ndb = nd; } - -struct ofo * -tcp_ofo_alloc(uint32_t nbufs, int32_t socket) -{ - uint32_t i, ndb, nobj; - size_t dsz, osz, sz; - struct ofo *ofo; - struct rte_mbuf **obj; - - calc_ofo_elems(nbufs, &nobj, &ndb); - osz = sizeof(*ofo) + sizeof(ofo->db[0]) * ndb; - dsz = sizeof(ofo->db[0].obj[0]) * nobj * ndb; - sz = osz + dsz; - - ofo = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket); - if (ofo == NULL) { - TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d " - "failed with error code: %d\n", - __func__, sz, socket, rte_errno); - return NULL; - } - - obj = (struct rte_mbuf **)&ofo->db[ndb]; - for (i = 0; i != ndb; i++) { - ofo->db[i].nb_max = nobj; - ofo->db[i].obj = obj + i * nobj; - } - - ofo->nb_max = ndb; - return ofo; -} - diff --git a/lib/libtle_l4p/tcp_ofo.h b/lib/libtle_l4p/tcp_ofo.h index 9d88266..0857f17 100644 --- a/lib/libtle_l4p/tcp_ofo.h +++ b/lib/libtle_l4p/tcp_ofo.h @@ -20,8 +20,6 @@ extern "C" { #endif -#include <stdbool.h> - struct ofodb { uint32_t nb_elem; uint32_t nb_max; @@ -103,7 +101,7 @@ _ofo_insert_mbuf(struct ofo* ofo, uint32_t pos, union seqlen* sl, db->obj[k + i] = mb[i]; } if (tcp_seq_lt(end, seq)) - rte_pktmbuf_trim(mb[i - 1], seq - end); + _rte_pktmbuf_trim(mb[i - 1], seq - end); db->nb_elem += i; db->sl.len += tcp_seq_min(seq, end) - sl->seq; @@ -157,7 +155,7 @@ _ofo_insert_right(struct ofo *ofo, uint32_t pos, union seqlen *sl, plen = mb[i]->pkt_len; if (n < plen) { /* adjust partially overlapped packet. 
*/ - rte_pktmbuf_adj(mb[i], n); + mb[i] = _rte_pktmbuf_adj(mb[i], n); break; } } @@ -258,7 +256,7 @@ static inline uint32_t _ofodb_enqueue(struct rte_ring *r, const struct ofodb *db, uint32_t *seq) { uint32_t i, n, num, begin, end; - struct rte_mbuf *pkt; + struct rte_mbuf* pkt; n = 0; num = db->nb_elem; @@ -289,11 +287,7 @@ _ofodb_enqueue(struct rte_ring *r, const struct ofodb *db, uint32_t *seq) return num - n; } -struct ofo * -tcp_ofo_alloc(uint32_t nbufs, int32_t socket); - -void -tcp_ofo_free(struct ofo *ofo); +void calc_ofo_elems(uint32_t nbufs, uint32_t *nobj, uint32_t *ndb); #ifdef __cplusplus } diff --git a/lib/libtle_l4p/tcp_rxq.h b/lib/libtle_l4p/tcp_rxq.h index 2351ee6..be092f9 100644 --- a/lib/libtle_l4p/tcp_rxq.h +++ b/lib/libtle_l4p/tcp_rxq.h @@ -17,6 +17,7 @@ #define _TCP_RXQ_H_ #include "tcp_ofo.h" +#include "tcp_ctl.h" #ifdef __cplusplus extern "C" { @@ -74,6 +75,7 @@ rx_ofo_reduce(struct tle_tcp_stream *s) s->tcb.rcv.nxt = seq; _ofo_remove(ofo, 0, i); + return n; } @@ -133,6 +135,8 @@ rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len, } n = rte_ring_count(s->rx.q); + /* update receive window with left recv buffer*/ + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); if (r != n) { /* raise RX event */ if (s->rx.ev != NULL) diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c index a519645..5d7e0d1 100644 --- a/lib/libtle_l4p/tcp_rxtx.c +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -28,8 +28,30 @@ #include "tcp_rxq.h" #include "tcp_txq.h" #include "tcp_tx_seg.h" +#include "tcp_rxtx.h" -#define TCP_MAX_PKT_SEG 0x20 +/* Uncomment below line to debug cwnd */ +// #define DEBUG_CWND + +#ifdef DEBUG_CWND +#define CWND_INFO(msg, value) printf("CWND: %s: %d\n", msg, value) +#else +#define CWND_INFO(msg, value) do {} while (0) +#endif + +#define TCP_MAX_PKT_SEG 0x20 +#define DELAY_ACK_CHECK_INTERVAL 100 + +/* must larger than l2_len(14)+l3_len(20)+l4_len(20)+tms_option(12) */ +#define RESERVE_HEADER_LEN 128 + +/* If we encounter exhaustion of recv win, we set this thresh to + * update recv win to the remote. It's not set to 1 or some smaller + * value to avoid too-frequent update. + */ +#define RECV_WIN_NOTIFY_THRESH 64 + +static inline int stream_fill_dest(struct tle_tcp_stream *s); /* * checks if input TCP ports and IP addresses match given stream. 
@@ -54,11 +76,17 @@ rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi) static inline struct tle_tcp_stream * rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi, - uint32_t type) + uint32_t type, uint8_t reuse) { struct tle_tcp_stream *s; - s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst]; + if (type == TLE_V4) + s = bhash_lookup4(dev->ctx->bhash[type], + pi->addr4.dst, pi->port.dst, reuse); + else + s = bhash_lookup6(dev->ctx->bhash[type], + pi->addr6->dst, pi->port.dst, reuse); + if (s == NULL || tcp_stream_acquire(s) < 0) return NULL; @@ -77,10 +105,10 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st, { struct tle_tcp_stream *s; - s = stbl_find_data(st, pi); + s = TCP_STREAM(stbl_find_stream(st, pi)); if (s == NULL) { - if (pi->tf.flags == TCP_FLAG_ACK) - return rx_obtain_listen_stream(dev, pi, type); + if (pi->tf.flags & TCP_FLAG_ACK) + return rx_obtain_listen_stream(dev, pi, type, 1); return NULL; } @@ -150,131 +178,6 @@ pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num) return i; } -static inline void -stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[], - uint32_t nb_drb) -{ - _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb); -} - -static inline uint32_t -stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[], - uint32_t nb_drb) -{ - return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); -} - -static inline uint32_t -get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st) -{ - uint32_t pid; - rte_atomic32_t *pa; - - pa = &dev->tx.packet_id[type]; - - if (st == 0) { - pid = rte_atomic32_add_return(pa, num); - return pid - num; - } else { - pid = rte_atomic32_read(pa); - rte_atomic32_set(pa, pid + num); - return pid; - } -} - -static inline void -fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, - uint32_t seq, uint8_t hlen, uint8_t flags) -{ - uint16_t wnd; - - l4h->src_port = port.dst; - l4h->dst_port = port.src; - - wnd = (flags & TCP_FLAG_SYN) ? - RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) : - tcb->rcv.wnd >> tcb->rcv.wscale; - - /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */ - l4h->sent_seq = rte_cpu_to_be_32(seq); - l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt); - l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET; - l4h->tcp_flags = flags; - l4h->rx_win = rte_cpu_to_be_16(wnd); - l4h->cksum = 0; - l4h->tcp_urp = 0; - - if (flags & TCP_FLAG_SYN) - fill_syn_opts(l4h + 1, &tcb->so); - else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0) - fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts); -} - -static inline int -tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s, - const struct tle_dest *dst, uint64_t ol_flags, - union l4_ports port, uint32_t seq, uint32_t flags, - uint32_t pid, uint32_t swcsm) -{ - uint32_t l4, len, plen; - struct tcp_hdr *l4h; - char *l2h; - - len = dst->l2_len + dst->l3_len; - plen = m->pkt_len; - - if (flags & TCP_FLAG_SYN) - l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX; - else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0) - l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS; - else - l4 = sizeof(*l4h); - - /* adjust mbuf to put L2/L3/L4 headers into it. 
*/ - l2h = rte_pktmbuf_prepend(m, len + l4); - if (l2h == NULL) - return -EINVAL; - - /* copy L2/L3 header */ - rte_memcpy(l2h, dst->hdr, len); - - /* setup TCP header & options */ - l4h = (struct tcp_hdr *)(l2h + len); - fill_tcph(l4h, &s->tcb, port, seq, l4, flags); - - /* setup mbuf TX offload related fields. */ - m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0); - m->ol_flags |= ol_flags; - - /* update proto specific fields. */ - - if (s->s.type == TLE_V4) { - struct ipv4_hdr *l3h; - l3h = (struct ipv4_hdr *)(l2h + dst->l2_len); - l3h->packet_id = rte_cpu_to_be_16(pid); - l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4); - - if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) - l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len, - ol_flags); - else if (swcsm != 0) - l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h); - - if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0) - l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len); - } else { - struct ipv6_hdr *l3h; - l3h = (struct ipv6_hdr *)(l2h + dst->l2_len); - l3h->payload_len = rte_cpu_to_be_16(plen + l4); - if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) - l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags); - else if (swcsm != 0) - l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h); - } - - return 0; -} - /* * That function supposed to be used only for data packets. * Assumes that L2/L3/L4 headers and mbuf fields already setup properly. @@ -355,6 +258,9 @@ tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num) i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m, num, drb, &nb); + if (i > 0) + timer_stop(s, TIMER_DACK); + /* free unused drbs. */ if (nb != 0) stream_drb_free(s, drb + nbm - nb, nb); @@ -362,6 +268,113 @@ tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num) return i; } +/* + * case 0: pkt is not split yet, (indicate plen > sl->len) + * case 1: pkt is split, but left packet > sl->len + * case 2: pkt is split, but left packet <= sl->len + */ +static inline struct rte_mbuf * +get_indirect_mbuf(struct tle_tcp_stream *s, + struct rte_mbuf *m, uint32_t *p_plen, + union seqlen *sl, uint32_t type, + uint32_t mss) +{ + uint32_t hdr_len = PKT_L234_HLEN(m), plen, left; + struct rte_mbuf *f, *t; + uint16_t i, nb_segs, adj; + void *hdr; + + if (s->tcb.snd.nxt_pkt) { + f = s->tcb.snd.nxt_pkt; + plen = f->data_len - s->tcb.snd.nxt_offset; + if (f == m) /* 1st segment contains net headers */ + plen -= hdr_len; + } else { + f = m; + plen = f->data_len - hdr_len; + } + + TCP_LOG(DEBUG, "m(%p): pkt_len=%u, nb_segs=%u, sl->len = %u\n", + m, m->pkt_len, m->nb_segs, sl->len); + + nb_segs = 1; + if (sl->len < plen) { + /* Segment split needed: sometimes, cwnd will be reset to + * 1 or 2 mss. In this case, we send part of this seg, and + * record which segment we've sent, and the offset of sent + * data in tcb. 
+ */ + left = plen - sl->len; + plen = sl->len; + s->tcb.snd.nxt_pkt = f; + } else { + left = 0; + t = f->next; + while (t && plen + t->data_len <= sl->len) { + plen += t->data_len; + t = t->next; + nb_segs++; + } + s->tcb.snd.nxt_pkt = t; + } + + struct rte_mbuf *pkts[1 + nb_segs]; + if (rte_pktmbuf_alloc_bulk(s->tx.dst.head_mp, pkts, 1 + nb_segs) < 0) + return NULL; + + rte_pktmbuf_attach(pkts[1], f); + + /* remove bytes in the beginning */ + adj = s->tcb.snd.nxt_offset; + if (f == m) + adj += hdr_len; + if (adj) + rte_pktmbuf_adj(pkts[1], adj); + + /* remove bytes in the end */ + if (left > 0) { + rte_pktmbuf_trim(pkts[1], left); + s->tcb.snd.nxt_offset += plen; + } else + s->tcb.snd.nxt_offset = 0; + + /* attach chaining segment if we have */ + for (i = 1, t = f->next; i < nb_segs; ++i) { + rte_pktmbuf_attach(pkts[i+1], t); + pkts[i]->next = pkts[i+1]; + t = t->next; + } + + /* prepare l2/l3/l4 header */ + hdr = rte_pktmbuf_append(pkts[0], hdr_len); + rte_memcpy(hdr, rte_pktmbuf_mtod(m, void *), hdr_len); + pkts[0]->nb_segs = nb_segs + 1; + pkts[0]->pkt_len = plen + hdr_len; + pkts[0]->ol_flags = m->ol_flags; + pkts[0]->tx_offload = m->tx_offload; + if (type == TLE_V4) { + struct ipv4_hdr *l3h; + + l3h = rte_pktmbuf_mtod_offset(pkts[0], + struct ipv4_hdr *, m->l2_len); + l3h->total_length = + rte_cpu_to_be_16(plen + m->l3_len + m->l4_len); + } else { + struct ipv6_hdr *l3h; + + l3h = rte_pktmbuf_mtod_offset(pkts[0], + struct ipv6_hdr *, m->l2_len); + l3h->payload_len = + rte_cpu_to_be_16(plen + m->l4_len); + } + if (plen <= mss) + pkts[0]->ol_flags &= ~PKT_TX_TCP_SEG; + pkts[0]->next = pkts[1]; + + *p_plen = plen; + return pkts[0]; +} + static inline uint32_t tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], uint32_t num) @@ -371,11 +384,13 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], struct rte_mbuf *mb; struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG]; + /* check stream has drb to send pkts */ + if (stream_drb_empty(s)) + return 0; + mss = s->tcb.snd.mss; type = s->s.type; - dev = s->tx.dst.dev; - pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0); k = 0; tn = 0; @@ -383,26 +398,64 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], for (i = 0; i != num && sl->len != 0 && fail == 0; i++) { mb = mi[i]; - sz = RTE_MIN(sl->len, mss); plen = PKT_L4_PLEN(mb); /*fast path, no need to use indirect mbufs. */ - if (plen <= sz) { - + if (s->tcb.snd.nxt_pkt == NULL && plen <= sl->len) { + pid = get_ip_pid(dev, calc_seg_cnt(plen, s->tcb.snd.mss), + type, (s->flags & TLE_CTX_FLAG_ST) != 0); /* update pkt TCP header */ - tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i); + tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid); /* keep mbuf till ACK is received. 
*/ rte_pktmbuf_refcnt_update(mb, 1); sl->len -= plen; sl->seq += plen; mo[k++] = mb; - /* remaining snd.wnd is less them MSS, send nothing */ - } else if (sz < mss) + if (sl->seq <= s->tcb.snd.rcvr) + TCP_INC_STATS(TCP_MIB_RETRANSSEGS); + /* remaining snd.wnd is less than MSS, send nothing */ + } else if (sl->len < mss) { + break; + /* some data to send already */ + } else if (k != 0 || tn != 0) { break; /* packet indirection needed */ - else - RTE_VERIFY(0); + } else { + struct rte_mbuf *out; + + out = get_indirect_mbuf(s, mb, &plen, sl, type, mss); + if (out == NULL) + return 0; + + pid = get_ip_pid(dev, calc_seg_cnt(plen, s->tcb.snd.mss), + type, (s->flags & TLE_CTX_FLAG_ST) != 0); + /* update pkt TCP header */ + tcp_update_mbuf(out, type, &s->tcb, sl->seq, pid); + + /* no need to bump refcnt !!! */ + + sl->len -= plen; + sl->seq += plen; + + if (tx_data_pkts(s, &out, 1) == 0) { + /* should not happen, we have checked at least one + * drb is available to send this mbuf + */ + rte_pktmbuf_free(out); + return 0; + } + + if (sl->seq <= s->tcb.snd.rcvr) + TCP_INC_STATS(TCP_MIB_RETRANSSEGS); + + if (s->tcb.snd.nxt_pkt) + return 0; + else { + tn = 1; + continue; + } + } if (k >= MAX_PKT_BURST) { n = tx_data_pkts(s, mo, k); @@ -466,14 +519,17 @@ tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms) tcp_txq_set_nxt_head(s, n); } while (n == num); - s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt; + if (sl.seq != (uint32_t)s->tcb.snd.nxt) { + s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt; + s->tcb.snd.ack = s->tcb.rcv.nxt; + } return tn; } static inline void free_una_data(struct tle_tcp_stream *s, uint32_t len) { - uint32_t i, num, plen; + uint32_t i, num, plen, una_data; struct rte_mbuf **mi; plen = 0; @@ -487,14 +543,18 @@ free_una_data(struct tle_tcp_stream *s, uint32_t len) /* free acked data */ for (i = 0; i != num && plen != len; i++) { - uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]); - if (plen + next_pkt_len > len) { - /* keep SND.UNA at the start of the packet */ - len = plen; + una_data = PKT_L4_PLEN(mi[i]) - s->tcb.snd.una_offset; + + /* partial ack */ + if (plen + una_data > len) { + s->tcb.snd.una_offset += len - plen; + plen = len; break; - } else { - plen += next_pkt_len; } + + /* monolithic ack */ + s->tcb.snd.una_offset = 0; + plen += una_data; rte_pktmbuf_free(mi[i]); } @@ -503,6 +563,7 @@ free_una_data(struct tle_tcp_stream *s, uint32_t len) } while (plen < len); s->tcb.snd.una += len; + s->tcb.snd.waitlen -= len; /* * that could happen in case of retransmit, @@ -519,7 +580,7 @@ calc_smss(uint16_t mss, const struct tle_dest *dst) { uint16_t n; - n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK; + n = dst->mtu - dst->l3_len - sizeof(struct tcp_hdr); mss = RTE_MIN(n, mss); return mss; } @@ -537,71 +598,53 @@ initial_cwnd(uint32_t smss, uint32_t icw) return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw)); } -/* - * queue standalone packet to he particular output device - * It assumes that: - * - L2/L3/L4 headers should be already set. - * - packet fits into one segment. - */ -static inline int -send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m) +void +tle_tcp_stream_kill(struct tle_stream *ts) { - uint32_t n, nb; - struct tle_drb *drb; - - if (stream_drb_alloc(s, &drb, 1) == 0) - return -ENOBUFS; - - /* enqueue pkt for TX. */ - nb = 1; - n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1, - &drb, &nb); - - /* free unused drbs. */ - if (nb != 0) - stream_drb_free(s, &drb, 1); - - return (n == 1) ? 
0 : -ENOBUFS; -} + struct tle_tcp_stream *s; -static inline int -send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq, - uint32_t flags) -{ - const struct tle_dest *dst; - uint32_t pid, type; - int32_t rc; + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return; - dst = &s->tx.dst; - type = s->s.type; - pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); + if (s->tcb.state > TCP_ST_LISTEN) + send_rst(s, s->tcb.snd.nxt); - rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1); - if (rc == 0) - rc = send_pkt(s, dst->dev, m); + if (s->tcb.state == TCP_ST_ESTABLISHED) + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); - return rc; + s->tcb.state = TCP_ST_CLOSED; + rte_smp_wmb(); + timer_stop(s, TIMER_RTO); } static inline int -send_rst(struct tle_tcp_stream *s, uint32_t seq) +send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags) { struct rte_mbuf *m; + uint32_t seq; int32_t rc; m = rte_pktmbuf_alloc(s->tx.dst.head_mp); if (m == NULL) return -ENOMEM; - rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST); - if (rc != 0) + seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0); + s->tcb.snd.ts = tms; + + rc = send_ctrl_pkt(s, m, seq, flags); + if (rc != 0) { rte_pktmbuf_free(m); + return rc; + } - return rc; + timer_stop(s, TIMER_DACK); + s->tcb.snd.ack = s->tcb.rcv.nxt; + return 0; } static inline int -send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags) +send_keepalive(struct tle_tcp_stream *s) { struct rte_mbuf *m; uint32_t seq; @@ -611,20 +654,16 @@ send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags) if (m == NULL) return -ENOMEM; - seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0); - s->tcb.snd.ts = tms; + seq = s->tcb.snd.una - 1; - rc = send_ctrl_pkt(s, m, seq, flags); + rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_ACK); if (rc != 0) { rte_pktmbuf_free(m); return rc; } - - s->tcb.snd.ack = s->tcb.rcv.nxt; return 0; } - static int sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, const union seg_info *si, uint32_t ts, struct rte_mbuf *m) @@ -633,19 +672,23 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, int32_t rc; uint32_t pid, seq, type; struct tle_dev *dev; - const void *da; + const void *sa, *da; struct tle_dest dst; const struct tcp_hdr *th; - type = s->s.type; + type = pi->tf.type; /* get destination information. */ - if (type == TLE_V4) + if (type == TLE_V4) { da = &pi->addr4.src; - else + sa = &pi->addr4.dst; + } + else { da = &pi->addr6->src; + sa = &pi->addr6->dst; + } - rc = stream_get_dest(&s->s, da, &dst); + rc = stream_get_dest(type, &s->s, sa, da, &dst); if (rc < 0) return rc; @@ -654,11 +697,16 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th)); s->tcb.rcv.nxt = si->seq + 1; + s->tcb.rcv.cpy = si->seq + 1; seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss, s->s.ctx->prm.hash_alg, &s->s.ctx->prm.secret_key); - s->tcb.so.ts.ecr = s->tcb.so.ts.val; - s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale); + + if (s->tcb.so.ts.raw) { + s->tcb.so.ts.ecr = s->tcb.so.ts.val; + s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale); + } + s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ? 
TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT; s->tcb.so.mss = calc_smss(dst.mtu, &dst); @@ -672,11 +720,13 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, dev = dst.dev; pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); - rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq, - TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1); + rc = tcp_fill_mbuf(m, s, &dst, TCP_OLFLAGS_CKSUM(dst.ol_flags), + pi->port, seq, TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1); if (rc == 0) rc = send_pkt(s, dev, m); + TCP_INC_STATS(TCP_MIB_PASSIVEOPENS); + return rc; } @@ -800,43 +850,24 @@ restore_syn_opt(union seg_info *si, union tsopt *to, return 0; } -static inline void -stream_term(struct tle_tcp_stream *s) -{ - struct sdr *dr; - - s->tcb.state = TCP_ST_CLOSED; - rte_smp_wmb(); - - timer_stop(s); - - /* close() was already invoked, schedule final cleanup */ - if ((s->tcb.uop & TCP_OP_CLOSE) != 0) { - - dr = CTX_TCP_SDR(s->s.ctx); - STAILQ_INSERT_TAIL(&dr->be, &s->s, link); - - /* notify user that stream need to be closed */ - } else if (s->err.ev != NULL) - tle_event_raise(s->err.ev); - else if (s->err.cb.func != NULL) - s->err.cb.func(s->err.cb.data, &s->s); -} - static inline int stream_fill_dest(struct tle_tcp_stream *s) { int32_t rc; uint32_t type; - const void *da; + const void *sa, *da; - type = s->s.type; - if (type == TLE_V4) + type = s->s.type; + if (type == TLE_V4) { + sa = &s->s.ipv4.addr.dst; da = &s->s.ipv4.addr.src; - else + } + else { + sa = &s->s.ipv6.addr.dst; da = &s->s.ipv6.addr.src; + } - rc = stream_get_dest(&s->s, da, &s->tx.dst); + rc = stream_get_dest(type, &s->s, sa, da, &s->tx.dst); return (rc < 0) ? rc : 0; } @@ -851,19 +882,17 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, int32_t rc; uint32_t rtt; - /* some TX still pending for that stream. */ - if (TCP_STREAM_TX_PENDING(cs)) - return -EAGAIN; - /* setup L4 ports and L3 addresses fields. */ cs->s.port.raw = pi->port.raw; cs->s.pmsk.raw = UINT32_MAX; if (pi->tf.type == TLE_V4) { + cs->s.type = TLE_V4; cs->s.ipv4.addr = pi->addr4; cs->s.ipv4.mask.src = INADDR_NONE; cs->s.ipv4.mask.dst = INADDR_NONE; } else if (pi->tf.type == TLE_V6) { + cs->s.type = TLE_V6; cs->s.ipv6.addr = *pi->addr6; rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, sizeof(cs->s.ipv6.mask.src)); @@ -887,7 +916,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, cs->tcb.snd.rto = TCP_RTO_DEFAULT; /* copy streams type & flags. */ - cs->s.type = ps->s.type; + cs->s.type = pi->tf.type; cs->flags = ps->flags; /* retrive and cache destination information. 
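 * (the cached tle_dest carries the egress device, the prebuilt L2/L3
 * header template and the path MTU that calc_smss() uses below)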
*/ @@ -897,16 +926,23 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, /* update snd.mss with SMSS value */ cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); + if (cs->tcb.so.ts.raw != 0) { + cs->tcb.snd.mss -= TCP_TX_OPT_LEN_TMS; + } /* setup congestion variables */ cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd); + CWND_INFO("accept", cs->tcb.snd.cwnd); + cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw; + cs->tcb.snd.rto_fw = ps->tcb.snd.rto_fw; cs->tcb.state = TCP_ST_ESTABLISHED; + TCP_INC_STATS_ATOMIC(TCP_MIB_CURRESTAB); /* add stream to the table */ - cs->ste = stbl_add_stream(st, pi, cs); + cs->ste = stbl_add_stream(st, &cs->s); if (cs->ste == NULL) return -ENOBUFS; @@ -937,7 +973,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, *csp = NULL; - if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) + if ((pi->tf.flags & TCP_FLAG_ACK) == 0|| rx_check_stream(s, pi) != 0) return -EINVAL; ctx = s->s.ctx; @@ -964,7 +1000,8 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, /* cleanup on failure */ tcp_stream_down(cs); - stbl_del_stream(st, cs->ste, cs, 0); + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + stbl_del_stream(st, cs->ste, &cs->s); cs->ste = NULL; } @@ -982,6 +1019,10 @@ data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf **mb, uint32_t hlen, len = *plen; rte_pktmbuf_adj(*mb, hlen); + /* header is removed, so we clear tx_offload here to make sure + * we can get correct payload length with PKT_L4_PLEN. + */ + (*mb)->tx_offload = 0; if (len == 0) return -ENODATA; /* cut off the start of the packet */ @@ -1018,7 +1059,8 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack) tle_event_raise(s->tx.ev); else if (k == 0 && s->tx.cb.func != NULL) s->tx.cb.func(s->tx.cb.data, &s->s); - } + } else + txs_enqueue(s->s.ctx, s); } return n; @@ -1029,8 +1071,7 @@ stream_timewait(struct tle_tcp_stream *s, uint32_t rto) { if (rto != 0) { s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = rto; - timer_reset(s); + timer_reset(s, TIMER_RTO, rto); } else stream_term(s); } @@ -1041,20 +1082,30 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp) uint32_t state; int32_t ackfin; + s->tcb.rcv.frs.on = 2; s->tcb.rcv.nxt += 1; ackfin = (s->tcb.snd.una == s->tcb.snd.fss); state = s->tcb.state; if (state == TCP_ST_ESTABLISHED) { + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); s->tcb.state = TCP_ST_CLOSE_WAIT; /* raise err.ev & err.cb */ - if (s->err.ev != NULL) + /* raise error event only when recvbuf is empty, to inform + * that the stream will not receive data any more. + */ + if (rte_ring_count(s->rx.q) == 0 && s->err.ev != NULL) tle_event_raise(s->err.ev); else if (s->err.cb.func != NULL) s->err.cb.func(s->err.cb.data, &s->s); } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) { rsp->flags |= TCP_FLAG_ACK; + + /* shutdown instead of close happens */ + if (rte_ring_count(s->rx.q) == 0 && s->err.ev != NULL) + tle_event_raise(s->err.ev); + if (ackfin != 0) stream_timewait(s, s->tcb.snd.rto_tw); else @@ -1089,8 +1140,10 @@ rx_fin(struct tle_tcp_stream *s, uint32_t state, ts = rx_tms_opt(&s->tcb, mb); ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts); - if (ret != 0) + if (ret != 0) { + rsp->flags |= TCP_FLAG_ACK; return ret; + } if (state < TCP_ST_ESTABLISHED) return -EINVAL; @@ -1108,9 +1161,10 @@ rx_fin(struct tle_tcp_stream *s, uint32_t state, * fast-path: all data & FIN was already sent out * and now is acknowledged. 
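 * In that case SND.UNA is advanced straight to snd.fss (the send sequence
 * recorded when our FIN was queued) and the retransmit queue can simply
 * be emptied.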
*/ - if (s->tcb.snd.fss == s->tcb.snd.nxt && - si->ack == (uint32_t)s->tcb.snd.nxt) { + if (s->tcb.snd.fss >= s->tcb.snd.nxt && + si->ack == (uint32_t)s->tcb.snd.fss) { s->tcb.snd.una = s->tcb.snd.fss; + s->tcb.snd.nxt = s->tcb.snd.una; empty_tq(s); /* conventional ACK processiing */ } else @@ -1148,8 +1202,25 @@ rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags, else rc = check_seqn(&s->tcb, si->seq, 0); - if (rc == 0) + if (rc == 0) { + /* receive rst, connection is closed abnormal + * and should return errno in later operations. + */ + switch (state) { + case TCP_ST_SYN_SENT: + TCP_INC_STATS(TCP_MIB_ATTEMPTFAILS); + s->tcb.err = ECONNREFUSED; + break; + case TCP_ST_CLOSE_WAIT: + s->tcb.err = EPIPE; + break; + case TCP_ST_CLOSED: + return rc; + default: + s->tcb.err = ECONNRESET; + } stream_term(s); + } return rc; } @@ -1222,6 +1293,7 @@ rto_cwnd_update(struct tcb *tcb) * no more than 1 full-sized segment. */ tcb->snd.cwnd = tcb->snd.mss; + CWND_INFO("update", tcb->snd.cwnd); } static inline void @@ -1330,13 +1402,17 @@ rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack, ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack, plen, ts); - if (ret != 0) - break; - /* account for segment received */ ack_info_update(tack, &si[j], ret != 0, plen, ts); + if (ret != 0) + break; + rte_pktmbuf_adj(mb[j], hlen); + /* header is removed, so we clear tx_offload here to make sure + * we can get correct payload length with PKT_L4_PLEN. + */ + mb[j]->tx_offload = 0; } n = j - i; @@ -1377,6 +1453,7 @@ start_fast_retransmit(struct tle_tcp_stream *s) tcp_txq_rst_nxt_head(s); tcb->snd.nxt = tcb->snd.una; tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss; + CWND_INFO("start fast retrans", tcb->snd.cwnd); } static inline void @@ -1389,6 +1466,7 @@ stop_fast_retransmit(struct tle_tcp_stream *s) n = tcb->snd.nxt - tcb->snd.una; tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh, RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss); + CWND_INFO("stop fast retrans", tcb->snd.cwnd); tcb->snd.fastack = 0; } @@ -1415,8 +1493,10 @@ in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num, * during fast recovery, also reset the * retransmit timer. 
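 * Re-arming the timer also clears snd.nb_retx, so the RTO back-off
 * restarts once fast recovery makes progress.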
*/ - if (tcb->snd.fastack == 1) - timer_reset(s); + if (tcb->snd.fastack == 1) { + timer_reset(s, TIMER_RTO, s->tcb.snd.rto); + s->tcb.snd.nb_retx = 0; + } tcb->snd.fastack += ack_num; return 1; @@ -1456,7 +1536,8 @@ process_ack(struct tle_tcp_stream *s, uint32_t acked, /* remain in normal mode */ } else if (acked != 0) { ack_cwnd_update(&s->tcb, acked, tack); - timer_stop(s); + timer_stop(s, TIMER_RTO); + s->tcb.snd.nb_retx = 0; } /* fast retransmit mode */ @@ -1470,7 +1551,7 @@ process_ack(struct tle_tcp_stream *s, uint32_t acked, } else { /* RFC 5682 3.2.3 full ACK */ stop_fast_retransmit(s); - timer_stop(s); + timer_stop(s, TIMER_RTO); /* if we have another series of dup ACKs */ if (tack->dup3.seg != 0 && @@ -1501,17 +1582,22 @@ rx_ackfin(struct tle_tcp_stream *s) uint32_t state; s->tcb.snd.una = s->tcb.snd.fss; + s->tcb.snd.nxt = s->tcb.snd.una; empty_tq(s); state = s->tcb.state; if (state == TCP_ST_LAST_ACK) stream_term(s); else if (state == TCP_ST_FIN_WAIT_1) { - timer_stop(s); + timer_stop(s, TIMER_RTO); s->tcb.state = TCP_ST_FIN_WAIT_2; - } else if (state == TCP_ST_CLOSING) { + /* if stream is closed, should be released + * before timeout even without fin from peer + */ + if (s->tcb.uop & TCP_OP_CLOSE) + timer_start(s, TIMER_RTO, s->tcb.snd.rto_fw); + } else if (state == TCP_ST_CLOSING) stream_timewait(s, s->tcb.snd.rto_tw); - } } static inline void @@ -1532,7 +1618,7 @@ rx_process_ack(struct tle_tcp_stream *s, uint32_t ts, /* restart RTO timer. */ if (s->tcb.snd.nxt != s->tcb.snd.una) - timer_start(s); + timer_start(s, TIMER_RTO, s->tcb.snd.rto); /* update rto, if fresh packet is here then calculate rtt */ if (tack->ts.ecr != 0) @@ -1554,15 +1640,9 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, if (state != TCP_ST_SYN_SENT) return -EINVAL; - /* - * RFC 793 3.9: in the SYN-SENT state - * If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset - * <SEQ=SEG.ACK><CTL=RST> - * and discard the segment. - * The connection remains in the same state. - */ + /* invalid SEG.SEQ */ if (si->ack != (uint32_t)s->tcb.snd.nxt) { - send_rst(s, si->ack); + rsp->flags = TCP_FLAG_RST; return 0; } @@ -1574,18 +1654,25 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, s->tcb.snd.una = s->tcb.snd.nxt; s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst); + if (s->tcb.so.ts.raw != 0) { + s->tcb.snd.mss -= TCP_TX_OPT_LEN_TMS; + } s->tcb.snd.wnd = si->wnd << so.wscale; s->tcb.snd.wu.wl1 = si->seq; s->tcb.snd.wu.wl2 = si->ack; s->tcb.snd.wscale = so.wscale; + s->tcb.snd.cork_ts = 0; /* setup congestion variables */ s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd); + CWND_INFO("synack", s->tcb.snd.cwnd); + s->tcb.snd.ssthresh = s->tcb.snd.wnd; s->tcb.rcv.ts = so.ts.val; s->tcb.rcv.irs = si->seq; s->tcb.rcv.nxt = si->seq + 1; + s->tcb.rcv.cpy = si->seq + 1; /* if peer doesn't support WSCALE opt, recalculate RCV.WND */ s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ? @@ -1597,9 +1684,14 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, rsp->flags |= TCP_FLAG_ACK; - timer_stop(s); + timer_stop(s, TIMER_RTO); + s->tcb.snd.nb_retx = 0; s->tcb.state = TCP_ST_ESTABLISHED; rte_smp_wmb(); + TCP_INC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + + if (s->s.option.keepalive) + timer_start(s, TIMER_KEEPALIVE, s->s.option.keepidle * MS_PER_S); if (s->tx.ev != NULL) tle_event_raise(s->tx.ev); @@ -1689,8 +1781,8 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts, * fast-path: all data & FIN was already sent out * and now is acknowledged. 
*/ - if (s->tcb.snd.fss == s->tcb.snd.nxt && - tack.ack == (uint32_t)s->tcb.snd.nxt) + if (s->tcb.snd.fss >= s->tcb.snd.nxt && + tack.ack == (uint32_t)s->tcb.snd.fss) rx_ackfin(s); else rx_process_ack(s, ts, &tack); @@ -1702,27 +1794,44 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts, * - received segment with INO data and no TX is scheduled * for that stream. */ - if (tack.segs.badseq != 0 || tack.segs.ofo != 0 || - (tack.segs.data != 0 && - rte_atomic32_read(&s->tx.arm) == 0)) + if (tack.segs.badseq != 0 || tack.segs.ofo != 0) + rsp.flags |= TCP_FLAG_ACK; + else if (tack.segs.data != 0 && + rte_atomic32_read(&s->tx.arm) == 0 && + (s->s.option.tcpquickack || + s->tcb.rcv.nxt - s->tcb.snd.ack > 8 * s->tcb.so.mss)) { rsp.flags |= TCP_FLAG_ACK; + if (s->s.option.tcpquickack > 0) + s->s.option.tcpquickack--; + } + else if (tack.segs.data && rsp.flags == 0) + timer_start(s, TIMER_DACK, DELAY_ACK_CHECK_INTERVAL); rx_ofo_fin(s, &rsp); k += num - n; i = num; + if (s->s.option.keepalive) { + s->tcb.snd.nb_keepalive = 0; + timer_reset(s, TIMER_KEEPALIVE, s->s.option.keepidle * MS_PER_S); + } /* unhandled state, drop all packets. */ } else i = 0; /* we have a response packet to send. */ - if (rsp.flags != 0) { + if (rsp.flags == TCP_FLAG_RST) { + send_rst(s, si[i].ack); + stream_term(s); + } else if (rsp.flags != 0) { send_ack(s, ts, rsp.flags); /* start the timer for FIN packet */ - if ((rsp.flags & TCP_FLAG_FIN) != 0) - timer_reset(s); + if ((rsp.flags & TCP_FLAG_FIN) != 0) { + timer_reset(s, TIMER_RTO, s->tcb.snd.rto); + s->tcb.snd.nb_retx = 0; + } } /* unprocessed packets */ @@ -1778,7 +1887,6 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, state = s->tcb.state; if (state == TCP_ST_LISTEN) { - /* one connection per flow */ cs = NULL; ret = -EINVAL; @@ -1835,6 +1943,74 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, return num - k; } +static inline void +sync_refuse(struct tle_tcp_stream *s, struct tle_dev *dev, + const union pkt_info *pi, struct rte_mbuf *m) +{ + struct ether_hdr *eth_h; + struct ether_addr eth_addr; + struct ipv4_hdr *ip_h; + uint32_t ip_addr; + struct ipv6_hdr *ipv6_h; + struct in6_addr ipv6_addr; + struct tcp_hdr *th; + uint16_t port; + + /* rst pkt should not contain options for syn */ + rte_pktmbuf_trim(m, m->l4_len - sizeof(*th)); + + eth_h = rte_pktmbuf_mtod(m, struct ether_hdr*); + ether_addr_copy(ð_h->s_addr, ð_addr); + ether_addr_copy(ð_h->d_addr, ð_h->s_addr); + ether_addr_copy(ð_addr, ð_h->d_addr); + + th = rte_pktmbuf_mtod_offset(m, struct tcp_hdr*, + m->l2_len + m->l3_len); + port = th->src_port; + th->src_port = th->dst_port; + th->dst_port = port; + th->tcp_flags = TCP_FLAG_RST | TCP_FLAG_ACK; + th->recv_ack = rte_cpu_to_be_32(rte_be_to_cpu_32(th->sent_seq) + 1); + th->sent_seq = 0; + th->data_off &= 0x0f; + th->data_off |= (sizeof(*th) / 4) << 4; + th->cksum = 0; + + if (pi->tf.type == TLE_V4) { + ip_h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr*, + m->l2_len); + ip_addr = ip_h->src_addr; + ip_h->src_addr = ip_h->dst_addr; + ip_h->dst_addr = ip_addr; + ip_h->total_length = rte_cpu_to_be_16( + rte_be_to_cpu_16(ip_h->total_length) - + (m->l4_len - sizeof(*th))); + ip_h->hdr_checksum = 0; + th->cksum = rte_ipv4_udptcp_cksum(ip_h, th); + ip_h->hdr_checksum = rte_ipv4_cksum(ip_h); + } else { + ipv6_h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr*, + m->l2_len); + rte_memcpy(&ipv6_addr, ipv6_h->src_addr, + sizeof(struct in6_addr)); + rte_memcpy(ipv6_h->src_addr, ipv6_h->dst_addr, + sizeof(struct 
in6_addr)); + rte_memcpy(ipv6_h->dst_addr, &ipv6_addr, + sizeof(struct in6_addr)); + ipv6_h->payload_len = rte_cpu_to_be_16( + rte_be_to_cpu_16(ipv6_h->payload_len) - + (m->l4_len - sizeof(*th))); + th->cksum = rte_ipv6_udptcp_cksum(ipv6_h, th); + } + + if (m->pkt_len < ETHER_MIN_LEN) + rte_pktmbuf_append(m, ETHER_MIN_LEN - m->pkt_len); + + if (send_pkt(s, dev, m) != 0) + rte_pktmbuf_free(m); + else + TCP_INC_STATS(TCP_MIB_OUTRSTS); +} static inline uint32_t rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts, @@ -1846,20 +2022,35 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts, uint32_t i, k; int32_t ret; - s = rx_obtain_listen_stream(dev, &pi[0], type); + s = rx_obtain_listen_stream(dev, &pi[0], type, 0); if (s == NULL) { - for (i = 0; i != num; i++) { - rc[i] = ENOENT; - rp[i] = mb[i]; + /* no socket listening this syn, send rst to refuse connect */ + s = TCP_STREAM(get_stream(dev->ctx)); + if (s != NULL) { + sync_refuse(s, dev, &pi[0], mb[0]); + put_stream(dev->ctx, &s->s, 0); + i = 1; + } else { + i = 0; } - return 0; + k = 0; + for (; i != num; i++) { + rc[k] = ENOENT; + rp[k] = mb[i]; + k++; + } + return num - k; } k = 0; for (i = 0; i != num; i++) { - + /* check if stream has space to maintain new connection */ + if (rte_ring_free_count(s->rx.q) == 0 || + (s->s.ctx->streams.nb_free == 0 && + s->s.ctx->streams.nb_cur >= s->s.ctx->prm.max_streams - 1)) + ret = -ENOSPC; /* check that this remote is allowed to connect */ - if (rx_check_stream(s, &pi[i]) != 0) + else if (rx_check_stream(s, &pi[i]) != 0) ret = -ENOENT; else /* syncokie: reply with <SYN,ACK> */ @@ -1882,43 +2073,34 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], { struct stbl *st; struct tle_ctx *ctx; - uint32_t i, j, k, mt, n, t, ts; + uint32_t i, j, k, n, t; + uint64_t ts; union pkt_info pi[num]; union seg_info si[num]; - union { - uint8_t t[TLE_VNUM]; - uint32_t raw; - } stu; + + TCP_ADD_STATS(TCP_MIB_INSEGS, num); ctx = dev->ctx; ts = tcp_get_tms(ctx->cycles_ms_shift); st = CTX_TCP_STLB(ctx); - mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); - - stu.raw = 0; /* extract packet info and check the L3/L4 csums */ for (i = 0; i != num; i++) { get_pkt_info(pkt[i], &pi[i], &si[i]); - t = pi[i].tf.type; - pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, t, IPPROTO_TCP); - stu.t[t] = mt; + pi[i].csf = check_pkt_csum(pkt[i], t, IPPROTO_TCP); } - if (stu.t[TLE_V4] != 0) - stbl_lock(st, TLE_V4); - if (stu.t[TLE_V6] != 0) - stbl_lock(st, TLE_V6); - k = 0; for (i = 0; i != num; i += j) { - t = pi[i].tf.type; /*basic checks for incoming packet */ - if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) { + if (t >= TLE_VNUM || pi[i].csf != 0) { + TCP_INC_STATS(TCP_MIB_INERRS); + if (t < TLE_VNUM) + TCP_INC_STATS(TCP_MIB_CSUMERRORS); rc[k] = EINVAL; rp[k] = pkt[i]; j = 1; @@ -1937,11 +2119,6 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], } } - if (stu.t[TLE_V4] != 0) - stbl_unlock(st, TLE_V4); - if (stu.t[TLE_V6] != 0) - stbl_unlock(st, TLE_V6); - return num - k; } @@ -1953,21 +2130,37 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], struct tle_tcp_stream *s; s = TCP_STREAM(ts); - n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num); - if (n == 0) - return 0; - /* - * if we still have packets to read, - * then rearm stream RX event. 
- */ - if (n == num && rte_ring_count(s->rx.q) != 0) { - if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) - tle_event_raise(s->rx.ev); + if (tcp_stream_try_acquire(s) > 0) { + if (s->tcb.state != TCP_ST_LISTEN) { + tcp_stream_release(s); + rte_errno = EINVAL; + return 0; + } + + n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num); + if (n == 0) + { + tcp_stream_release(s); + rte_errno = EAGAIN; + return 0; + } + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (n == num && rte_ring_count(s->rx.q) != 0) { + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + } + tcp_stream_release(s); + return n; + } else { tcp_stream_release(s); + rte_errno = EINVAL; + return 0; } - - return n; } uint16_t @@ -1995,6 +2188,7 @@ tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) stream_drb_free(s, drb + i, j - i); } + TCP_ADD_STATS(TCP_MIB_OUTSEGS, n); return n; } @@ -2010,73 +2204,17 @@ stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi) pi->tf.type = s->s.type; } -static int -stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr) -{ - const struct sockaddr_in *in4; - const struct sockaddr_in6 *in6; - const struct tle_dev_param *prm; - int32_t rc; - - rc = 0; - s->s.pmsk.raw = UINT32_MAX; - - /* setup L4 src ports and src address fields. */ - if (s->s.type == TLE_V4) { - in4 = (const struct sockaddr_in *)addr; - if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0) - return -EINVAL; - - s->s.port.src = in4->sin_port; - s->s.ipv4.addr.src = in4->sin_addr.s_addr; - s->s.ipv4.mask.src = INADDR_NONE; - s->s.ipv4.mask.dst = INADDR_NONE; - - } else if (s->s.type == TLE_V6) { - in6 = (const struct sockaddr_in6 *)addr; - if (memcmp(&in6->sin6_addr, &tle_ipv6_any, - sizeof(tle_ipv6_any)) == 0 || - in6->sin6_port == 0) - return -EINVAL; - - s->s.port.src = in6->sin6_port; - rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr, - sizeof(s->s.ipv6.addr.src)); - rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none, - sizeof(s->s.ipv6.mask.src)); - rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none, - sizeof(s->s.ipv6.mask.dst)); - } - - /* setup the destination device. 
*/ - rc = stream_fill_dest(s); - if (rc != 0) - return rc; - - /* setup L4 dst address from device param */ - prm = &s->tx.dst.dev->prm; - if (s->s.type == TLE_V4) { - if (s->s.ipv4.addr.dst == INADDR_ANY) - s->s.ipv4.addr.dst = prm->local_addr4.s_addr; - } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any, - sizeof(tle_ipv6_any)) == 0) - memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6, - sizeof(s->s.ipv6.addr.dst)); - - return rc; -} - static inline int -tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) +tx_syn(struct tle_tcp_stream *s) { int32_t rc; - uint32_t tms, seq; + uint32_t seq; + uint64_t tms; union pkt_info pi; struct stbl *st; struct stbl_entry *se; - /* fill stream address */ - rc = stream_fill_addr(s, addr); + rc = stream_fill_dest(s); if (rc != 0) return rc; @@ -2107,7 +2245,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) /* add the stream in stream table */ st = CTX_TCP_STLB(s->s.ctx); - se = stbl_add_stream_lock(st, s); + se = stbl_add_stream(st, &s->s); if (se == NULL) return -ENOBUFS; s->ste = se; @@ -2115,6 +2253,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) /* put stream into the to-send queue */ txs_enqueue(s->s.ctx, s); + TCP_INC_STATS(TCP_MIB_ACTIVEOPENS); return 0; } @@ -2147,7 +2286,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) /* fill stream, prepare and transmit syn pkt */ s->tcb.uop |= TCP_OP_CONNECT; - rc = tx_syn(s, addr); + rc = tx_syn(s); tcp_stream_release(s); /* error happened, do a cleanup */ @@ -2160,13 +2299,29 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) uint16_t tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) { - uint32_t n; + uint32_t n, i; + uint32_t free_slots; struct tle_tcp_stream *s; s = TCP_STREAM(ts); + + free_slots = rte_ring_free_count(s->rx.q); + n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num); - if (n == 0) + if (n == 0) { + if (s->tcb.err != 0) { + rte_errno = s->tcb.err; + } else { + rte_errno = EAGAIN; + } return 0; + } + + for (i = 0; i < n; ++i) + s->tcb.rcv.cpy += rte_pktmbuf_pkt_len(pkt[i]); + + /* update receive window with left recv buffer*/ + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); /* * if we still have packets to read, @@ -2176,28 +2331,99 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); tcp_stream_release(s); + /* if we have received fin, no more data will come, raise err event. 
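+ * (rcv.frs.on is set to 2 by rx_fin_state() once the peer's FIN has been
+ * accepted, so an empty receive queue here means end-of-stream rather
+ * than a transient EAGAIN)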
*/ + } else if (s->tcb.rcv.frs.on == 2) { + if (tcp_stream_try_acquire(s) > 0 && s->err.ev != NULL) + tle_event_raise(s->err.ev); + tcp_stream_release(s); + } + + /* update recv win to the remote */ + if (free_slots < RECV_WIN_NOTIFY_THRESH && + rte_ring_free_count(s->rx.q) >= RECV_WIN_NOTIFY_THRESH) { + s->tcb.snd.update_rcv = true; + txs_enqueue(s->s.ctx, s); } return n; } +uint16_t +tle_tcp_stream_inq(struct tle_stream *ts) +{ + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + return s->tcb.rcv.nxt - s->tcb.rcv.cpy; +} + +#define DECONST(type, var) ((type)(uintptr_t)(const void *)(var)) + +ssize_t +tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, int iovcnt) +{ + struct msghdr msg = {0}; + + msg.msg_iov = DECONST(struct iovec *, iov); /* Recover const later */ + msg.msg_iovlen = iovcnt; + return tle_tcp_stream_recvmsg(ts, &msg); +} + ssize_t -tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, - int iovcnt) +tle_tcp_stream_recvmsg(struct tle_stream *ts, struct msghdr *msg) { + size_t sz; int32_t i; uint32_t mn, n, tn; - size_t sz; + uint32_t free_slots; struct tle_tcp_stream *s; struct iovec iv; struct rxq_objs mo[2]; + struct sockaddr_in *addr; + struct sockaddr_in6 *addr6; + const struct iovec *iov = msg->msg_iov; + int iovcnt = msg->msg_iovlen; s = TCP_STREAM(ts); + free_slots = rte_ring_free_count(s->rx.q); + /* get group of packets */ mn = tcp_rxq_get_objs(s, mo); - if (mn == 0) - return 0; + if (mn == 0) { + if (s->tcb.err != 0) + rte_errno = s->tcb.err; + else + rte_errno = EAGAIN; + return -1; + } + + if (!ts->option.timestamp) + ts->timestamp = mo[0].mb[0]->timestamp; + + if (msg->msg_control != NULL) { + if (ts->option.timestamp) + tle_set_timestamp(msg, mo[0].mb[0]); + else + msg->msg_controllen = 0; + } + + if (msg->msg_name != NULL) { + if (s->s.type == TLE_V4) { + addr = (struct sockaddr_in*)msg->msg_name; + addr->sin_family = AF_INET; + addr->sin_addr.s_addr = s->s.ipv4.addr.src; + addr->sin_port = s->s.port.src; + msg->msg_namelen = sizeof(struct sockaddr_in); + } else { + addr6 = (struct sockaddr_in6*)msg->msg_name; + addr6->sin6_family = AF_INET6; + rte_memcpy(&addr6->sin6_addr, &s->s.ipv6.addr.src, + sizeof(struct sockaddr_in6)); + addr6->sin6_port = s->s.port.src; + msg->msg_namelen = sizeof(struct sockaddr_in6); + } + } sz = 0; n = 0; @@ -2229,6 +2455,8 @@ tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, } tcp_rxq_consume(s, tn); + /* update receive window with left recv buffer*/ + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); /* * if we still have packets to read, @@ -2238,6 +2466,20 @@ tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); tcp_stream_release(s); + /* if we have received fin, no more data will come, raise err event. */ + } else if (s->tcb.rcv.frs.on == 2) { + if (tcp_stream_try_acquire(s) > 0 && s->err.ev != NULL) + tle_event_raise(s->err.ev); + tcp_stream_release(s); + } + + s->tcb.rcv.cpy += sz; + + /* update recv win to the remote */ + if (free_slots < RECV_WIN_NOTIFY_THRESH && + rte_ring_free_count(s->rx.q) >= RECV_WIN_NOTIFY_THRESH) { + s->tcb.snd.update_rcv = true; + txs_enqueue(s->s.ctx, s); } return sz; @@ -2263,48 +2505,35 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, if (i == num) { /* queue packets for further transmission. 
*/ rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num); - if (rc != 0) + if (rc != 0) { + rc = -EAGAIN; free_mbufs(segs, num); + } } return rc; } -uint16_t -tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) +static inline uint16_t +stream_send(struct tle_tcp_stream *s, struct rte_mbuf *pkt[], + uint16_t num, uint16_t mss, uint64_t ol_flags) { - uint32_t i, j, k, mss, n, state; + uint16_t i, j, k; int32_t rc; - uint64_t ol_flags; - struct tle_tcp_stream *s; + uint32_t n, free_slots; struct rte_mbuf *segs[TCP_MAX_PKT_SEG]; - - s = TCP_STREAM(ts); - - /* mark stream as not closable. */ - if (tcp_stream_acquire(s) < 0) { - rte_errno = EAGAIN; - return 0; - } - - state = s->tcb.state; - if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { - rte_errno = ENOTCONN; - tcp_stream_release(s); - return 0; - } - - mss = s->tcb.snd.mss; - ol_flags = s->tx.dst.ol_flags; + int32_t pkt_len; k = 0; rc = 0; + pkt_len = 0; while (k != num) { /* prepare and check for TX */ for (i = k; i != num; i++) { if (pkt[i]->pkt_len > mss || pkt[i]->nb_segs > TCP_MAX_PKT_SEG) break; + pkt_len += pkt[i]->pkt_len; rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags, s->s.port, 0, TCP_FLAG_ACK, 0, 0); if (rc != 0) @@ -2328,6 +2557,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) pkt[j]->l3_len + pkt[j]->l4_len); pkt[j]->ol_flags &= ol_flags; + pkt_len -= pkt[j]->pkt_len; } break; } @@ -2339,8 +2569,10 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) /* segment large packet and enqueue for sending */ } else if (i != num) { + free_slots = rte_ring_free_count(s->tx.q); + free_slots = RTE_MIN(free_slots, RTE_DIM(segs)); /* segment the packet. */ - rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs), + rc = tcp_segmentation(pkt[i], segs, free_slots, &s->tx.dst, mss); if (rc < 0) { rte_errno = -rc; @@ -2351,19 +2583,161 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (rc == 0) { /* free the large mbuf */ rte_pktmbuf_free(pkt[i]); + pkt_len += pkt[i]->pkt_len; /* set the mbuf as consumed */ k++; - } else + } else { /* no space left in tx queue */ + RTE_VERIFY(0); break; + } } } + s->tcb.snd.waitlen += pkt_len; + return k; +} + +static inline uint16_t +stream_send_tso(struct tle_tcp_stream *s, struct rte_mbuf *pkt[], + uint16_t num, uint16_t mss, uint64_t ol_flags) +{ + uint16_t i, k, nb_segs; + int32_t rc, pkt_len; + uint64_t ol_flags1; + struct rte_mbuf *pre_tail; + + k = 0; + rc = 0; + while (k != num) { + /* Make sure there is at least one slot available */ + if (rte_ring_free_count(s->tx.q) == 0) + break; + + /* prepare and check for TX */ + nb_segs = 0; + pkt_len = 0; + pre_tail = NULL; + for (i = k; i != num; i++) { + if (pkt[i]->nb_segs != 1) + rte_panic("chained mbuf: %p\n", pkt[i]); + /* We shall consider cwnd and snd wnd when limit len */ + if (nb_segs + pkt[i]->nb_segs <= TCP_MAX_PKT_SEG && + pkt_len + pkt[i]->pkt_len <= 65535 - RESERVE_HEADER_LEN) { + nb_segs += pkt[i]->nb_segs; + pkt_len += pkt[i]->pkt_len; + if (pre_tail) + pre_tail->next = pkt[i]; + pre_tail = rte_pktmbuf_lastseg(pkt[i]); + } else { + /* enqueue this one now */ + break; + } + } + + if (unlikely(i == k)) { + /* pkt[k] is a too big packet, now we fall back to + * non-tso send; we can optimize it later by + * splitting the mbuf. 
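+ * Packets are coalesced above only while the chain stays within
+ * TCP_MAX_PKT_SEG segments and 64K minus RESERVE_HEADER_LEN bytes; a
+ * single packet that already exceeds those limits falls back to the
+ * non-TSO stream_send() path here.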
+ */ + if (stream_send(s, &pkt[k], 1, mss, ol_flags) == 1) { + k++; + continue; + } else + break; + } + + pkt[k]->nb_segs = nb_segs; + pkt[k]->pkt_len = pkt_len; + + ol_flags1 = ol_flags; + if (pkt_len > mss) + ol_flags1 |= PKT_TX_TCP_SEG; + + rc = tcp_fill_mbuf(pkt[k], s, &s->tx.dst, ol_flags1, + s->s.port, 0, TCP_FLAG_ACK, 0, 0); + if (rc != 0) /* hard to recover */ + rte_panic("failed to fill mbuf: %p\n", pkt[k]); + + /* correct mss */ + pkt[k]->tso_segsz = mss; + + s->tcb.snd.waitlen += pkt_len; + /* We already make sure there is at least one slot */ + if (_rte_ring_enqueue_burst(s->tx.q, (void **)pkt + k, 1) < 1) + RTE_VERIFY(0); + + k = i; + } + + return k; +} + +uint16_t +tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) +{ + uint16_t k, mss, state; + uint64_t ol_flags; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + + if (s->tcb.err != 0) { + rte_errno = s->tcb.err; + return 0; + } + + /* mark stream as not closable. */ + if (tcp_stream_acquire(s) < 0) { + rte_errno = EAGAIN; + return 0; + } + + state = s->tcb.state; + switch (state) { + case TCP_ST_ESTABLISHED: + case TCP_ST_CLOSE_WAIT: + break; + case TCP_ST_FIN_WAIT_1: + case TCP_ST_FIN_WAIT_2: + case TCP_ST_CLOSING: + case TCP_ST_LAST_ACK: + rte_errno = EPIPE; + tcp_stream_release(s); + return 0; + default: + rte_errno = ENOTCONN; + tcp_stream_release(s); + return 0; + } + + mss = s->tcb.snd.mss; + + ol_flags = s->tx.dst.ol_flags; + + /* Some reference number on the case: + * "<netperf with uss> - tap - <kernel stack> - <netserver>" + * ~2Gbps with tso disabled; + * ~16Gbps with tso enabled. + */ + if (rte_ring_free_count(s->tx.q) == 0) { + /* Block send may try without waiting for tx event (raised by acked + * data), so here we will still put this stream for further process + */ + txs_enqueue(s->s.ctx, s); + rte_errno = EAGAIN; + k = 0; + } else if (s->tx.dst.dev->prm.tx_offload & DEV_TX_OFFLOAD_TCP_TSO) + k = stream_send_tso(s, pkt, num, mss, ol_flags); + else + k = stream_send(s, pkt, num, mss, ol_flags); + /* notify BE about more data to send */ if (k != 0) txs_enqueue(s->s.ctx, s); + /* if possible, re-arm stream write event. */ - if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) + if (rte_ring_free_count(s->tx.q) && s->tx.ev != NULL && k == num) tle_event_raise(s->tx.ev); tcp_stream_release(s); @@ -2382,9 +2756,15 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, struct tle_tcp_stream *s; struct iovec iv; struct rte_mbuf *mb[2 * MAX_PKT_BURST]; + uint16_t mss; s = TCP_STREAM(ts); + if (s->tcb.err != 0) { + rte_errno = s->tcb.err; + return -1; + } + /* mark stream as not closable. 
*/ if (tcp_stream_acquire(s) < 0) { rte_errno = EAGAIN; @@ -2392,7 +2772,18 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, } state = s->tcb.state; - if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { + switch (state) { + case TCP_ST_ESTABLISHED: + case TCP_ST_CLOSE_WAIT: + break; + case TCP_ST_FIN_WAIT_1: + case TCP_ST_FIN_WAIT_2: + case TCP_ST_CLOSING: + case TCP_ST_LAST_ACK: + rte_errno = EPIPE; + tcp_stream_release(s); + return -1; + default: rte_errno = ENOTCONN; tcp_stream_release(s); return -1; @@ -2403,11 +2794,24 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, for (i = 0; i != iovcnt; i++) tsz += iov[i].iov_len; + if (tsz == 0) { + tcp_stream_release(s); + return 0; + } + slen = rte_pktmbuf_data_room_size(mp); - slen = RTE_MIN(slen, s->tcb.snd.mss); + mss = s->tcb.snd.mss; + + slen = RTE_MIN(slen, mss); num = (tsz + slen - 1) / slen; n = rte_ring_free_count(s->tx.q); + + if (n == 0) { + tcp_stream_release(s); + return 0; + } + num = RTE_MIN(num, n); n = RTE_MIN(num, RTE_DIM(mb)); @@ -2451,7 +2855,6 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, k = 0; if (k != j) { - /* free pkts that were not enqueued */ free_mbufs(mb + k, j - k); @@ -2466,14 +2869,16 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, } } - if (k != 0) { - + if (k != 0) { /* notify BE about more data to send */ txs_enqueue(s->s.ctx, s); /* if possible, re-arm stream write event. */ if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) tle_event_raise(s->tx.ev); + } else { + rte_errno = EAGAIN; + sz = -1; } tcp_stream_release(s); @@ -2485,7 +2890,7 @@ static inline void tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state) { /* try to send some data */ - tx_nxt_data(s, tms); + uint32_t tn = tx_nxt_data(s, tms); /* we also have to send a FIN */ if (state != TCP_ST_ESTABLISHED && @@ -2495,6 +2900,13 @@ tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state) s->tcb.snd.fss = ++s->tcb.snd.nxt; send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK); } + + if (s->tcb.snd.update_rcv) { + if (tn == 0) + send_ack(s, tms, TCP_FLAG_ACK); /* update recv window */ + + s->tcb.snd.update_rcv = false; + } } static inline void @@ -2507,7 +2919,7 @@ tx_stream(struct tle_tcp_stream *s, uint32_t tms) if (state == TCP_ST_SYN_SENT) { /* send the SYN, start the rto timer */ send_ack(s, tms, TCP_FLAG_SYN); - timer_start(s); + timer_start(s, TIMER_RTO, s->tcb.snd.rto); } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) { @@ -2515,7 +2927,7 @@ tx_stream(struct tle_tcp_stream *s, uint32_t tms) /* start RTO timer. */ if (s->tcb.snd.nxt != s->tcb.snd.una) - timer_start(s); + timer_start(s, TIMER_RTO, s->tcb.snd.rto); } } @@ -2544,7 +2956,6 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms) if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) { if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) { - /* update SND.CWD and SND.SSTHRESH */ rto_cwnd_update(&s->tcb); @@ -2570,50 +2981,131 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms) * than one SYN or SYN/ACK retransmissions or true loss * detection has been made. 
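 * As a hypothetical timeline: with an initial RTO of 1s the SYN is
 * retried after roughly 1s, 2s, 4s, ... (rto_roundup(2 * rto) below)
 * until snd.nb_retx reaches snd.nb_retm.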
*/ - if (s->tcb.snd.nb_retx != 0) + if (s->tcb.snd.nb_retx != 0) { s->tcb.snd.cwnd = s->tcb.snd.mss; + CWND_INFO("synsent", s->tcb.snd.cwnd); + } send_ack(s, tms, TCP_FLAG_SYN); - - } else if (state == TCP_ST_TIME_WAIT) { - stream_term(s); + TCP_INC_STATS(TCP_MIB_RETRANSSEGS); } /* RFC6298:5.5 back off the timer */ s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto); s->tcb.snd.nb_retx++; - timer_restart(s); + timer_restart(s, TIMER_RTO, s->tcb.snd.rto); } else { - send_rst(s, s->tcb.snd.nxt); + if (state == TCP_ST_SYN_SENT) { + if (stream_fill_dest(s) != 0 || + is_broadcast_ether_addr((struct ether_addr *)s->tx.dst.hdr)) + s->tcb.err = EHOSTUNREACH; + else + /* TODO: do we send rst on this */ + s->tcb.err = ENOTCONN; + } else + send_rst(s, s->tcb.snd.una); stream_term(s); } } +static inline void +set_keepalive_timer(struct tle_tcp_stream *s) +{ + if (s->s.option.keepalive) { + if (s->tcb.state == TCP_ST_ESTABLISHED) { + if (s->tcb.snd.nb_keepalive == 0) + timer_reset(s, TIMER_KEEPALIVE, + s->s.option.keepidle * MS_PER_S); + else + timer_reset(s, TIMER_KEEPALIVE, + s->s.option.keepintvl * MS_PER_S); + } + } else { + timer_stop(s, TIMER_KEEPALIVE); + s->tcb.snd.nb_keepalive = 0; + } +} + int tle_tcp_process(struct tle_ctx *ctx, uint32_t num) { - uint32_t i, k, tms; + uint8_t type; + uint32_t i, k; + uint64_t tms; struct sdr *dr; struct tle_timer_wheel *tw; struct tle_stream *p; struct tle_tcp_stream *s, *rs[num]; - /* process streams with RTO exipred */ + tms = tcp_get_tms(ctx->cycles_ms_shift); + /* process streams with RTO exipred */ tw = CTX_TCP_TMWHL(ctx); - tms = tcp_get_tms(ctx->cycles_ms_shift); tle_timer_expire(tw, tms); k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs)); for (i = 0; i != k; i++) { - - s = rs[i]; - s->timer.handle = NULL; - if (tcp_stream_try_acquire(s) > 0) - rto_stream(s, tms); - tcp_stream_release(s); + s = timer_stream(rs[i]); + type = timer_type(rs[i]); + s->timer.handle[type] = NULL; + + switch (type) { + case TIMER_RTO: + /* FE cannot change stream into below states, + * that's why we don't put it into lock + */ + if (s->tcb.state == TCP_ST_TIME_WAIT || + s->tcb.state == TCP_ST_FIN_WAIT_2) { + tcp_stream_down(s); + stream_term(s); + tcp_stream_up(s); + } else if (tcp_stream_acquire(s) > 0) { + /* + * stream may be closed in frontend concurrently. + * if stream has already been closed, it need not + * to retransmit anymore. + */ + if (s->tcb.state != TCP_ST_CLOSED) + rto_stream(s, tms); + tcp_stream_release(s); + } + /* Fail to aquire lock? FE is shutdown or close this + * stream, either FIN or RST needs to be sent, which + * means it's in tsq, will be processed later. 
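+ * (For TIMER_KEEPALIVE below: the first probe fires keepidle seconds
+ * after the timer is armed, later probes every keepintvl seconds, and
+ * after keepcnt unanswered probes the stream is reset with ETIMEDOUT.)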
+ */ + break; + case TIMER_DACK: + if (rte_atomic32_read(&s->tx.arm) == 0 && + s->tcb.rcv.nxt != s->tcb.snd.ack && + tcp_stream_acquire(s) > 0) { + s->s.option.tcpquickack = 8; + send_ack(s, tms, TCP_FLAG_ACK); + tcp_stream_release(s); + } + break; + case TIMER_KEEPALIVE: + if (s->tcb.snd.nb_keepalive < s->s.option.keepcnt) { + if (tcp_stream_try_acquire(s) > 0 && + s->tcb.state == TCP_ST_ESTABLISHED) { + send_keepalive(s); + s->tcb.snd.nb_keepalive++; + timer_start(s, TIMER_KEEPALIVE, + s->s.option.keepintvl * MS_PER_S); + } + tcp_stream_release(s); + } else { + tcp_stream_down(s); + send_rst(s, s->tcb.snd.nxt); + s->tcb.err = ETIMEDOUT; + stream_term(s); + tcp_stream_up(s); + } + break; + default: + rte_panic("Invalid timer type: %d\n", type); + } } /* process streams from to-send queue */ @@ -2621,20 +3113,63 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs)); for (i = 0; i != k; i++) { - s = rs[i]; - rte_atomic32_set(&s->tx.arm, 0); - if (tcp_stream_try_acquire(s) > 0) + if (s->tcb.uop & TCP_OP_RESET) { + /* already put into death row in close() */ + send_rst(s, s->tcb.snd.nxt); + continue; + } + + if (tcp_stream_acquire(s) > 0) { + if (s->tcb.uop & TCP_OP_KEEPALIVE) { + s->tcb.uop &= ~TCP_OP_KEEPALIVE; + set_keepalive_timer(s); + } + + if (s->tcb.state == TCP_ST_FIN_WAIT_2 && + s->tcb.uop & TCP_OP_CLOSE) { + /* This could happen after: + * 1) shutdown; + * 2) FIN sent; + * 3) ack received; + * 4) close; + */ + timer_start(s, TIMER_RTO, s->tcb.snd.rto_fw); + tcp_stream_release(s); + continue; + } + + if (s->tcb.state == TCP_ST_ESTABLISHED && + s->s.option.tcpcork) { + if (s->tcb.snd.cork_ts == 0) + s->tcb.snd.cork_ts = (uint32_t)tms; + + if (s->tcb.snd.waitlen < s->tcb.snd.mss && + (uint32_t)tms - s->tcb.snd.cork_ts < 200) { + txs_enqueue(s->s.ctx, s); + tcp_stream_release(s); + continue; + } + + s->tcb.snd.cork_ts = 0; + } + tx_stream(s, tms); - else + tcp_stream_release(s); + continue; + } + + if (s->tcb.state != TCP_ST_CLOSED) txs_enqueue(s->s.ctx, s); - tcp_stream_release(s); + + /* TCP_ST_CLOSED? See close with TCP_ST_CLOSED state */ } /* collect streams to close from the death row */ dr = CTX_TCP_SDR(ctx); + rte_spinlock_lock(&dr->lock); for (k = 0, p = STAILQ_FIRST(&dr->be); k != num && p != NULL; k++, p = STAILQ_NEXT(p, link)) @@ -2645,9 +3180,21 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) else STAILQ_FIRST(&dr->be) = p; + /* if stream still in tsq, wait one more round */ + for (i = 0; i != k; i++) { + if (rte_atomic32_read(&rs[i]->tx.arm) > 0) { + STAILQ_INSERT_TAIL(&dr->be, &rs[i]->s, link); + rs[i] = NULL; + } + } + + rte_spinlock_unlock(&dr->lock); + /* cleanup closed streams */ for (i = 0; i != k; i++) { s = rs[i]; + if (s == NULL) + continue; tcp_stream_down(s); tcp_stream_reset(ctx, s); } diff --git a/lib/libtle_l4p/tcp_rxtx.h b/lib/libtle_l4p/tcp_rxtx.h new file mode 100644 index 0000000..e7f8e3e --- /dev/null +++ b/lib/libtle_l4p/tcp_rxtx.h @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2016-2017 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TCP_RXTX_H_ +#define _TCP_RXTX_H_ + +#include "tcp_stream.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static inline uint32_t +calc_seg_cnt(uint32_t plen, uint32_t mss) +{ + if (plen > mss) + return (plen + mss - 1) / mss; + else + return 1; +} + +static inline uint32_t +get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st) +{ + uint32_t pid; + rte_atomic32_t *pa; + + pa = &dev->tx.packet_id[type]; + + if (st == 0) { + pid = rte_atomic32_add_return(pa, num); + return pid - num; + } else { + pid = rte_atomic32_read(pa); + rte_atomic32_set(pa, pid + num); + return pid; + } +} + +static inline void +fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, + uint32_t seq, uint8_t hlen, uint8_t flags) +{ + uint16_t wnd; + + l4h->src_port = port.dst; + l4h->dst_port = port.src; + + wnd = (flags & TCP_FLAG_SYN) ? + RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) : + tcb->rcv.wnd >> tcb->rcv.wscale; + + /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */ + l4h->sent_seq = rte_cpu_to_be_32(seq); + l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt); + l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET; + l4h->tcp_flags = flags; + l4h->rx_win = rte_cpu_to_be_16(wnd); + l4h->cksum = 0; + l4h->tcp_urp = 0; + + if (flags & TCP_FLAG_SYN) + fill_syn_opts(l4h + 1, &tcb->so); + else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0) + fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts); +} + +static inline int +tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s, + const struct tle_dest *dst, uint64_t ol_flags, + union l4_ports port, uint32_t seq, uint32_t flags, + uint32_t pid, uint32_t swcsm) +{ + uint32_t l4, len, plen; + struct tcp_hdr *l4h; + char *l2h, *l3; + + len = dst->l2_len + dst->l3_len; + plen = m->pkt_len; + + if (flags & TCP_FLAG_SYN) { + /* basic length */ + l4 = sizeof(*l4h) + TCP_OPT_LEN_MSS; + + /* add wscale space and nop */ + if (s->tcb.so.wscale) { + l4 += TCP_OPT_LEN_WSC + TCP_OPT_LEN_NOP; + } + + /* add timestamp space and nop */ + if (s->tcb.so.ts.raw) { + l4 += TCP_TX_OPT_LEN_TMS; + } + } else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0) { + l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS; + } else { + l4 = sizeof(*l4h); + } + + /* adjust mbuf to put L2/L3/L4 headers into it. */ + l2h = rte_pktmbuf_prepend(m, len + l4); + if (l2h == NULL) + return -EINVAL; + + /* copy L2/L3 header */ + rte_memcpy(l2h, dst->hdr, len); + + /* setup TCP header & options */ + l4h = (struct tcp_hdr *)(l2h + len); + fill_tcph(l4h, &s->tcb, port, seq, l4, flags); + + /* setup mbuf TX offload related fields. */ + m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0); + m->ol_flags |= ol_flags; + + /* update proto specific fields. 
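+ * The branch below keys off the IP version in the copied header
+ * template (version_ihl >> 4) rather than the stream type.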
*/ + + l3 = l2h + dst->l2_len; + if (((struct ipv4_hdr*)l3)->version_ihl>>4 == 4) { + struct ipv4_hdr *l3h; + l3h = (struct ipv4_hdr *)l3; + l3h->packet_id = rte_cpu_to_be_16(pid); + l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4); + + if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) + l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len, + ol_flags); + else if (swcsm != 0) + l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h); + + if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0) + l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len); + } else { + struct ipv6_hdr *l3h; + l3h = (struct ipv6_hdr *)l3; + l3h->payload_len = rte_cpu_to_be_16(plen + l4); + if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) + l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags); + else if (swcsm != 0) + l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h); + } + + return 0; +} + +static inline int +stream_drb_empty(struct tle_tcp_stream *s) +{ + return rte_ring_empty(s->tx.drb.r); +} + +static inline void +stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +static inline uint32_t +stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +/* + * queue standalone packet to he particular output device + * It assumes that: + * - L2/L3/L4 headers should be already set. + * - packet fits into one segment. + */ +static inline int +send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m) +{ + uint32_t n, nb; + struct tle_drb *drb; + + if (stream_drb_alloc(s, &drb, 1) == 0) + return -ENOBUFS; + + /* enqueue pkt for TX. */ + nb = 1; + n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1, + &drb, &nb); + + /* free unused drbs. */ + if (nb != 0) + stream_drb_free(s, &drb, 1); + + return (n == 1) ? 
0 : -ENOBUFS; +} + +#define TCP_OLFLAGS_CKSUM(flags) (flags & (PKT_TX_IP_CKSUM | PKT_TX_TCP_CKSUM)) + +static inline int +send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq, + uint32_t flags) +{ + const struct tle_dest *dst; + uint32_t pid, type; + int32_t rc; + + dst = &s->tx.dst; + type = s->s.type; + pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); + + rc = tcp_fill_mbuf(m, s, dst, TCP_OLFLAGS_CKSUM(dst->ol_flags), + s->s.port, seq, flags, pid, 1); + if (rc == 0) + rc = send_pkt(s, dst->dev, m); + + return rc; +} + +static inline int +send_rst(struct tle_tcp_stream *s, uint32_t seq) +{ + struct rte_mbuf *m; + int32_t rc; + + m = rte_pktmbuf_alloc(s->tx.dst.head_mp); + if (m == NULL) + return -ENOMEM; + + rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST | TCP_FLAG_ACK); + if (rc != 0) + rte_pktmbuf_free(m); + else + TCP_INC_STATS(TCP_MIB_OUTRSTS); + + return rc; +} + + + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_RXTX_H_ */ diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c index 676521b..4a65053 100644 --- a/lib/libtle_l4p/tcp_stream.c +++ b/lib/libtle_l4p/tcp_stream.c @@ -20,6 +20,8 @@ #include <rte_ip.h> #include <rte_tcp.h> +#include <netinet/tcp.h> + #include "tcp_stream.h" #include "tcp_timer.h" #include "stream_table.h" @@ -27,6 +29,7 @@ #include "tcp_ctl.h" #include "tcp_ofo.h" #include "tcp_txq.h" +#include "tcp_rxtx.h" static void unuse_stream(struct tle_tcp_stream *s) @@ -38,25 +41,27 @@ unuse_stream(struct tle_tcp_stream *s) static void fini_stream(struct tle_tcp_stream *s) { - if (s != NULL) { - rte_free(s->rx.q); - tcp_ofo_free(s->rx.ofo); - rte_free(s->tx.q); - rte_free(s->tx.drb.r); - } + rte_free(s); } static void tcp_fini_streams(struct tle_ctx *ctx) { - uint32_t i; struct tcp_streams *ts; + struct tle_stream *s; ts = CTX_TCP_STREAMS(ctx); if (ts != NULL) { stbl_fini(&ts->st); - for (i = 0; i != ctx->prm.max_streams; i++) - fini_stream(&ts->s[i]); + + /* TODO: free those in use? may be not necessary, as we assume + * all streams have been closed and are free. + */ + while (ctx->streams.nb_free--) { + s = STAILQ_FIRST(&ctx->streams.free); + STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link); + fini_stream(TCP_STREAM(s)); + } /* free the timer wheel */ tle_timer_free(ts->tmr); @@ -94,61 +99,100 @@ alloc_ring(uint32_t n, uint32_t flags, int32_t socket) return r; } +/* stream memory layout: + * [tle_tcp_stream] [rx.q] [rx.ofo] [tx.q] [tx.drb.r] + */ static int -init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s) +add_stream(struct tle_ctx *ctx) { - size_t bsz, rsz, sz; - uint32_t f, i, k, n, nb; + size_t sz_s, sz_rxq, sz_ofo, sz_txq, sz_drb_r, sz; + /* for rx.q */ + uint32_t n_rxq; + /* for rx.ofo */ + struct ofo *ofo; + struct rte_mbuf **obj; + uint32_t ndb, nobj; + size_t dsz, osz; + /* for tx.q */ + uint32_t n_txq; + /* for tx.drb.r */ + size_t bsz, rsz; struct tle_drb *drb; - char name[RTE_RING_NAMESIZE]; - - f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : - (RING_F_SP_ENQ | RING_F_SC_DEQ); - - /* init RX part. */ - - n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); - s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id); - if (s->rx.q == NULL) - return -ENOMEM; - - s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id); - if (s->rx.ofo == NULL) - return -ENOMEM; - - /* init TX part. 
*/ + uint32_t k, nb, n_drb; - n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U); - s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id); - if (s->tx.q == NULL) - return -ENOMEM; + uint32_t f, i; + char name[RTE_RING_NAMESIZE]; + struct tle_tcp_stream *s; + // stream + sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE); + + // rx.q + n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + n_rxq = rte_align32pow2(n_rxq); + sz_rxq = rte_ring_get_memsize(n_rxq); + sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE); + + // rx.ofo + calc_ofo_elems(n_rxq, &nobj, &ndb); + osz = sizeof(*ofo) + sizeof(ofo->db[0]) * ndb; + dsz = sizeof(ofo->db[0].obj[0]) * nobj * ndb; + sz_ofo = osz + dsz; + sz_ofo = RTE_ALIGN_CEIL(sz_ofo, RTE_CACHE_LINE_SIZE); + + // tx.q + n_txq = RTE_MAX(ctx->prm.max_stream_sbufs, 1U); + n_txq = rte_align32pow2(n_txq); + sz_txq = rte_ring_get_memsize(n_txq); + sz_txq = RTE_ALIGN_CEIL(sz_txq, RTE_CACHE_LINE_SIZE); + + // tx.drb.r nb = drb_nb_elem(ctx); k = calc_stream_drb_num(ctx, nb); - n = rte_align32pow2(k); - - /* size of the drbs ring */ - rsz = rte_ring_get_memsize(n); + n_drb = rte_align32pow2(k); + rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */ rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + bsz = tle_drb_calc_size(nb); /* size of the drb. */ + sz_drb_r = rsz + bsz * k; /* total stream drbs size. */ + sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE); - /* size of the drb. */ - bsz = tle_drb_calc_size(nb); - - /* total stream drbs size. */ - sz = rsz + bsz * k; - - s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, - ctx->prm.socket_id); - if (s->tx.drb.r == NULL) { - TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + sz = sz_s + sz_rxq + sz_ofo + sz_txq + sz_drb_r; + s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s == NULL) { + TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d " "failed with error code: %d\n", - __func__, s, sz, ctx->prm.socket_id, rte_errno); + __func__, sz, ctx->prm.socket_id, rte_errno); return -ENOMEM; } - snprintf(name, sizeof(name), "%p@%zu", s, sz); - rte_ring_init(s->tx.drb.r, name, n, f); + s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s); + s->rx.ofo = (struct ofo *)((uintptr_t)s->rx.q + sz_rxq); + ofo = s->rx.ofo; + s->tx.q = (struct rte_ring *)((uintptr_t)s->rx.ofo + sz_ofo); + s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->tx.q + sz_txq); + // ring flags + f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : + (RING_F_SP_ENQ | RING_F_SC_DEQ); + + /* init RX part. */ + snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq); + rte_ring_init(s->rx.q, name, n_rxq, f); + + obj = (struct rte_mbuf **)&ofo->db[ndb]; + for (i = 0; i != ndb; i++) { + ofo->db[i].nb_max = nobj; + ofo->db[i].obj = obj + i * nobj; + } + ofo->nb_max = ndb; + + /* init TX part. */ + snprintf(name, sizeof(name), "%p@%zu", s->tx.q, sz_txq); + rte_ring_init(s->tx.q, name, n_txq, f); + + snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r); + rte_ring_init(s->tx.drb.r, name, n_drb, f); for (i = 0; i != k; i++) { drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + rsz + bsz * i); @@ -200,7 +244,7 @@ tcp_init_streams(struct tle_ctx *ctx) f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 
0 : (RING_F_SP_ENQ | RING_F_SC_DEQ); - sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams; + sz = sizeof(*ts); ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, ctx->prm.socket_id); if (ts == NULL) { @@ -210,6 +254,7 @@ tcp_init_streams(struct tle_ctx *ctx) return -ENOMEM; } + rte_spinlock_init(&ts->dr.lock); STAILQ_INIT(&ts->dr.fe); STAILQ_INIT(&ts->dr.be); @@ -228,12 +273,11 @@ tcp_init_streams(struct tle_ctx *ctx) if (ts->tsq == NULL) rc = -ENOMEM; else - rc = stbl_init(&ts->st, ctx->prm.max_streams, - ctx->prm.socket_id); + rc = stbl_init(&ts->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); } - for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++) - rc = init_stream(ctx, &ts->s[i]); + for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++) + rc = add_stream(ctx); if (rc != 0) { TCP_LOG(ERR, "initalisation of %u-th stream failed", i); @@ -243,11 +287,30 @@ tcp_init_streams(struct tle_ctx *ctx) return rc; } -static void __attribute__((constructor)) +/* + * Note this function is not thread-safe, and we did not lock here as we + * have the assumption that this ctx is dedicated to one thread. + */ +static uint32_t +tcp_more_streams(struct tle_ctx *ctx) +{ + uint32_t i, nb; + uint32_t nb_max = ctx->prm.max_streams - 1; + uint32_t nb_cur = ctx->streams.nb_cur; + + nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur); + for (i = 0; i < nb; i++) + if (add_stream(ctx) != 0) + break; + return i; +} + +static void __attribute__((constructor(101))) tcp_stream_setup(void) { static const struct stream_ops tcp_ops = { .init_streams = tcp_init_streams, + .more_streams = tcp_more_streams, .fini_streams = tcp_fini_streams, .free_drbs = tcp_free_drbs, }; @@ -305,16 +368,12 @@ tle_tcp_stream_open(struct tle_ctx *ctx, s = (struct tle_tcp_stream *)get_stream(ctx); if (s == NULL) { - rte_errno = ENFILE; - return NULL; - - /* some TX still pending for that stream. */ - } else if (TCP_STREAM_TX_PENDING(s)) { - put_stream(ctx, &s->s, 0); rte_errno = EAGAIN; return NULL; } + s->s.option.raw = prm->option; + /* setup L4 ports and L3 addresses fields. */ rc = stream_fill_ctx(ctx, &s->s, (const struct sockaddr *)&prm->addr.local, @@ -336,12 +395,14 @@ tle_tcp_stream_open(struct tle_ctx *ctx, /* store other params */ s->flags = ctx->prm.flags; + s->tcb.err = 0; s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : TLE_TCP_DEFAULT_RETRIES; s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX : ctx->prm.icw; s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ? TCP_RTO_2MSL : ctx->prm.timewait; + s->tcb.snd.rto_fw = TLE_TCP_FINWAIT_TIMEOUT; tcp_stream_up(s); return &s->s; @@ -354,9 +415,16 @@ static inline int stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s) { uint16_t uop; - uint32_t state; static const struct tle_stream_cb zcb; + /* Put uop operation into this wlock; or it may cause this stream + * to be put into death ring twice, for example: + * 1) FE sets OP_CLOSE; + * 2) BE stream_term sets state as TCP_ST_CLOSED, and put in queue; + * 3) FE down the stream, and calls stream_term again. + */ + tcp_stream_down(s); + /* check was close() already invoked */ uop = s->tcb.uop; if ((uop & TCP_OP_CLOSE) != 0) @@ -366,47 +434,66 @@ stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s) if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0) return -EDEADLK; - /* mark stream as unavaialbe for RX/TX. 
*/ - tcp_stream_down(s); - /* reset events/callbacks */ - s->rx.ev = NULL; s->tx.ev = NULL; + s->rx.ev = NULL; s->err.ev = NULL; s->rx.cb = zcb; s->tx.cb = zcb; s->err.cb = zcb; - state = s->tcb.state; - - /* CLOSED, LISTEN, SYN_SENT - we can close the stream straighway */ - if (state <= TCP_ST_SYN_SENT) { + switch (s->tcb.state) { + case TCP_ST_LISTEN: + /* close the stream straightway */ tcp_stream_reset(ctx, s); return 0; - } - - /* generate FIN and proceed with normal connection termination */ - if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) { - - /* change state */ - s->tcb.state = (state == TCP_ST_ESTABLISHED) ? - TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK; - - /* mark stream as writable/readable again */ + case TCP_ST_CLOSED: + /* it could be put into this state if a RST packet is + * received, but this stream could be still in tsq trying + * to send something. + */ + /* fallthrough */ + case TCP_ST_SYN_SENT: + /* timer on and could be in tsq (SYN retrans) */ + stream_term(s); + /* fallthrough */ + case TCP_ST_FIN_WAIT_1: + /* fallthrough */ + case TCP_ST_CLOSING: + /* fallthrough */ + case TCP_ST_TIME_WAIT: + /* fallthrough */ + case TCP_ST_LAST_ACK: tcp_stream_up(s); - - /* queue stream into to-send queue */ - txs_enqueue(ctx, s); return 0; + case TCP_ST_ESTABLISHED: + /* fallthrough */ + case TCP_ST_CLOSE_WAIT: + if (s->tcb.state == TCP_ST_ESTABLISHED) { + s->tcb.state = TCP_ST_FIN_WAIT_1; + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + } else + s->tcb.state = TCP_ST_LAST_ACK; + + if (!rte_ring_empty(s->rx.q)) { + TCP_INC_STATS(TCP_MIB_ESTABRESETS); + s->tcb.uop |= TCP_OP_RESET; + stream_term(s); + } + break; + case TCP_ST_FIN_WAIT_2: + /* Can reach this state if shutdown was called, but the timer + * shall be set after this close. + */ + break; + default: + rte_panic("Invalid state when close: %d\n", s->tcb.state); } - /* - * accroding to the state, close() was already invoked, - * should never that point. 
- */ - RTE_ASSERT(0); - return -EINVAL; + tcp_stream_up(s); + txs_enqueue(ctx, s); + return 0; } uint32_t @@ -453,6 +540,64 @@ tle_tcp_stream_close(struct tle_stream *ts) } int +tle_tcp_stream_shutdown(struct tle_stream *ts, int how) +{ + int ret; + bool wakeup; + uint32_t state; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + /* Refer to linux/net/ipv4/tcp.c:tcp_shutdown() */ + if (how == SHUT_RD) + return 0; + + tcp_stream_down(s); + + state = s->tcb.state; + + switch (state) { + case TCP_ST_LISTEN: + /* fallthrough */ + case TCP_ST_SYN_SENT: + s->tcb.state = TCP_ST_CLOSED; + wakeup = true; + ret = 0; + break; + case TCP_ST_ESTABLISHED: + /* fallthrough */ + case TCP_ST_CLOSE_WAIT: + if (state == TCP_ST_ESTABLISHED) { + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + s->tcb.state = TCP_ST_FIN_WAIT_1; + } else + s->tcb.state = TCP_ST_LAST_ACK; + txs_enqueue(ts->ctx, s); + wakeup = true; + ret = 0; + break; + default: + wakeup = false; + rte_errno = ENOTCONN; + ret = -1; + } + + if (wakeup) { + /* Notify other threads which may wait on the event */ + if (s->tx.ev) + tle_event_raise(s->tx.ev); + if (how == SHUT_RDWR && s->err.ev) + tle_event_raise(s->err.ev); + } + + tcp_stream_up(s); + return ret; +} + +int tle_tcp_stream_get_addr(const struct tle_stream *ts, struct tle_tcp_stream_addr *addr) { @@ -617,3 +762,73 @@ tle_tcp_stream_get_mss(const struct tle_stream * ts) s = TCP_STREAM(ts); return s->tcb.snd.mss; } + +int +tle_tcp_stream_get_info(const struct tle_stream * ts, void *info, socklen_t *optlen) +{ + struct tle_tcp_stream *s; + struct tcp_info i; + + if (ts == NULL) + return -EINVAL; + + s = TCP_STREAM(ts); + + memset(&i, 0, sizeof(struct tcp_info)); + + /* transform from tldk state into linux kernel state */ + switch (s->tcb.state) { + case TCP_ST_CLOSED: + i.tcpi_state = TCP_CLOSE; + break; + case TCP_ST_LISTEN: + i.tcpi_state = TCP_LISTEN; + break; + case TCP_ST_SYN_SENT: + i.tcpi_state = TCP_SYN_SENT; + break; + case TCP_ST_SYN_RCVD: + i.tcpi_state = TCP_SYN_RECV; + break; + case TCP_ST_ESTABLISHED: + i.tcpi_state = TCP_ESTABLISHED; + break; + case TCP_ST_FIN_WAIT_1: + i.tcpi_state = TCP_FIN_WAIT1; + break; + case TCP_ST_FIN_WAIT_2: + i.tcpi_state = TCP_FIN_WAIT2; + break; + case TCP_ST_CLOSE_WAIT: + i.tcpi_state = TCP_CLOSE_WAIT; + break; + case TCP_ST_CLOSING: + i.tcpi_state = TCP_CLOSING; + break; + case TCP_ST_LAST_ACK: + i.tcpi_state = TCP_LAST_ACK; + break; + case TCP_ST_TIME_WAIT: + i.tcpi_state = TCP_TIME_WAIT; + break; + } + + /* fix me, total retrans? 
*/ + i.tcpi_total_retrans = s->tcb.snd.nb_retx; + + if (*optlen > sizeof(struct tcp_info)) + *optlen = sizeof(struct tcp_info); + rte_memcpy(info, &i, *optlen); + return 0; +} + +void +tle_tcp_stream_set_keepalive(struct tle_stream *ts) +{ + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + + s->tcb.uop |= TCP_OP_KEEPALIVE; + txs_enqueue(ts->ctx, s); +} diff --git a/lib/libtle_l4p/tcp_stream.h b/lib/libtle_l4p/tcp_stream.h index 4629fe6..1202574 100644 --- a/lib/libtle_l4p/tcp_stream.h +++ b/lib/libtle_l4p/tcp_stream.h @@ -17,6 +17,8 @@ #define _TCP_STREAM_H_ #include <rte_vect.h> +#include <rte_mbuf.h> + #include <tle_dring.h> #include <tle_tcp.h> #include <tle_event.h> @@ -45,23 +47,28 @@ enum { }; enum { - TCP_OP_LISTEN = 0x1, - TCP_OP_ACCEPT = 0x2, - TCP_OP_CONNECT = 0x4, - TCP_OP_CLOSE = 0x8, + TCP_OP_LISTEN = 0x1, + TCP_OP_ACCEPT = 0x2, + TCP_OP_CONNECT = 0x4, + TCP_OP_CLOSE = 0x8, + TCP_OP_RESET = 0x10, + TCP_OP_KEEPALIVE = 0x20 }; struct tcb { + int err; volatile uint16_t state; volatile uint16_t uop; /* operations by user performed */ struct { uint32_t nxt; + uint32_t cpy; /* head of yet unread data */ uint32_t irs; /* initial received sequence */ uint32_t wnd; uint32_t ts; struct { uint32_t seq; - uint32_t on; + uint32_t on; /* on == 1: received an out-of-order fin + * on == 2: received an in order fin */ } frs; uint32_t srtt; /* smoothed round trip time (scaled by >> 3) */ uint32_t rttvar; /* rtt variance */ @@ -83,15 +90,32 @@ struct tcb { uint32_t ssthresh; /* slow start threshold */ uint32_t rto; /* retransmission timeout */ uint32_t rto_tw; /* TIME_WAIT retransmission timeout */ + uint32_t rto_fw; /* FIN_WAIT_2 waiting timeout */ uint32_t iss; /* initial send sequence */ + uint32_t waitlen; /* total length of unacknowledged pkt */ + uint32_t cork_ts; uint16_t mss; uint8_t wscale; uint8_t nb_retx; /* number of retransmission */ uint8_t nb_retm; /**< max number of retx attempts. */ + uint8_t nb_keepalive;/* number of sended keepalive */ + bool update_rcv; /* Flag for updating recv window */ + uint16_t nxt_offset; /* Partial tx, next data of a segment to tx */ + uint32_t una_offset; /* Partial ack, next data of a mbuf to ack */ + struct rte_mbuf *nxt_pkt; /* Partial tx, next segment to send */ } snd; struct syn_opts so; /* initial syn options. */ }; +enum { + TIMER_RTO, + TIMER_DACK, + TIMER_KEEPALIVE, + TIMER_NUM, + TIMER_MAX_NUM = 8, + TIMER_MASK = TIMER_MAX_NUM - 1 +}; + struct tle_tcp_stream { struct tle_stream s; @@ -103,7 +127,7 @@ struct tle_tcp_stream { struct tcb tcb; struct { - void *handle; + void *handle[TIMER_NUM]; } timer; struct { @@ -155,7 +179,6 @@ struct tcp_streams { struct tle_timer_wheel *tmr; /* timer wheel */ struct rte_ring *tsq; /* to-send streams queue */ struct sdr dr; /* death row for zombie streams */ - struct tle_tcp_stream s[]; /* array of allocated streams. */ }; #define CTX_TCP_STREAMS(ctx) ((struct tcp_streams *)(ctx)->streams.buf) diff --git a/lib/libtle_l4p/tcp_timer.h b/lib/libtle_l4p/tcp_timer.h index 8faefb3..d242556 100644 --- a/lib/libtle_l4p/tcp_timer.h +++ b/lib/libtle_l4p/tcp_timer.h @@ -27,43 +27,53 @@ extern "C" { * all RTO values are in ms. 
*/ #define TCP_RTO_MAX 60000U /* RFC 6298 (2.5) */ -#define TCP_RTO_MIN 1000U /* RFC 6298 (2.4) */ +#define TCP_RTO_MIN 200U /* Linux/include/net/tcp.h: TCP_RTO_MIN */ #define TCP_RTO_2MSL (2 * TCP_RTO_MAX) -#define TCP_RTO_DEFAULT TCP_RTO_MIN /* RFC 6298 (2.1)*/ +#define TCP_RTO_DEFAULT 1000U /* RFC 6298 (2.1)*/ #define TCP_RTO_GRANULARITY 100U +static inline struct tle_tcp_stream * +timer_stream(struct tle_tcp_stream *s) +{ + return (struct tle_tcp_stream *)((unsigned long)s & (~(unsigned long)TIMER_MASK)); +} + +static inline uint8_t +timer_type(struct tle_tcp_stream *s) +{ + return (uint8_t)((unsigned long)s & (unsigned long)TIMER_MASK); +} static inline void -timer_stop(struct tle_tcp_stream *s) +timer_stop(struct tle_tcp_stream *s, uint8_t type) { struct tle_timer_wheel *tw; - if (s->timer.handle != NULL) { + if (s->timer.handle[type] != NULL) { tw = CTX_TCP_TMWHL(s->s.ctx); - tle_timer_stop(tw, s->timer.handle); - s->timer.handle = NULL; + tle_timer_stop(tw, s->timer.handle[type]); + s->timer.handle[type] = NULL; } } static inline void -timer_start(struct tle_tcp_stream *s) +timer_start(struct tle_tcp_stream *s, uint8_t type, uint32_t timeout) { struct tle_timer_wheel *tw; - if (s->timer.handle == NULL) { + if (s->timer.handle[type] == NULL) { tw = CTX_TCP_TMWHL(s->s.ctx); - s->timer.handle = tle_timer_start(tw, s, s->tcb.snd.rto); - s->tcb.snd.nb_retx = 0; + s->timer.handle[type] = tle_timer_start(tw, (void*)((unsigned long)s | type), timeout); } } static inline void -timer_restart(struct tle_tcp_stream *s) +timer_restart(struct tle_tcp_stream *s, uint8_t type, uint32_t timeout) { struct tle_timer_wheel *tw; tw = CTX_TCP_TMWHL(s->s.ctx); - s->timer.handle = tle_timer_start(tw, s, s->tcb.snd.rto); + s->timer.handle[type] = tle_timer_start(tw, (void*)((unsigned long)s | type), timeout); } @@ -71,10 +81,10 @@ timer_restart(struct tle_tcp_stream *s) * reset number of retransmissions and restart RTO timer. 
*/ static inline void -timer_reset(struct tle_tcp_stream *s) +timer_reset(struct tle_tcp_stream *s, uint8_t type, uint32_t timeout) { - timer_stop(s); - timer_start(s); + timer_stop(s, type); + timer_start(s, type, timeout); } static inline uint32_t diff --git a/lib/libtle_l4p/tcp_tx_seg.h b/lib/libtle_l4p/tcp_tx_seg.h index ac2b13b..b64aa77 100644 --- a/lib/libtle_l4p/tcp_tx_seg.h +++ b/lib/libtle_l4p/tcp_tx_seg.h @@ -27,7 +27,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, struct rte_mbuf *in_seg = NULL; uint32_t nbseg, in_seg_data_pos; uint32_t more_in_segs; - uint16_t bytes_left; + uint16_t out_bytes_remain; in_seg = mbin; in_seg_data_pos = 0; @@ -35,7 +35,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, /* Check that pkts_out is big enough to hold all fragments */ if (mss * num < (uint16_t)mbin->pkt_len) - return -ENOSPC; + return -EAGAIN; more_in_segs = 1; while (more_in_segs) { @@ -49,7 +49,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, return -ENOMEM; } - bytes_left = mss; + out_bytes_remain = mss; out_seg_prev = out_pkt; more_out_segs = 1; while (more_out_segs && more_in_segs) { @@ -68,7 +68,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, /* Prepare indirect buffer */ rte_pktmbuf_attach(out_seg, in_seg); - len = bytes_left; + len = out_bytes_remain; if (len > (in_seg->data_len - in_seg_data_pos)) len = in_seg->data_len - in_seg_data_pos; @@ -77,10 +77,10 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, out_pkt->pkt_len = (uint16_t)(len + out_pkt->pkt_len); out_pkt->nb_segs += 1; in_seg_data_pos += len; - bytes_left -= len; + out_bytes_remain -= len; /* Current output packet (i.e. fragment) done ? */ - if (bytes_left == 0) + if (out_bytes_remain == 0) more_out_segs = 0; /* Current input segment done ? 
*/ diff --git a/lib/libtle_l4p/tcp_txq.h b/lib/libtle_l4p/tcp_txq.h index 78f1d56..303b8fd 100644 --- a/lib/libtle_l4p/tcp_txq.h +++ b/lib/libtle_l4p/tcp_txq.h @@ -68,9 +68,28 @@ tcp_txq_set_nxt_head(struct tle_tcp_stream *s, uint32_t num) static inline void tcp_txq_rst_nxt_head(struct tle_tcp_stream *s) { - struct rte_ring *r; + struct rte_ring *r = s->tx.q; + struct rte_mbuf *m; + uint32_t offset, data_len; + + if (s->tcb.snd.nxt_pkt != NULL) { + s->tcb.snd.nxt_offset = 0; + s->tcb.snd.nxt_pkt = NULL; + } + + offset = s->tcb.snd.una_offset; + if (offset) { + m = (struct rte_mbuf *)(_rte_ring_get_data(r)[r->cons.tail & r->mask]); + data_len = m->data_len - PKT_L234_HLEN(m); + while (offset >= data_len) { + offset -= data_len; + m = m->next; + data_len = m->data_len; + } + s->tcb.snd.nxt_pkt = m; + s->tcb.snd.nxt_offset = offset; + } - r = s->tx.q; r->cons.head = r->cons.tail; } @@ -110,9 +129,13 @@ static inline uint32_t txs_dequeue_bulk(struct tle_ctx *ctx, struct tle_tcp_stream *s[], uint32_t num) { struct rte_ring *r; + uint32_t n, i; r = CTX_TCP_TSQ(ctx); - return _rte_ring_dequeue_burst(r, (void **)s, num); + n = _rte_ring_dequeue_burst(r, (void **)s, num); + for (i = 0; i < n; i++) + rte_atomic32_clear(&s[i]->tx.arm); + return n; } #ifdef __cplusplus diff --git a/lib/libtle_l4p/tle_ctx.h b/lib/libtle_l4p/tle_ctx.h index de78a6b..f0efd51 100644 --- a/lib/libtle_l4p/tle_ctx.h +++ b/lib/libtle_l4p/tle_ctx.h @@ -54,6 +54,43 @@ extern "C" { struct tle_ctx; struct tle_dev; +typedef union tle_stream_options { + struct { + uint32_t reuseaddr: 1; + uint32_t reuseport: 1; + uint32_t keepalive: 1; + uint32_t ipv6only: 1; + uint32_t oobinline: 1; + uint32_t tcpcork: 1; + uint32_t tcpnodelay: 1; + uint32_t mulloop: 1; + uint32_t timestamp: 1; + uint32_t reserve: 3; + uint32_t tcpquickack: 4; + uint32_t multtl: 8; + uint32_t keepcnt: 8; + uint16_t keepidle; + uint16_t keepintvl; + }; + uint64_t raw; +} tle_stream_options_t; + +static inline void +tle_set_timestamp(struct msghdr *msg, struct rte_mbuf *m) +{ + struct timeval *tv; + struct cmsghdr *cmsg; + + cmsg = CMSG_FIRSTHDR(msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SO_TIMESTAMP; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct timeval)); + msg->msg_controllen = cmsg->cmsg_len; + tv = (struct timeval*)CMSG_DATA(cmsg); + tv->tv_sec = m->timestamp >> 20; + tv->tv_usec = m->timestamp & 0xFFFFFUL; +} + /** * Blocked L4 ports info. */ @@ -112,6 +149,8 @@ struct tle_ctx_param { int32_t socket_id; /**< socket ID to allocate memory for. */ uint32_t proto; /**< L4 proto to handle. */ uint32_t max_streams; /**< max number of streams in context. */ + uint32_t min_streams; /**< min number of streams at init. */ + uint32_t delta_streams; /**< delta of streams of each allocation. */ uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */ uint32_t max_stream_sbufs; /**< max send mbufs per stream. */ uint32_t send_bulk_size; /**< expected # of packets per send call. */ @@ -145,6 +184,8 @@ struct tle_ctx_param { */ #define TLE_TCP_TIMEWAIT_DEFAULT UINT32_MAX +#define TLE_TCP_FINWAIT_TIMEOUT 60000 + /** * create L4 processing context. 
* @param ctx_prm diff --git a/lib/libtle_l4p/tle_event.h b/lib/libtle_l4p/tle_event.h index d730345..dd7a997 100644 --- a/lib/libtle_l4p/tle_event.h +++ b/lib/libtle_l4p/tle_event.h @@ -43,7 +43,7 @@ struct tle_event { struct tle_evq *head; const void *data; enum tle_ev_state state; -} __rte_cache_aligned; +}; struct tle_evq { rte_spinlock_t lock; diff --git a/lib/libtle_l4p/tle_stats.h b/lib/libtle_l4p/tle_stats.h new file mode 100644 index 0000000..3588c6d --- /dev/null +++ b/lib/libtle_l4p/tle_stats.h @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TLE_STATS_H +#define TLE_STATS_H + +#include <rte_per_lcore.h> +#include <rte_memory.h> + +/* tcp mib definitions */ +/* + * RFC 1213: MIB-II TCP group + * RFC 2012 (updates 1213): SNMPv2-MIB-TCP + */ +enum +{ + TCP_MIB_RTOALGORITHM, /* RtoAlgorithm */ + TCP_MIB_RTOMIN, /* RtoMin */ + TCP_MIB_RTOMAX, /* RtoMax */ + TCP_MIB_MAXCONN, /* MaxConn */ + TCP_MIB_ACTIVEOPENS, /* ActiveOpens */ + TCP_MIB_PASSIVEOPENS, /* PassiveOpens */ + TCP_MIB_ATTEMPTFAILS, /* AttemptFails */ + TCP_MIB_ESTABRESETS, /* EstabResets */ + TCP_MIB_CURRESTAB, /* CurrEstab */ + TCP_MIB_INSEGS, /* InSegs */ + TCP_MIB_OUTSEGS, /* OutSegs */ + TCP_MIB_RETRANSSEGS, /* RetransSegs */ + TCP_MIB_INERRS, /* InErrs */ + TCP_MIB_OUTRSTS, /* OutRsts */ + TCP_MIB_CSUMERRORS, /* InCsumErrors */ + TCP_MIB_MAX +}; + +/* udp mib definitions */ +/* + * RFC 1213: MIB-II UDP group + * RFC 2013 (updates 1213): SNMPv2-MIB-UDP + */ +enum +{ + UDP_MIB_INDATAGRAMS, /* InDatagrams */ + UDP_MIB_NOPORTS, /* NoPorts */ + UDP_MIB_INERRORS, /* InErrors */ + UDP_MIB_OUTDATAGRAMS, /* OutDatagrams */ + UDP_MIB_RCVBUFERRORS, /* RcvbufErrors */ + UDP_MIB_SNDBUFERRORS, /* SndbufErrors */ + UDP_MIB_CSUMERRORS, /* InCsumErrors */ + UDP_MIB_IGNOREDMULTI, /* IgnoredMulti */ + UDP_MIB_MAX +}; + +struct tcp_mib { + unsigned long mibs[TCP_MIB_MAX]; +}; + +struct udp_mib { + unsigned long mibs[UDP_MIB_MAX]; +}; + +struct tle_mib { + struct tcp_mib tcp; + struct udp_mib udp; +} __rte_cache_aligned; + +extern struct tle_mib default_mib; + +RTE_DECLARE_PER_LCORE(struct tle_mib *, mib); + +#define PERCPU_MIB RTE_PER_LCORE(mib) + +#define SNMP_INC_STATS(mib, field) (mib).mibs[field]++ +#define SNMP_DEC_STATS(mib, field) (mib).mibs[field]-- +#define SNMP_ADD_STATS(mib, field, n) (mib).mibs[field] += n +#define SNMP_ADD_STATS_ATOMIC(mib, field, n) \ + rte_atomic64_add((rte_atomic64_t *)(&(mib).mibs[field]), n) + +#define TCP_INC_STATS(field) SNMP_INC_STATS(PERCPU_MIB->tcp, field) +#define TCP_DEC_STATS(field) SNMP_DEC_STATS(PERCPU_MIB->tcp, field) +#define TCP_ADD_STATS(field, n) SNMP_ADD_STATS(PERCPU_MIB->tcp, field, n) +#define TCP_INC_STATS_ATOMIC(field) SNMP_ADD_STATS_ATOMIC(PERCPU_MIB->tcp, field, 1) +#define TCP_DEC_STATS_ATOMIC(field) SNMP_ADD_STATS_ATOMIC(PERCPU_MIB->tcp, field, (-1)) + +#define UDP_INC_STATS(field) SNMP_INC_STATS(PERCPU_MIB->udp, field) +#define UDP_ADD_STATS(field, n) SNMP_ADD_STATS(PERCPU_MIB->udp, 
field, n) +#define UDP_ADD_STATS_ATOMIC(field, n) \ + SNMP_ADD_STATS_ATOMIC(PERCPU_MIB->udp, field, n) + +#endif /* TLE_STATS_H */ diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h index b0cbda6..93e853e 100644 --- a/lib/libtle_l4p/tle_tcp.h +++ b/lib/libtle_l4p/tle_tcp.h @@ -49,6 +49,7 @@ struct tle_tcp_stream_cfg { struct tle_tcp_stream_param { struct tle_tcp_stream_addr addr; struct tle_tcp_stream_cfg cfg; + uint64_t option; }; /** @@ -86,6 +87,25 @@ tle_tcp_stream_open(struct tle_ctx *ctx, int tle_tcp_stream_close(struct tle_stream *s); /** + * shutdown an open stream in SHUT_WR way. + * similar to tle_tcp_stream_close(), except: + * - rx still works + * - er still works + * @param s + * Pointer to the stream to close. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + * - -EDEADLK - close was already invoked on that stream + */ +int tle_tcp_stream_shutdown(struct tle_stream *s, int how); + +/** + * Send rst on this stream. + */ +void tle_tcp_stream_kill(struct tle_stream *s); + +/** * close a group of open streams. * if the stream is in connected state, then: * - connection termination would be performed. @@ -268,6 +288,15 @@ uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[], uint16_t num); /** + * Get how many bytes are received in recv window. + * @param ts + * TCP stream to receive data from. + * @return + * bytes of data inside recv window. + */ +uint16_t tle_tcp_stream_inq(struct tle_stream *s); + +/** * Reads iovcnt buffers from the for given TCP stream. * Note that the stream has to be in connected state. * Data ordering is preserved. @@ -290,6 +319,19 @@ ssize_t tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, int iovcnt); /** + * Like tle_tcp_stream_readv, but with more information returned in msghdr. + * Note that the stream has to be in connected state. + * @param ts + * TCP stream to receive data from. + * @param msg + * If not NULL, generate control message into msg_control field of msg. + * @return + * On success, number of bytes read in the stream receive buffer. + * In case of error, returns -1 and error code will be set in rte_errno. + */ +ssize_t tle_tcp_stream_recvmsg(struct tle_stream *ts, struct msghdr *msg); + +/** * Consume and queue up to *num* packets, that will be sent eventually * by tle_tcp_tx_bulk(). * Note that the stream has to be in connected state. @@ -420,6 +462,24 @@ uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], */ int tle_tcp_process(struct tle_ctx *ctx, uint32_t num); +/** + * Get tcp info of a tcp stream. + * This function is not multi-thread safe. + * @param ts + * TCP stream to get info from. + * @param info + * Pointer to store info. + * @param optlen + * Pointer to length of info. + * @return + * zero on successful completion. + * - ENOTCONN - connection is not connected yet. + */ +int +tle_tcp_stream_get_info(const struct tle_stream * ts, void *info, socklen_t *optlen); + +void tle_tcp_stream_set_keepalive(struct tle_stream *ts); + #ifdef __cplusplus } #endif diff --git a/lib/libtle_l4p/tle_udp.h b/lib/libtle_l4p/tle_udp.h index d3a8fe9..640ed64 100644 --- a/lib/libtle_l4p/tle_udp.h +++ b/lib/libtle_l4p/tle_udp.h @@ -35,6 +35,7 @@ struct tle_udp_stream_param { struct tle_event *send_ev; /**< send event to use. */ struct tle_stream_cb send_cb; /**< send callback to use. 
*/ + uint64_t option; +}; /** @@ -55,6 +56,36 @@ tle_udp_stream_open(struct tle_ctx *ctx, const struct tle_udp_stream_param *prm); /** + * set an existing stream within the given UDP context with new parameters. + * @param ts + * stream to set with the new parameters + * @param ctx + * UDP context to set the stream within. + * @param prm + * Parameters used to set the stream. + * @return + * Pointer to UDP stream structure that can be used in future UDP API calls, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENFILE - max limit of open streams reached for that context + */ +struct tle_stream * +tle_udp_stream_set(struct tle_stream *ts, struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm); + +/** + * shutdown an open stream. + * + * @param s + * Pointer to the stream to shutdown. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + */ +int tle_udp_stream_shutdown(struct tle_stream *s, int how); + +/** * close an open stream. * All packets still remaining in stream receive buffer will be freed. * All packets still remaining in stream transmit buffer will be kept @@ -180,6 +211,24 @@ uint16_t tle_udp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[], uint16_t tle_udp_stream_send(struct tle_stream *s, struct rte_mbuf *pkt[], uint16_t num, const struct sockaddr *dst_addr); +/** + * updates configuration (associated events, callbacks, stream parameters) + * for the given streams. + * @param ts + * An array of pointers to the streams to update. + * @param prm + * An array of parameters to update for the given streams. + * @param num + * Number of elements in the *ts* and *prm* arrays. + * @return + * number of streams successfully updated. + * In case of error, error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + */ +uint32_t tle_udp_stream_update_cfg(struct tle_stream *ts[], + struct tle_udp_stream_param prm[], uint32_t num); + #ifdef __cplusplus } #endif diff --git a/lib/libtle_l4p/udp_rxtx.c b/lib/libtle_l4p/udp_rxtx.c index 84a13ea..e9539b9 100644 --- a/lib/libtle_l4p/udp_rxtx.c +++ b/lib/libtle_l4p/udp_rxtx.c @@ -13,7 +13,6 @@ * limitations under the License.
*/ -#include <rte_malloc.h> #include <rte_errno.h> #include <rte_ethdev.h> #include <rte_ip.h> @@ -24,14 +23,11 @@ #include "misc.h" static inline struct tle_udp_stream * -rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port) +rx_stream_obtain_by_tuples(struct stbl *st, const union pkt_info *pi) { struct tle_udp_stream *s; - if (type >= TLE_VNUM || dev->dp[type] == NULL) - return NULL; - - s = (struct tle_udp_stream *)dev->dp[type]->streams[port]; + s = UDP_STREAM(stbl_find_stream(st, pi)); if (s == NULL) return NULL; @@ -41,6 +37,24 @@ rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port) return s; } +static inline struct tle_udp_stream * +rx_stream_obtain(struct tle_dev *dev, uint32_t type, const union pkt_info *pi) +{ + struct tle_udp_stream *s; + + if (type == TLE_V4) + s = bhash_lookup4(dev->ctx->bhash[type], + pi->addr4.dst, pi->port.dst, 1); + else + s = bhash_lookup6(dev->ctx->bhash[type], + pi->addr6->dst, pi->port.dst, 1); + + if (s == NULL || rwl_acquire(&s->rx.use) < 0) + return NULL; + + return s; +} + static inline uint16_t get_pkt_type(const struct rte_mbuf *m) { @@ -57,8 +71,9 @@ get_pkt_type(const struct rte_mbuf *m) } static inline union l4_ports -pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4, - union ipv6_addrs **addr6) +pkt_info_udp(struct rte_mbuf *m, union l4_ports *ports, + union ipv4_addrs *addr4, union ipv6_addrs **addr6, + union pkt_info *pi) { uint32_t len; union l4_ports ret, *up; @@ -71,15 +86,20 @@ pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4, pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *, len + offsetof(struct ipv4_hdr, src_addr)); addr4->raw = pa4->raw; + pi->addr4.raw = pa4->raw; + pi->tf.type = TLE_V4; } else if (ret.src == TLE_V6) { *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *, len + offsetof(struct ipv6_hdr, src_addr)); + pi->addr6 = *addr6; + pi->tf.type = TLE_V6; } len += m->l3_len; up = rte_pktmbuf_mtod_offset(m, union l4_ports *, len + offsetof(struct udp_hdr, src_port)); ports->raw = up->raw; + pi->port.raw = up->raw; ret.dst = ports->dst; return ret; } @@ -96,6 +116,11 @@ rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[], r = _rte_ring_enqueue_burst(s->rx.q, mb, num); + if (unlikely(r != num)) { + UDP_ADD_STATS(UDP_MIB_RCVBUFERRORS, num - r); + UDP_ADD_STATS(UDP_MIB_INERRORS, num - r); + } + /* if RX queue was empty invoke user RX notification callback. */ if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r) s->rx.cb.func(s->rx.cb.data, &s->s); @@ -164,28 +189,64 @@ rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[], return rx_stream(s, mb, rp + k, rc + k, n); } +/* + * Consider 2 UDP pkt_info *equal* if their: + * - types (IPv4/IPv6) + * - UDP src and dst ports + * - IP src and dst addresses + * are equal.
+ */ +static inline int +udp_pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num) +{ + uint32_t i; + + i = 1; + + if (pi[0].tf.type == TLE_V4) { + while (i != num && pi[i].tf.type == TLE_V4 && + pi[0].port.raw == pi[i].port.raw && + pi[0].addr4.raw == pi[i].addr4.raw) + i++; + } else if (pi[0].tf.type == TLE_V6) { + while (i != num && pi[i].tf.type == TLE_V6 && + pi[0].port.raw == pi[i].port.raw && + ymm_cmp(&pi[0].addr6->raw, + &pi[i].addr6->raw) == 0) + i++; + } + + return i; +} + uint16_t tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], struct rte_mbuf *rp[], int32_t rc[], uint16_t num) { + struct stbl *st; struct tle_udp_stream *s; - uint32_t i, j, k, n, p, t; + uint32_t i, j, k, n, t; union l4_ports tp[num], port[num]; union ipv4_addrs a4[num]; union ipv6_addrs *pa6[num]; + union pkt_info pi[num]; + + st = CTX_UDP_STLB(dev->ctx); for (i = 0; i != num; i++) - tp[i] = pkt_info(pkt[i], &port[i], &a4[i], &pa6[i]); + tp[i] = pkt_info_udp(pkt[i], &port[i], &a4[i], + &pa6[i], &pi[i]); k = 0; for (i = 0; i != num; i = j) { - for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++) - ; + j = i + udp_pkt_info_bulk_eq(pi + i, num - i); t = tp[i].src; - p = tp[i].dst; - s = rx_stream_obtain(dev, t, p); + + s = rx_stream_obtain_by_tuples(st, &pi[i]); + if (s == NULL) + s = rx_stream_obtain(dev, t, &pi[i]); if (s != NULL) { if (t == TLE_V4) @@ -202,6 +263,7 @@ tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], rwl_release(&s->rx.use); } else { + UDP_ADD_STATS(UDP_MIB_NOPORTS, j - i); for (; i != j; i++) { rc[k] = ENOENT; rp[k] = pkt[i]; @@ -262,6 +324,8 @@ tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) stream_drb_release(s, drb + i, j - i); } + UDP_ADD_STATS(UDP_MIB_OUTDATAGRAMS, n); + return n; } @@ -272,24 +336,18 @@ tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) static inline uint32_t recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type) { - uint32_t i, k; - uint64_t flg[num], ofl[num]; - - for (i = 0; i != num; i++) { - flg[i] = m[i]->ol_flags; - ofl[i] = m[i]->tx_offload; - } + uint32_t i, k, offset; - k = 0; - for (i = 0; i != num; i++) { - - /* drop packets with invalid cksum(s). 
*/ - if (check_pkt_csum(m[i], flg[i], type, IPPROTO_UDP) != 0) { + for (i = 0, k = 0; i != num; i++) { + if (check_pkt_csum(m[i], type, IPPROTO_UDP) != 0) { + UDP_INC_STATS(UDP_MIB_CSUMERRORS); rte_pktmbuf_free(m[i]); m[i] = NULL; k++; - } else - rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i])); + } else { + offset = _tx_offload_l4_offset(m[i]->tx_offload); + rte_pktmbuf_adj(m[i], offset); + } } return k; @@ -302,9 +360,25 @@ tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num) struct tle_udp_stream *s; s = UDP_STREAM(us); + n = 0; + +again: n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); - if (n == 0) + if (n == 0) { + if (rwl_try_acquire(&s->rx.use) > 0) + rte_errno = EAGAIN; + else + rte_errno = ESHUTDOWN; + rwl_release(&s->rx.use); return 0; + } + + k = recv_pkt_process(pkt, n, s->s.type); + if (unlikely(k)) + UDP_ADD_STATS_ATOMIC(UDP_MIB_CSUMERRORS, k); + n = compress_pkt_list(pkt, n, k); + if (n == 0) + goto again; /* * if we still have packets to read, @@ -316,8 +390,8 @@ tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num) rwl_release(&s->rx.use); } - k = recv_pkt_process(pkt, n, s->s.type); - return compress_pkt_list(pkt, n, k); + UDP_ADD_STATS_ATOMIC(UDP_MIB_INDATAGRAMS, n); + return n; } static inline int @@ -413,7 +487,7 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num, /* Remove the Ethernet header from the input packet */ rte_pktmbuf_adj(pkt, dst->l2_len); - mtu = dst->mtu - dst->l2_len; + mtu = dst->mtu; /* fragment packet */ if (type == TLE_V4) @@ -475,13 +549,22 @@ queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev, nb += nbc; /* no free drbs, can't send anything */ - if (nb == 0) + if (unlikely(nb == 0)) { + if (all_or_nothing) + UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, 1); + else + UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, nb_pkt); return 0; + } /* not enough free drbs, reduce number of packets to send. */ else if (nb != nbm) { - if (all_or_nothing) + if (all_or_nothing) { + UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, 1); return 0; + } + + UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, nb_pkt - nb * bsz); nb_pkt = nb * bsz; } @@ -509,12 +592,18 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[], const struct sockaddr_in *d4; const struct sockaddr_in6 *d6; struct tle_udp_stream *s; - const void *da; + const void *sa, *da; union udph udph; struct tle_dest dst; struct tle_drb *drb[num]; + uint8_t ufo; s = UDP_STREAM(us); + if (rwl_acquire(&s->tx.use) < 0) { + rte_errno = EPIPE; /* tx is shutdown */ + return 0; + } + type = s->s.type; /* start filling UDP header. */ @@ -523,7 +612,10 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[], /* figure out what destination addr/port to use. 
*/ if (dst_addr != NULL) { - if (dst_addr->sa_family != s->prm.remote_addr.ss_family) { + if (dst_addr->sa_family != s->prm.remote_addr.ss_family && + (s->prm.remote_addr.ss_family == AF_INET || + !IN6_IS_ADDR_UNSPECIFIED(&s->s.ipv6.addr.dst))) { + rwl_release(&s->tx.use); rte_errno = EINVAL; return 0; } @@ -531,21 +623,28 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[], d4 = (const struct sockaddr_in *)dst_addr; da = &d4->sin_addr; udph.ports.dst = d4->sin_port; + sa = &s->s.ipv4.addr.dst; } else { d6 = (const struct sockaddr_in6 *)dst_addr; da = &d6->sin6_addr; udph.ports.dst = d6->sin6_port; + sa = &s->s.ipv6.addr.dst; } } else { udph.ports.dst = s->s.port.src; - if (type == TLE_V4) + if (type == TLE_V4) { da = &s->s.ipv4.addr.src; - else + sa = &s->s.ipv4.addr.dst; + } + else { da = &s->s.ipv6.addr.src; + sa = &s->s.ipv6.addr.dst; + } } - di = stream_get_dest(&s->s, da, &dst); + di = stream_get_dest(type, &s->s, sa, da, &dst); if (di < 0) { + rwl_release(&s->tx.use); rte_errno = -di; return 0; } @@ -553,12 +652,7 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[], pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num; mtu = dst.mtu - dst.l2_len - dst.l3_len; - /* mark stream as not closable. */ - if (rwl_acquire(&s->tx.use) < 0) { - rte_errno = EAGAIN; - return 0; - } - + ufo = dst.dev->prm.tx_offload & DEV_TX_OFFLOAD_UDP_TSO; nb = 0; for (i = 0, k = 0; k != num; k = i) { @@ -568,7 +662,7 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[], ol_flags = dst.dev->tx.ol_flags[type]; while (i != num && frg == 0) { - frg = pkt[i]->pkt_len > mtu; + frg = (!ufo) && pkt[i]->pkt_len > mtu; if (frg != 0) ol_flags &= ~PKT_TX_UDP_CKSUM; rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i, diff --git a/lib/libtle_l4p/udp_stream.c b/lib/libtle_l4p/udp_stream.c index 29f5a40..0cd5c27 100644 --- a/lib/libtle_l4p/udp_stream.c +++ b/lib/libtle_l4p/udp_stream.c @@ -43,74 +43,87 @@ fini_stream(struct tle_udp_stream *s) static void udp_fini_streams(struct tle_ctx *ctx) { - uint32_t i; - struct tle_udp_stream *s; + struct udp_streams *us; + struct tle_stream *s; + + us = CTX_UDP_STREAMS(ctx); + if (us != NULL) { + stbl_fini(&us->st); + + while (ctx->streams.nb_free--) { + s = STAILQ_FIRST(&ctx->streams.free); + STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link); + fini_stream(UDP_STREAM(s)); + } - s = ctx->streams.buf; - if (s != NULL) { - for (i = 0; i != ctx->prm.max_streams; i++) - fini_stream(s + i); } - rte_free(s); + rte_free(us); ctx->streams.buf = NULL; STAILQ_INIT(&ctx->streams.free); } +/* stream memory layout: + * [tle_udp_stream] [rx.q] [tx.drb.r] + */ static int -init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s) +add_stream(struct tle_ctx *ctx) { - size_t bsz, rsz, sz; - uint32_t i, k, n, nb; + size_t sz_s, sz_rxq, sz_drb_r, sz; + /* for rx.q */ + uint32_t n_rxq; + /* for tx.drb.r */ + size_t bsz, rsz; struct tle_drb *drb; - char name[RTE_RING_NAMESIZE]; + uint32_t k, nb, n_drb; - /* init RX part. 
*/ - - n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); - n = rte_align32pow2(n); - sz = rte_ring_get_memsize(n); - - s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, - ctx->prm.socket_id); - if (s->rx.q == NULL) { - UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " - "failed with error code: %d\n", - __func__, s, sz, ctx->prm.socket_id, rte_errno); - return -ENOMEM; - } + uint32_t i, f; + char name[RTE_RING_NAMESIZE]; + struct tle_udp_stream *s; - snprintf(name, sizeof(name), "%p@%zu", s, sz); - rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ); + // stream + sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE); - /* init TX part. */ + // rx.q + n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + n_rxq = rte_align32pow2(n_rxq); + sz_rxq = rte_ring_get_memsize(n_rxq); + sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE); + // tx.drb.r nb = drb_nb_elem(ctx); k = calc_stream_drb_num(ctx, nb); - n = rte_align32pow2(k); - - /* size of the drbs ring */ - rsz = rte_ring_get_memsize(n); + n_drb = rte_align32pow2(k); + rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */ rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + bsz = tle_drb_calc_size(nb); /* size of the drb. */ + sz_drb_r = rsz + bsz * k; /* total stream drbs size. */ + sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE); - /* size of the drb. */ - bsz = tle_drb_calc_size(nb); - - /* total stream drbs size. */ - sz = rsz + bsz * k; - - s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, - ctx->prm.socket_id); - if (s->tx.drb.r == NULL) { - UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + sz = sz_s + sz_rxq + sz_drb_r; + s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s == NULL) { + UDP_LOG(ERR, "%s: allocation of %zu bytes on socket %d " "failed with error code: %d\n", - __func__, s, sz, ctx->prm.socket_id, rte_errno); + __func__, sz, ctx->prm.socket_id, rte_errno); return -ENOMEM; } - snprintf(name, sizeof(name), "%p@%zu", s, sz); - rte_ring_init(s->tx.drb.r, name, n, 0); + s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s); + s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->rx.q + sz_rxq); + + // ring flags + f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : + (RING_F_SP_ENQ | RING_F_SC_DEQ); + + /* init RX part. */ + snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq); + rte_ring_init(s->rx.q, name, n_rxq, f); + /* init TX part. 
*/ + snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r); + rte_ring_init(s->tx.drb.r, name, n_drb, f); for (i = 0; i != k; i++) { drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + rsz + bsz * i); @@ -146,38 +159,59 @@ udp_init_streams(struct tle_ctx *ctx) size_t sz; uint32_t i; int32_t rc; - struct tle_udp_stream *s; + struct udp_streams *us; - sz = sizeof(*s) * ctx->prm.max_streams; - s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + sz = sizeof(*us); + us = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, ctx->prm.socket_id); - if (s == NULL) { + if (us == NULL) { UDP_LOG(ERR, "allocation of %zu bytes on socket %d " "for %u udp_streams failed\n", sz, ctx->prm.socket_id, ctx->prm.max_streams); return -ENOMEM; } - ctx->streams.buf = s; + ctx->streams.buf = us; STAILQ_INIT(&ctx->streams.free); - for (i = 0; i != ctx->prm.max_streams; i++) { - rc = init_stream(ctx, s + i); - if (rc != 0) { - UDP_LOG(ERR, "initalisation of %u-th stream failed", i); - udp_fini_streams(ctx); - return rc; - } + rc = stbl_init(&us->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); + if (rc < 0) { + UDP_LOG(ERR, "failed to init UDP stbl: rc = %dl\n", rc); + return rc; } - return 0; + for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++) + rc = add_stream(ctx); + + if (rc != 0) { + UDP_LOG(ERR, "initalisation of %u-th stream failed", i); + udp_fini_streams(ctx); + } + + return rc; } -static void __attribute__((constructor)) +static uint32_t +udp_more_streams(struct tle_ctx *ctx) +{ + uint32_t i, nb; + uint32_t nb_max = ctx->prm.max_streams; + uint32_t nb_cur = ctx->streams.nb_cur; + + nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur); + for (i = 0; i < nb; i++) + if (add_stream(ctx) != 0) + break; + + return i; +} + +static void __attribute__((constructor(101))) udp_stream_setup(void) { static const struct stream_ops udp_ops = { .init_streams = udp_init_streams, + .more_streams = udp_more_streams, .fini_streams = udp_fini_streams, .free_drbs = udp_free_drbs, }; @@ -188,8 +222,8 @@ udp_stream_setup(void) static inline void stream_down(struct tle_udp_stream *s) { - rwl_down(&s->rx.use); - rwl_down(&s->tx.use); + rwl_try_down(&s->rx.use); + rwl_try_down(&s->tx.use); } static inline void @@ -224,6 +258,59 @@ check_stream_prm(const struct tle_ctx *ctx, } struct tle_stream * +tle_udp_stream_set(struct tle_stream *ts, struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) { + tle_udp_stream_close(ts); + rte_errno = EINVAL; + return NULL; + } + + s = UDP_STREAM(ts); + + /* free stream's destination port */ + rc = stream_clear_ctx(ctx, &s->s); + + if (s->ste) { + stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, ts); + s->ste = NULL; + } + + /* copy input parameters. */ + s->prm = *prm; + s->s.option.raw = prm->option; + + /* setup L4 ports and L3 addresses fields. 
*/ + rc = stream_fill_ctx(ctx, &s->s, + (const struct sockaddr *)&prm->local_addr, + (const struct sockaddr *)&prm->remote_addr); + + if (rc != 0) + goto error; + + /* add stream to the table for non-listen type stream */ + if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) { + s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s); + if (s->ste == NULL) { + rc = EEXIST; + goto error; + } + } + + return &s->s; + +error: + tle_udp_stream_close(ts); + rte_errno = rc; + return NULL; + +} + +struct tle_stream * tle_udp_stream_open(struct tle_ctx *ctx, const struct tle_udp_stream_param *prm) { @@ -237,42 +324,80 @@ tle_udp_stream_open(struct tle_ctx *ctx, s = (struct tle_udp_stream *)get_stream(ctx); if (s == NULL) { - rte_errno = ENFILE; - return NULL; - - /* some TX still pending for that stream. */ - } else if (UDP_STREAM_TX_PENDING(s)) { - put_stream(ctx, &s->s, 0); rte_errno = EAGAIN; return NULL; } /* copy input parameters. */ s->prm = *prm; + s->s.option.raw = prm->option; /* setup L4 ports and L3 addresses fields. */ rc = stream_fill_ctx(ctx, &s->s, (const struct sockaddr *)&prm->local_addr, (const struct sockaddr *)&prm->remote_addr); - if (rc != 0) { - put_stream(ctx, &s->s, 1); - s = NULL; - rte_errno = rc; - } else { - /* setup stream notification menchanism */ - s->rx.ev = prm->recv_ev; - s->rx.cb = prm->recv_cb; - s->tx.ev = prm->send_ev; - s->tx.cb = prm->send_cb; - - /* mark stream as avaialbe for RX/TX */ - if (s->tx.ev != NULL) - tle_event_raise(s->tx.ev); - stream_up(s); + if (rc != 0) + goto error; + + /* add stream to the table for non-listen type stream */ + if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) { + s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s); + if (s->ste == NULL) { + rc = EEXIST; + goto error; + } } + /* setup stream notification mechanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + /* mark stream as available for RX/TX */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + stream_up(s); + return &s->s; + +error: + put_stream(ctx, &s->s, 1); + rte_errno = rc; + return NULL; +} + +int +tle_udp_stream_shutdown(struct tle_stream *us, int how) +{ + bool shut_rd = false; + bool shut_wr = false; + struct tle_udp_stream *s = UDP_STREAM(us); + + switch (how) { + case SHUT_RD: + shut_rd = true; + rwl_down(&s->rx.use); + break; + case SHUT_WR: + shut_wr = true; + rwl_down(&s->tx.use); + break; + case SHUT_RDWR: + shut_rd = true; + shut_wr = true; + stream_down(s); + break; + default: + return -EINVAL; + } + + if (shut_rd && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + if (shut_wr && s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + return 0; } int @@ -312,6 +437,11 @@ tle_udp_stream_close(struct tle_stream *us) /* empty stream's RX queue */ empty_mbuf_ring(s->rx.q); + if (s->ste) { + stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, us); + s->ste = NULL; + } + /* * mark the stream as free again.
* if there still are pkts queued for TX, @@ -344,3 +474,56 @@ tle_udp_stream_get_param(const struct tle_stream *us, return 0; } + +/* + * helper function, updates stream config + */ +static inline int +stream_update_cfg(struct tle_stream *us, struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + + s = UDP_STREAM(us); + + /* setup stream notification mechanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + rte_smp_wmb(); + + /* invoke async notifications, if any */ + if (rte_ring_count(s->rx.q) != 0) { + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL) + s->rx.cb.func(s->rx.cb.data, &s->s); + } + + /* always ok to write */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + else if (s->tx.cb.func != NULL) + s->tx.cb.func(s->tx.cb.data, &s->s); + + return 0; +} + +uint32_t +tle_udp_stream_update_cfg(struct tle_stream *us[], + struct tle_udp_stream_param prm[], uint32_t num) +{ + int32_t rc; + uint32_t i; + + for (i = 0; i != num; i++) { + rc = stream_update_cfg(us[i], &prm[i]); + if (rc != 0) { + rte_errno = -rc; + break; + } + } + + return i; +} diff --git a/lib/libtle_l4p/udp_stream.h b/lib/libtle_l4p/udp_stream.h index a950e56..55a66f8 100644 --- a/lib/libtle_l4p/udp_stream.h +++ b/lib/libtle_l4p/udp_stream.h @@ -24,6 +24,7 @@ #include "osdep.h" #include "ctx.h" #include "stream.h" +#include "stream_table.h" #ifdef __cplusplus extern "C" { @@ -41,6 +42,7 @@ union udph { struct tle_udp_stream { struct tle_stream s; + struct stbl_entry *ste; /* entry in streams table. */ struct { struct rte_ring *q; @@ -63,6 +65,13 @@ struct tle_udp_stream { struct tle_udp_stream_param prm; } __rte_cache_aligned; +struct udp_streams { + struct stbl st; +}; + +#define CTX_UDP_STREAMS(ctx) ((struct udp_streams *)(ctx)->streams.buf) +#define CTX_UDP_STLB(ctx) (&CTX_UDP_STREAMS(ctx)->st) + #define UDP_STREAM(p) \ ((struct tle_udp_stream *)((uintptr_t)(p) - offsetof(struct tle_udp_stream, s)))
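Usage sketch (editor's illustration, not taken from this patch): a minimal example of how the new tle_tcp_stream_shutdown() and tle_tcp_stream_get_info() calls declared above could be combined for a half-close. The connected stream, the helper name example_half_close and the TCP_FIN_WAIT1 check are assumptions made only for this sketch.

/* Hypothetical caller-side sketch, assuming an already-connected TCP stream. */
#include <netinet/tcp.h>	/* struct tcp_info, TCP_FIN_WAIT1 */
#include <sys/socket.h>		/* SHUT_WR, socklen_t */
#include <tle_tcp.h>

static void
example_half_close(struct tle_stream *ts)
{
	struct tcp_info info;
	socklen_t len = sizeof(info);

	/* stop sending but keep receiving, mirroring shutdown(fd, SHUT_WR) */
	if (tle_tcp_stream_shutdown(ts, SHUT_WR) != 0)
		return;	/* rte_errno is set to ENOTCONN for non-connected states */

	/* poll the Linux-style state translated by the new tcp_info call */
	if (tle_tcp_stream_get_info(ts, &info, &len) == 0 &&
			info.tcpi_state == TCP_FIN_WAIT1) {
		/* FIN has been queued; wait for the peer's ACK/FIN before close */
	}
}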