Diffstat (limited to 'lib/libtle_l4p')
-rw-r--r--  lib/libtle_l4p/Makefile            1
-rw-r--r--  lib/libtle_l4p/ctx.c             349
-rw-r--r--  lib/libtle_l4p/ctx.h              38
-rw-r--r--  lib/libtle_l4p/misc.h             66
-rw-r--r--  lib/libtle_l4p/net_misc.h         21
-rw-r--r--  lib/libtle_l4p/port_statmap.h    127
-rw-r--r--  lib/libtle_l4p/stream.h           55
-rw-r--r--  lib/libtle_l4p/stream_table.c     65
-rw-r--r--  lib/libtle_l4p/stream_table.h    490
-rw-r--r--  lib/libtle_l4p/syncookie.h         4
-rw-r--r--  lib/libtle_l4p/tcp_ctl.h          68
-rw-r--r--  lib/libtle_l4p/tcp_misc.h         34
-rw-r--r--  lib/libtle_l4p/tcp_ofo.c          39
-rw-r--r--  lib/libtle_l4p/tcp_ofo.h          14
-rw-r--r--  lib/libtle_l4p/tcp_rxq.h           4
-rw-r--r--  lib/libtle_l4p/tcp_rxtx.c       1445
-rw-r--r--  lib/libtle_l4p/tcp_rxtx.h        252
-rw-r--r--  lib/libtle_l4p/tcp_stream.c      395
-rw-r--r--  lib/libtle_l4p/tcp_stream.h       37
-rw-r--r--  lib/libtle_l4p/tcp_timer.h        40
-rw-r--r--  lib/libtle_l4p/tcp_tx_seg.h       12
-rw-r--r--  lib/libtle_l4p/tcp_txq.h          29
-rw-r--r--  lib/libtle_l4p/tle_ctx.h          41
-rw-r--r--  lib/libtle_l4p/tle_event.h         2
-rw-r--r--  lib/libtle_l4p/tle_stats.h       101
-rw-r--r--  lib/libtle_l4p/tle_tcp.h          60
-rw-r--r--  lib/libtle_l4p/tle_udp.h          49
-rw-r--r--  lib/libtle_l4p/udp_rxtx.c        186
-rw-r--r--  lib/libtle_l4p/udp_stream.c      347
-rw-r--r--  lib/libtle_l4p/udp_stream.h        9
30 files changed, 3138 insertions, 1242 deletions
diff --git a/lib/libtle_l4p/Makefile b/lib/libtle_l4p/Makefile
index e1357d1..ee81d4a 100644
--- a/lib/libtle_l4p/Makefile
+++ b/lib/libtle_l4p/Makefile
@@ -45,6 +45,7 @@ SYMLINK-y-include += tle_ctx.h
SYMLINK-y-include += tle_event.h
SYMLINK-y-include += tle_tcp.h
SYMLINK-y-include += tle_udp.h
+SYMLINK-y-include += tle_stats.h
# this lib dependencies
DEPDIRS-y += lib/libtle_misc
diff --git a/lib/libtle_l4p/ctx.c b/lib/libtle_l4p/ctx.c
index b8067f0..d6bde48 100644
--- a/lib/libtle_l4p/ctx.c
+++ b/lib/libtle_l4p/ctx.c
@@ -21,9 +21,14 @@
#include <rte_ip.h>
#include "stream.h"
+#include "stream_table.h"
#include "misc.h"
#include <halfsiphash.h>
+struct tle_mib default_mib;
+
+RTE_DEFINE_PER_LCORE(struct tle_mib *, mib) = &default_mib;
+
#define LPORT_START 0x8000
#define LPORT_END MAX_PORT_NUM
@@ -103,6 +108,16 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
ctx->prm = *ctx_prm;
+ rc = bhash_init(ctx);
+ if (rc != 0) {
+ UDP_LOG(ERR, "create bhash table (ctx=%p, proto=%u) failed "
+ "with error code: %d;\n",
+ ctx, ctx_prm->proto, rc);
+ tle_ctx_destroy(ctx);
+ rte_errno = -rc;
+ return NULL;
+ }
+
rc = tle_stream_ops[ctx_prm->proto].init_streams(ctx);
if (rc != 0) {
UDP_LOG(ERR, "init_streams(ctx=%p, proto=%u) failed "
@@ -114,9 +129,10 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
}
for (i = 0; i != RTE_DIM(ctx->use); i++)
- tle_pbm_init(ctx->use + i, LPORT_START_BLK);
+ tle_psm_init(ctx->use + i);
- ctx->streams.nb_free = ctx->prm.max_streams;
+ ctx->streams.nb_free = ctx->prm.min_streams;
+ ctx->streams.nb_cur = ctx->prm.min_streams;
/* Initialization of siphash state is done here to speed up the
* fastpath processing.
@@ -124,6 +140,11 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
if (ctx->prm.hash_alg == TLE_SIPHASH)
siphash_initialization(&ctx->prm.secret_key,
&ctx->prm.secret_key);
+
+ rte_spinlock_init(&ctx->dev_lock);
+ rte_spinlock_init(&ctx->bhash_lock[TLE_V4]);
+ rte_spinlock_init(&ctx->bhash_lock[TLE_V6]);
+
return ctx;
}
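
The per-lcore mib pointer introduced above defaults to the shared default_mib. A minimal sketch, assuming tle_stats.h exposes the matching RTE_DECLARE_PER_LCORE declaration, of how a worker lcore could switch to private counters (helper name hypothetical):

    /* bind a private MIB block to the calling lcore so that stats
     * updated from this worker land in its own counters */
    RTE_DECLARE_PER_LCORE(struct tle_mib *, mib);

    static void
    bind_local_mib(struct tle_mib *priv)
    {
        RTE_PER_LCORE(mib) = priv;
    }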
@@ -137,6 +158,8 @@ tle_ctx_destroy(struct tle_ctx *ctx)
return;
}
+ bhash_fini(ctx);
+
for (i = 0; i != RTE_DIM(ctx->dev); i++)
tle_del_dev(ctx->dev + i);
@@ -150,37 +173,6 @@ tle_ctx_invalidate(struct tle_ctx *ctx)
RTE_SET_USED(ctx);
}
-static void
-fill_pbm(struct tle_pbm *pbm, const struct tle_bl_port *blp)
-{
- uint32_t i;
-
- for (i = 0; i != blp->nb_port; i++)
- tle_pbm_set(pbm, blp->port[i]);
-}
-
-static int
-init_dev_proto(struct tle_dev *dev, uint32_t idx, int32_t socket_id,
- const struct tle_bl_port *blp)
-{
- size_t sz;
-
- sz = sizeof(*dev->dp[idx]);
- dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
- socket_id);
-
- if (dev->dp[idx] == NULL) {
- UDP_LOG(ERR, "allocation of %zu bytes on "
- "socket %d for %u-th device failed\n",
- sz, socket_id, idx);
- return ENOMEM;
- }
-
- tle_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK);
- fill_pbm(&dev->dp[idx]->use, blp);
- return 0;
-}
-
static struct tle_dev *
find_free_dev(struct tle_ctx *ctx)
{
@@ -214,27 +206,8 @@ tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm)
return NULL;
rc = 0;
- /* device can handle IPv4 traffic */
- if (dev_prm->local_addr4.s_addr != INADDR_ANY) {
- rc = init_dev_proto(dev, TLE_V4, ctx->prm.socket_id,
- &dev_prm->bl4);
- if (rc == 0)
- fill_pbm(&ctx->use[TLE_V4], &dev_prm->bl4);
- }
-
- /* device can handle IPv6 traffic */
- if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_ipv6_any,
- sizeof(tle_ipv6_any)) != 0) {
- rc = init_dev_proto(dev, TLE_V6, ctx->prm.socket_id,
- &dev_prm->bl6);
- if (rc == 0)
- fill_pbm(&ctx->use[TLE_V6], &dev_prm->bl6);
- }
-
if (rc != 0) {
/* cleanup and return an error. */
- rte_free(dev->dp[TLE_V4]);
- rte_free(dev->dp[TLE_V6]);
rte_errno = rc;
return NULL;
}
@@ -246,16 +219,19 @@ tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm)
if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0 &&
ctx->prm.proto == TLE_PROTO_UDP) {
- dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM;
- dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6 | PKT_TX_UDP_CKSUM;
+ dev->tx.ol_flags[TLE_V4] |= PKT_TX_UDP_CKSUM;
+ dev->tx.ol_flags[TLE_V6] |= PKT_TX_UDP_CKSUM;
} else if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_TCP_CKSUM) != 0 &&
ctx->prm.proto == TLE_PROTO_TCP) {
- dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_TCP_CKSUM;
- dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6 | PKT_TX_TCP_CKSUM;
+ dev->tx.ol_flags[TLE_V4] |= PKT_TX_TCP_CKSUM;
+ dev->tx.ol_flags[TLE_V6] |= PKT_TX_TCP_CKSUM;
}
if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_IPV4_CKSUM) != 0)
- dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM;
+ dev->tx.ol_flags[TLE_V4] |= PKT_TX_IP_CKSUM;
+
+ dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4;
+ dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6;
dev->prm = *dev_prm;
dev->ctx = ctx;
@@ -300,220 +276,97 @@ tle_del_dev(struct tle_dev *dev)
ctx = dev->ctx;
p = dev - ctx->dev;
- if (p >= RTE_DIM(ctx->dev) ||
- (dev->dp[TLE_V4] == NULL &&
- dev->dp[TLE_V6] == NULL))
+ if (p >= RTE_DIM(ctx->dev))
return -EINVAL;
/* empty TX queues. */
empty_dring(&dev->tx.dr, ctx->prm.proto);
- rte_free(dev->dp[TLE_V4]);
- rte_free(dev->dp[TLE_V6]);
memset(dev, 0, sizeof(*dev));
ctx->nb_dev--;
return 0;
}
-static struct tle_dev *
-find_ipv4_dev(struct tle_ctx *ctx, const struct in_addr *addr)
-{
- uint32_t i;
-
- for (i = 0; i != RTE_DIM(ctx->dev); i++) {
- if (ctx->dev[i].prm.local_addr4.s_addr == addr->s_addr &&
- ctx->dev[i].dp[TLE_V4] != NULL)
- return ctx->dev + i;
- }
-
- return NULL;
-}
-
-static struct tle_dev *
-find_ipv6_dev(struct tle_ctx *ctx, const struct in6_addr *addr)
+int
+stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s,
+ const struct sockaddr *laddr, const struct sockaddr *raddr)
{
- uint32_t i;
+ struct sockaddr_storage addr;
+ int32_t rc = 0;
- for (i = 0; i != RTE_DIM(ctx->dev); i++) {
- if (memcmp(&ctx->dev[i].prm.local_addr6, addr,
- sizeof(*addr)) == 0 &&
- ctx->dev[i].dp[TLE_V6] != NULL)
- return ctx->dev + i;
+ if (laddr->sa_family == AF_INET) {
+ s->type = TLE_V4;
+ } else if (laddr->sa_family == AF_INET6) {
+ s->type = TLE_V6;
}
- return NULL;
-}
-
-static int
-stream_fill_dev(struct tle_ctx *ctx, struct tle_stream *s,
- const struct sockaddr *addr)
-{
- struct tle_dev *dev;
- struct tle_pbm *pbm;
- const struct sockaddr_in *lin4;
- const struct sockaddr_in6 *lin6;
- uint32_t i, p, sp, t;
-
- if (addr->sa_family == AF_INET) {
- lin4 = (const struct sockaddr_in *)addr;
- t = TLE_V4;
- p = lin4->sin_port;
- } else if (addr->sa_family == AF_INET6) {
- lin6 = (const struct sockaddr_in6 *)addr;
- t = TLE_V6;
- p = lin6->sin6_port;
- } else
- return EINVAL;
-
+ uint16_t p = ((const struct sockaddr_in *)laddr)->sin_port;
p = ntohs(p);
-
- /* if local address is not wildcard, find device it belongs to. */
- if (t == TLE_V4 && lin4->sin_addr.s_addr != INADDR_ANY) {
- dev = find_ipv4_dev(ctx, &lin4->sin_addr);
- if (dev == NULL)
- return ENODEV;
- } else if (t == TLE_V6 && memcmp(&tle_ipv6_any, &lin6->sin6_addr,
- sizeof(tle_ipv6_any)) != 0) {
- dev = find_ipv6_dev(ctx, &lin6->sin6_addr);
- if (dev == NULL)
- return ENODEV;
- } else
- dev = NULL;
-
- if (dev != NULL)
- pbm = &dev->dp[t]->use;
- else
- pbm = &ctx->use[t];
-
+ struct tle_psm *psm = &ctx->use[s->type];
/* try to acquire local port number. */
+ rte_spinlock_lock(&ctx->dev_lock);
if (p == 0) {
- p = tle_pbm_find_range(pbm, pbm->blk, LPORT_END_BLK);
- if (p == 0 && pbm->blk > LPORT_START_BLK)
- p = tle_pbm_find_range(pbm, LPORT_START_BLK, pbm->blk);
- } else if (tle_pbm_check(pbm, p) != 0)
- return EEXIST;
-
- if (p == 0)
- return ENFILE;
-
- /* fill socket's dst port and type */
-
- sp = htons(p);
- s->type = t;
- s->port.dst = sp;
-
- /* mark port as in-use */
-
- tle_pbm_set(&ctx->use[t], p);
- if (dev != NULL) {
- tle_pbm_set(pbm, p);
- dev->dp[t]->streams[sp] = s;
- } else {
- for (i = 0; i != RTE_DIM(ctx->dev); i++) {
- if (ctx->dev[i].dp[t] != NULL) {
- tle_pbm_set(&ctx->dev[i].dp[t]->use, p);
- ctx->dev[i].dp[t]->streams[sp] = s;
- }
+ if (s->type == TLE_V6 && is_empty_addr(laddr) && !s->option.ipv6only)
+ p = tle_psm_alloc_dual_port(&ctx->use[TLE_V4], psm);
+ else
+ p = tle_psm_alloc_port(psm);
+ if (p == 0) {
+ rte_spinlock_unlock(&ctx->dev_lock);
+ return ENFILE;
}
+ rte_memcpy(&addr, laddr, sizeof(struct sockaddr_storage));
+ ((struct sockaddr_in *)&addr)->sin_port = htons(p);
+ laddr = (const struct sockaddr*)&addr;
}
- return 0;
-}
+ if (tle_psm_set(psm, p, s->option.reuseport) != 0) {
+ rte_spinlock_unlock(&ctx->dev_lock);
+ return EADDRINUSE;
+ }
-static int
-stream_clear_dev(struct tle_ctx *ctx, const struct tle_stream *s)
-{
- struct tle_dev *dev;
- uint32_t i, p, sp, t;
-
- t = s->type;
- sp = s->port.dst;
- p = ntohs(sp);
-
- /* if local address is not wildcard, find device it belongs to. */
- if (t == TLE_V4 && s->ipv4.addr.dst != INADDR_ANY) {
- dev = find_ipv4_dev(ctx,
- (const struct in_addr *)&s->ipv4.addr.dst);
- if (dev == NULL)
- return ENODEV;
- } else if (t == TLE_V6 && memcmp(&tle_ipv6_any, &s->ipv6.addr.dst,
- sizeof(tle_ipv6_any)) != 0) {
- dev = find_ipv6_dev(ctx,
- (const struct in6_addr *)&s->ipv6.addr.dst);
- if (dev == NULL)
- return ENODEV;
- } else
- dev = NULL;
-
- tle_pbm_clear(&ctx->use[t], p);
- if (dev != NULL) {
- if (dev->dp[t]->streams[sp] == s) {
- tle_pbm_clear(&dev->dp[t]->use, p);
- dev->dp[t]->streams[sp] = NULL;
- }
- } else {
- for (i = 0; i != RTE_DIM(ctx->dev); i++) {
- if (ctx->dev[i].dp[t] != NULL &&
- ctx->dev[i].dp[t]->streams[sp] == s) {
- tle_pbm_clear(&ctx->dev[i].dp[t]->use, p);
- ctx->dev[i].dp[t]->streams[sp] = NULL;
+ if (is_empty_addr(laddr)) {
+ if (s->type == TLE_V6 && !s->option.ipv6only) {
+ rc = tle_psm_set(&ctx->use[TLE_V4], p, s->option.reuseport);
+ if (rc != 0) {
+ tle_psm_clear(psm, p);
+ rte_spinlock_unlock(&ctx->dev_lock);
+ return EADDRINUSE;
}
}
}
- return 0;
-}
-
-static void
-fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask)
-{
- *addr = in->sin_addr.s_addr;
- *mask = (*addr == INADDR_ANY) ? INADDR_ANY : INADDR_NONE;
-}
+ if (is_empty_addr(raddr))
+ rc = bhash_add_entry(ctx, laddr, s);
-static void
-fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask)
-{
- const struct in6_addr *pm;
-
- memcpy(addr, &in->sin6_addr, sizeof(*addr));
- if (memcmp(&tle_ipv6_any, addr, sizeof(*addr)) == 0)
- pm = &tle_ipv6_any;
- else
- pm = &tle_ipv6_none;
-
- memcpy(mask, pm, sizeof(*mask));
-}
+ if (rc) {
+ tle_psm_clear(psm, p);
+ }
-int
-stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s,
- const struct sockaddr *laddr, const struct sockaddr *raddr)
-{
- const struct sockaddr_in *rin;
- int32_t rc;
+ rte_spinlock_unlock(&ctx->dev_lock);
+ /* fill stream's dst port field (which holds the local port) */
+ s->port.dst = htons(p);
- /* setup ports and port mask fields (except dst port). */
- rin = (const struct sockaddr_in *)raddr;
- s->port.src = rin->sin_port;
- s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX;
- s->pmsk.dst = UINT16_MAX;
+ if (rc)
+ return rc;
- /* setup src and dst addresses. */
+ /* setup src, dst addresses, and src port. */
if (laddr->sa_family == AF_INET) {
fill_ipv4_am((const struct sockaddr_in *)laddr,
&s->ipv4.addr.dst, &s->ipv4.mask.dst);
fill_ipv4_am((const struct sockaddr_in *)raddr,
&s->ipv4.addr.src, &s->ipv4.mask.src);
+ s->port.src = ((const struct sockaddr_in *)raddr)->sin_port;
} else if (laddr->sa_family == AF_INET6) {
fill_ipv6_am((const struct sockaddr_in6 *)laddr,
&s->ipv6.addr.dst, &s->ipv6.mask.dst);
fill_ipv6_am((const struct sockaddr_in6 *)raddr,
&s->ipv6.addr.src, &s->ipv6.mask.src);
+ s->port.src = ((const struct sockaddr_in6 *)raddr)->sin6_port;
}
- rte_spinlock_lock(&ctx->dev_lock);
- rc = stream_fill_dev(ctx, s, laddr);
- rte_spinlock_unlock(&ctx->dev_lock);
+ /* setup port mask fields. */
+ s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX;
+ s->pmsk.dst = UINT16_MAX;
return rc;
}
@@ -522,11 +375,41 @@ stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s,
int
stream_clear_ctx(struct tle_ctx *ctx, struct tle_stream *s)
{
- int32_t rc;
+ bool is_any = false;
+ struct sockaddr_storage addr;
+ struct sockaddr_in *addr4;
+ struct sockaddr_in6 *addr6;
+
+ if (s->type == TLE_V4) {
+ if (s->ipv4.addr.src == INADDR_ANY) {
+ is_any = true;
+ addr4 = (struct sockaddr_in *)&addr;
+ addr4->sin_addr.s_addr = s->ipv4.addr.dst;
+ addr4->sin_port = s->port.dst;
+ addr.ss_family = AF_INET;
+ bhash_del_entry(ctx, s, (struct sockaddr*)&addr);
+ }
+ } else {
+ if (IN6_IS_ADDR_UNSPECIFIED(&s->ipv6.addr.src)) {
+ is_any = true;
+ addr6 = (struct sockaddr_in6 *)&addr;
+ memcpy(&addr6->sin6_addr, &s->ipv6.addr.dst,
+ sizeof(tle_ipv6_any));
+ addr6->sin6_port = s->port.dst;
+ addr.ss_family = AF_INET6;
+ bhash_del_entry(ctx, s, (struct sockaddr*)&addr);
+ }
+ }
rte_spinlock_lock(&ctx->dev_lock);
- rc = stream_clear_dev(ctx, s);
+ /* counterintuitive, but matches Linux stack behaviour */
+ if (is_any) {
+ if (s->type == TLE_V6 && !s->option.ipv6only)
+ tle_psm_clear(&ctx->use[TLE_V4], ntohs(s->port.dst));
+ }
+
+ tle_psm_clear(&ctx->use[s->type], ntohs(s->port.dst));
rte_spinlock_unlock(&ctx->dev_lock);
- return rc;
+ return 0;
}
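
A worked example of the dual-stack binding rule implemented above (addresses hypothetical): binding a stream to [::]:8080 with ipv6only == 0 marks port 8080 in both ctx->use[TLE_V6] and ctx->use[TLE_V4], so a later bind to 0.0.0.0:8080 returns EADDRINUSE unless both streams set SO_REUSEPORT; stream_clear_ctx() later releases both marks.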
diff --git a/lib/libtle_l4p/ctx.h b/lib/libtle_l4p/ctx.h
index f18060b..9483976 100644
--- a/lib/libtle_l4p/ctx.h
+++ b/lib/libtle_l4p/ctx.h
@@ -21,7 +21,7 @@
#include <tle_dring.h>
#include <tle_ctx.h>
-#include "port_bitmap.h"
+#include "port_statmap.h"
#include "osdep.h"
#include "net_misc.h"
@@ -29,11 +29,6 @@
extern "C" {
#endif
-struct tle_dport {
- struct tle_pbm use; /* ports in use. */
- struct tle_stream *streams[MAX_PORT_NUM]; /* port to stream. */
-};
-
struct tle_dev {
struct tle_ctx *ctx;
struct {
@@ -45,7 +40,6 @@ struct tle_dev {
struct tle_dring dr;
} tx;
struct tle_dev_param prm; /* copy of device parameters. */
- struct tle_dport *dp[TLE_VNUM]; /* device L4 ports */
};
struct tle_ctx {
@@ -54,18 +48,23 @@ struct tle_ctx {
struct {
rte_spinlock_t lock;
uint32_t nb_free; /* number of free streams. */
+ uint32_t nb_cur; /* number of allocated streams. */
STAILQ_HEAD(, tle_stream) free;
void *buf; /* space allocated for streams */
} streams;
- rte_spinlock_t dev_lock;
+ rte_spinlock_t bhash_lock[TLE_VNUM];
+ struct rte_hash *bhash[TLE_VNUM]; /* bind and listen hash table */
+
uint32_t nb_dev;
- struct tle_pbm use[TLE_VNUM]; /* all ports in use. */
+ rte_spinlock_t dev_lock;
+ struct tle_psm use[TLE_VNUM]; /* all ports in use. */
struct tle_dev dev[RTE_MAX_ETHPORTS];
};
struct stream_ops {
int (*init_streams)(struct tle_ctx *);
+ uint32_t (*more_streams)(struct tle_ctx *);
void (*fini_streams)(struct tle_ctx *);
void (*free_drbs)(struct tle_stream *, struct tle_drb *[], uint32_t);
};
@@ -77,6 +76,27 @@ int stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s,
int stream_clear_ctx(struct tle_ctx *ctx, struct tle_stream *s);
+static inline void
+fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask)
+{
+ *addr = in->sin_addr.s_addr;
+ *mask = (*addr == INADDR_ANY) ? INADDR_ANY : INADDR_NONE;
+}
+
+static inline void
+fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask)
+{
+ const struct in6_addr *pm;
+
+ memcpy(addr, &in->sin6_addr, sizeof(*addr));
+ if (IN6_IS_ADDR_UNSPECIFIED(addr))
+ pm = &tle_ipv6_any;
+ else
+ pm = &tle_ipv6_none;
+
+ memcpy(mask, pm, sizeof(*mask));
+}
+
#ifdef __cplusplus
}
#endif
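
The two helpers above encode the wildcard rule as an address/mask pair; a small sketch (hypothetical helper, not part of this patch) of how such a pair is meant to be matched:

    /* a stream matches a packet when (pkt & mask) == addr:
     * INADDR_ANY yields mask 0, so any packet address matches;
     * a specific address yields mask 0xffffffff (exact match) */
    static inline int
    ipv4_match(uint32_t pkt, uint32_t addr, uint32_t mask)
    {
        return (pkt & mask) == addr;
    }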
diff --git a/lib/libtle_l4p/misc.h b/lib/libtle_l4p/misc.h
index 327296f..d39e5a1 100644
--- a/lib/libtle_l4p/misc.h
+++ b/lib/libtle_l4p/misc.h
@@ -16,12 +16,34 @@
#ifndef _MISC_H_
#define _MISC_H_
+#include <tle_stats.h>
#include <tle_dpdk_wrapper.h>
#ifdef __cplusplus
extern "C" {
#endif
+union typflg {
+ uint16_t raw;
+ struct {
+ uint8_t type; /* TLE_V4/TLE_V6 */
+ uint8_t flags; /* TCP header flags */
+ };
+};
+
+union pkt_info {
+ rte_xmm_t raw;
+ struct {
+ union typflg tf;
+ uint16_t csf; /* checksum flags */
+ union l4_ports port;
+ union {
+ union ipv4_addrs addr4;
+ const union ipv6_addrs *addr6;
+ };
+ };
+};
+
static inline int
xmm_cmp(const rte_xmm_t *da, const rte_xmm_t *sa)
{
@@ -286,43 +308,41 @@ _ipv4x_cksum(const void *iph, size_t len)
return (cksum == 0xffff) ? cksum : ~cksum;
}
-/*
- * helper function to check csum.
- */
static inline int
-check_pkt_csum(const struct rte_mbuf *m, uint64_t ol_flags, uint32_t type,
- uint32_t proto)
+check_pkt_csum(const struct rte_mbuf *m, uint32_t type, uint32_t proto)
{
const struct ipv4_hdr *l3h4;
const struct ipv6_hdr *l3h6;
const struct udp_hdr *l4h;
- uint64_t fl3, fl4;
- uint16_t csum;
int32_t ret;
-
- fl4 = ol_flags & PKT_RX_L4_CKSUM_MASK;
- fl3 = (type == TLE_V4) ?
- (ol_flags & PKT_RX_IP_CKSUM_MASK) : PKT_RX_IP_CKSUM_GOOD;
+ uint16_t csum;
+ uint64_t ol_flags = m->ol_flags;
/* case 0: both ip and l4 cksum is verified or data is valid */
- if ((fl3 | fl4) == (PKT_RX_IP_CKSUM_GOOD | PKT_RX_L4_CKSUM_GOOD))
+ if ((ol_flags & PKT_RX_IP_CKSUM_GOOD) &&
+ (ol_flags & PKT_RX_L4_CKSUM_GOOD))
return 0;
/* case 1: either ip or l4 cksum bad */
- if (fl3 == PKT_RX_IP_CKSUM_BAD || fl4 == PKT_RX_L4_CKSUM_BAD)
+ if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD)
+ return 1;
+
+ if ((ol_flags & PKT_RX_L4_CKSUM_MASK) == PKT_RX_L4_CKSUM_BAD)
return 1;
/* case 2: either ip or l4 or both cksum is unknown */
+ ret = 0;
l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len);
l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len);
- ret = 0;
- if (fl3 == PKT_RX_IP_CKSUM_UNKNOWN && l3h4->hdr_checksum != 0) {
+ if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_UNKNOWN &&
+ l3h4->hdr_checksum != 0) {
csum = _ipv4x_cksum(l3h4, m->l3_len);
ret = (csum != UINT16_MAX);
}
- if (ret == 0 && fl4 == PKT_RX_L4_CKSUM_UNKNOWN) {
+ if (ret == 0 && (ol_flags & PKT_RX_L4_CKSUM_MASK) ==
+ PKT_RX_L4_CKSUM_UNKNOWN) {
/*
* for IPv4 it is allowed to have zero UDP cksum,
@@ -376,8 +396,20 @@ rwl_acquire(rte_atomic32_t *p)
static inline void
rwl_down(rte_atomic32_t *p)
{
- while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0)
+ while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0)
+ rte_pause();
+}
+
+static inline int
+rwl_try_down(rte_atomic32_t *p)
+{
+ while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0) {
+ /* Already down */
+ if (rte_atomic32_read(p) == INT32_MIN)
+ return -1;
rte_pause();
+ }
+ return 0;
}
static inline void
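
A usage sketch for the new rwl_try_down() (stream field names assumed): unlike rwl_down(), it fails fast instead of spinning when another thread already holds the lock exclusively:

    if (rwl_try_down(&s->rx.use) < 0)
        return; /* stream is already being torn down elsewhere */
    /* ... exclusive work on the RX side ... */
    rwl_up(&s->rx.use);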
diff --git a/lib/libtle_l4p/net_misc.h b/lib/libtle_l4p/net_misc.h
index 2d8dac2..c1d946b 100644
--- a/lib/libtle_l4p/net_misc.h
+++ b/lib/libtle_l4p/net_misc.h
@@ -16,6 +16,7 @@
#ifndef _NET_MISC_H_
#define _NET_MISC_H_
+#include <stdbool.h>
#include <rte_ip.h>
#include <rte_udp.h>
#include "osdep.h"
@@ -71,6 +72,26 @@ union ip_addrs {
union ipv6_addrs v6;
};
+static inline bool
+is_empty_addr(const struct sockaddr *addr)
+{
+ bool any = false;
+ const struct sockaddr_in *in4;
+ const struct sockaddr_in6 *in6;
+
+ if (addr->sa_family == AF_INET) {
+ in4 = (const struct sockaddr_in *)addr;
+ if (in4->sin_addr.s_addr == INADDR_ANY)
+ any = true;
+ } else if (addr->sa_family == AF_INET6) {
+ in6 = (const struct sockaddr_in6 *)addr;
+ if (IN6_IS_ADDR_UNSPECIFIED(&in6->sin6_addr))
+ any = true;
+ }
+
+ return any;
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/libtle_l4p/port_statmap.h b/lib/libtle_l4p/port_statmap.h
new file mode 100644
index 0000000..8bbb0ba
--- /dev/null
+++ b/lib/libtle_l4p/port_statmap.h
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2019 Ant Financial Services Group.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _PORT_STATMAP_H_
+#define _PORT_STATMAP_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_PORT_NUM (UINT16_MAX + 1)
+#define ALLOC_PORT_START 0x8000
+
+struct tle_psm {
+ uint32_t nb_used; /* Number of ports already in use. */
+ uint32_t next_alloc; /* Next port to try allocate. */
+ uint8_t stat[MAX_PORT_NUM]; /* Status of the port:
+ * 1) the most significant bit indicates
+ * if SO_REUSEPORT is allowed;
+ * 2) lowest 7 bits indicate # of streams
+ * using the port.
+ */
+};
+
+static inline void
+tle_psm_init(struct tle_psm *psm)
+{
+ memset(psm, 0, sizeof(struct tle_psm));
+ psm->next_alloc = ALLOC_PORT_START;
+}
+
+static inline int
+tle_psm_set(struct tle_psm *psm, uint16_t port, uint8_t reuseport)
+{
+ if (psm->stat[port] == 0) {
+ /* port has not been used */
+ psm->stat[port]++;
+ if (reuseport)
+ psm->stat[port] |= 0x80;
+ } else {
+ /* port is used by some socket */
+ if (reuseport && (psm->stat[port] & 0x80)) {
+ /* all sockets set reuseport */
+ psm->stat[port]++;
+ } else
+ return -1;
+ }
+
+ return 0;
+}
+
+static inline void
+tle_psm_clear(struct tle_psm *psm, uint16_t port)
+{
+ psm->stat[port]--;
+ if ((psm->stat[port] & 0x7f) == 0)
+ psm->stat[port] = 0;
+}
+
+
+static inline uint8_t
+tle_psm_check(const struct tle_psm *psm, uint16_t port)
+{
+ return psm->stat[port];
+}
+
+static inline uint16_t
+tle_psm_alloc_port(struct tle_psm *psm)
+{
+ uint32_t i = psm->next_alloc;
+
+ for (; i < MAX_PORT_NUM; i++) {
+ if (psm->stat[i] == 0) {
+ psm->next_alloc = i + 1;
+ return (uint16_t)i;
+ }
+ }
+
+ for (i = ALLOC_PORT_START; i < psm->next_alloc; i++) {
+ if (psm->stat[i] == 0) {
+ psm->next_alloc = i + 1;
+ return (uint16_t)i;
+ }
+ }
+
+ return 0;
+}
+
+static inline uint16_t
+tle_psm_alloc_dual_port(struct tle_psm *psm4, struct tle_psm *psm6)
+{
+ uint32_t i = psm6->next_alloc;
+
+ for (; i < MAX_PORT_NUM; i++) {
+ if (psm6->stat[i] == 0 && psm4->stat[i] == 0) {
+ psm6->next_alloc = i + 1;
+ return (uint16_t)i;
+ }
+ }
+
+ for (i = ALLOC_PORT_START; i < psm6->next_alloc; i++) {
+ if (psm6->stat[i] == 0 && psm4->stat[i] == 0) {
+ psm6->next_alloc = i + 1;
+ return (uint16_t)i;
+ }
+ }
+
+ return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _PORT_STATMAP_H_ */
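
A short usage sketch of the status-byte semantics above; the values follow directly from tle_psm_set()/tle_psm_clear():

    struct tle_psm psm;

    tle_psm_init(&psm);
    tle_psm_set(&psm, 80, 0);   /* stat[80] == 0x01: one user, no reuse */
    tle_psm_set(&psm, 80, 1);   /* returns -1: port taken without reuseport */
    tle_psm_set(&psm, 443, 1);  /* stat[443] == 0x81: reuse bit + one user */
    tle_psm_set(&psm, 443, 1);  /* ok: stat[443] == 0x82, two reusers */
    tle_psm_clear(&psm, 443);   /* stat[443] == 0x81 */
    tle_psm_clear(&psm, 443);   /* count reaches 0, stat[443] resets to 0 */

Note that the low 7 bits cap a port at 127 concurrent reuseport streams.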
diff --git a/lib/libtle_l4p/stream.h b/lib/libtle_l4p/stream.h
index 49a2809..9f2bbc1 100644
--- a/lib/libtle_l4p/stream.h
+++ b/lib/libtle_l4p/stream.h
@@ -31,7 +31,11 @@ struct tle_stream {
STAILQ_ENTRY(tle_stream) link;
struct tle_ctx *ctx;
- uint8_t type; /* TLE_V4 or TLE_V6 */
+ tle_stream_options_t option;
+ unsigned long timestamp;
+ uint16_t reuseport_seed;
+ uint8_t type; /* TLE_V4 or TLE_V6 */
+ uint8_t padding;
/* Stream address information. */
union l4_ports port;
@@ -53,15 +57,25 @@ static inline uint32_t
get_streams(struct tle_ctx *ctx, struct tle_stream *s[], uint32_t num)
{
struct tle_stream *p;
- uint32_t i, n;
+ uint32_t i, n, inc;
rte_spinlock_lock(&ctx->streams.lock);
- n = RTE_MIN(ctx->streams.nb_free, num);
- for (i = 0, p = STAILQ_FIRST(&ctx->streams.free);
- i != n;
- i++, p = STAILQ_NEXT(p, link))
+ n = ctx->streams.nb_free;
+ if (n < num) {
+ inc = tle_stream_ops[ctx->prm.proto].more_streams(ctx);
+ ctx->streams.nb_free += inc;
+ ctx->streams.nb_cur += inc;
+ n = ctx->streams.nb_free;
+ }
+ n = RTE_MIN(n, num);
+
+ for (i = 0, p = STAILQ_FIRST(&ctx->streams.free); i != n; ) {
s[i] = p;
+ p = STAILQ_NEXT(p, link);
+ s[i]->link.stqe_next = NULL;
+ i++;
+ }
if (p == NULL)
/* we retrieved all free entries */
@@ -80,9 +94,6 @@ get_stream(struct tle_ctx *ctx)
struct tle_stream *s;
s = NULL;
- if (ctx->streams.nb_free == 0)
- return s;
-
get_streams(ctx, &s, 1);
return s;
}
@@ -120,8 +131,8 @@ drb_nb_elem(const struct tle_ctx *ctx)
}
static inline int32_t
-stream_get_dest(struct tle_stream *s, const void *dst_addr,
- struct tle_dest *dst)
+stream_get_dest(uint8_t type, struct tle_stream *s, const void *src_addr,
+ const void *dst_addr, struct tle_dest *dst)
{
int32_t rc;
const struct in_addr *d4;
@@ -133,12 +144,13 @@ stream_get_dest(struct tle_stream *s, const void *dst_addr,
/* it is here just to keep gcc happy. */
d4 = NULL;
+ /* it is here just to keep gcc happy. */
d6 = NULL;
- if (s->type == TLE_V4) {
+ if (type == TLE_V4) {
d4 = dst_addr;
rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst);
- } else if (s->type == TLE_V6) {
+ } else if (type == TLE_V6) {
d6 = dst_addr;
rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst);
} else
@@ -148,18 +160,25 @@ stream_get_dest(struct tle_stream *s, const void *dst_addr,
return -ENOENT;
dev = dst->dev;
- dst->ol_flags = dev->tx.ol_flags[s->type];
+ dst->ol_flags = dev->tx.ol_flags[type];
- if (s->type == TLE_V4) {
+ if (type == TLE_V4) {
struct ipv4_hdr *l3h;
l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
- l3h->src_addr = dev->prm.local_addr4.s_addr;
+ if (((const struct in_addr*)src_addr)->s_addr != INADDR_ANY)
+ l3h->src_addr = ((const struct in_addr*)src_addr)->s_addr;
+ else
+ l3h->src_addr = dev->prm.local_addr4.s_addr;
l3h->dst_addr = d4->s_addr;
} else {
struct ipv6_hdr *l3h;
l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len);
- rte_memcpy(l3h->src_addr, &dev->prm.local_addr6,
- sizeof(l3h->src_addr));
+ if (!IN6_IS_ADDR_UNSPECIFIED(src_addr))
+ rte_memcpy(l3h->src_addr, src_addr,
+ sizeof(l3h->src_addr));
+ else
+ rte_memcpy(l3h->src_addr, &dev->prm.local_addr6,
+ sizeof(l3h->src_addr));
rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr));
}
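
For example (addresses hypothetical): a stream bound to 192.0.2.10 now keeps 192.0.2.10 as the source address in the precomputed header, while a wildcard-bound stream still falls back to the device's local_addr4/local_addr6.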
diff --git a/lib/libtle_l4p/stream_table.c b/lib/libtle_l4p/stream_table.c
index 5a89553..e029306 100644
--- a/lib/libtle_l4p/stream_table.c
+++ b/lib/libtle_l4p/stream_table.c
@@ -13,68 +13,47 @@
* limitations under the License.
*/
#include <string.h>
-#include <rte_malloc.h>
#include <rte_errno.h>
#include "stream_table.h"
void
-stbl_fini(struct stbl *st)
+bhash_fini(struct tle_ctx *ctx)
{
uint32_t i;
- for (i = 0; i != RTE_DIM(st->ht); i++) {
- rte_hash_free(st->ht[i].t);
- rte_free(st->ht[i].ent);
- }
-
- memset(st, 0, sizeof(*st));
+ for (i = 0; i != RTE_DIM(ctx->bhash); i++)
+ rte_hash_free(ctx->bhash[i]);
}
int
-stbl_init(struct stbl *st, uint32_t num, int32_t socket)
+bhash_init(struct tle_ctx *ctx)
{
- int32_t rc;
- size_t i, sz;
- struct rte_hash_parameters hprm;
+ int rc = 0;
+ struct rte_hash_parameters hprm = {0};
+ bool ipv6 = ctx->prm.lookup6 != NULL;
char buf[RTE_HASH_NAMESIZE];
- num = RTE_MAX(5 * num / 4, 0x10U);
-
- memset(&hprm, 0, sizeof(hprm));
hprm.name = buf;
- hprm.entries = num;
- hprm.socket_id = socket;
-
- rc = 0;
-
- snprintf(buf, sizeof(buf), "stbl4@%p", st);
- hprm.key_len = sizeof(struct stbl4_key);
- st->ht[TLE_V4].t = rte_hash_create(&hprm);
- if (st->ht[TLE_V4].t == NULL)
+ hprm.entries = 4096;
+ hprm.extra_flag = RTE_HASH_EXTRA_FLAGS_EXT_TABLE;
+ hprm.socket_id = ctx->prm.socket_id;
+
+ snprintf(buf, sizeof(buf), "bhash4@%p", ctx);
+ hprm.key_len = sizeof(struct bhash4_key);
+ ctx->bhash[TLE_V4] = rte_hash_create(&hprm);
+ if (ctx->bhash[TLE_V4] == NULL)
rc = (rte_errno != 0) ? -rte_errno : -ENOMEM;
- if (rc == 0) {
- snprintf(buf, sizeof(buf), "stbl6@%p", st);
- hprm.key_len = sizeof(struct stbl6_key);
- st->ht[TLE_V6].t = rte_hash_create(&hprm);
- if (st->ht[TLE_V6].t == NULL)
+ if (rc == 0 && ipv6) {
+ snprintf(buf, sizeof(buf), "bhash6@%p", ctx);
+ hprm.key_len = sizeof(struct bhash6_key);
+ ctx->bhash[TLE_V6] = rte_hash_create(&hprm);
+ if (ctx->bhash[TLE_V6] == NULL) {
+ rte_hash_free(ctx->bhash[TLE_V4]);
rc = (rte_errno != 0) ? -rte_errno : -ENOMEM;
+ }
}
- for (i = 0; i != RTE_DIM(st->ht) && rc == 0; i++) {
-
- sz = sizeof(*st->ht[i].ent) * num;
- st->ht[i].ent = rte_zmalloc_socket(NULL, sz,
- RTE_CACHE_LINE_SIZE, socket);
- if (st->ht[i].ent == NULL)
- rc = -ENOMEM;
- else
- st->ht[i].nb_ent = num;
- }
-
- if (rc != 0)
- stbl_fini(st);
-
return rc;
}
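
RTE_HASH_EXTRA_FLAGS_EXT_TABLE enables extendable buckets, which lets inserts succeed up to the 4096-entry capacity even under unlucky bucket collisions. Each table holds one entry per bound (address, port) key; streams sharing a key are chained through link.stqe_next (see stream_table.h below).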
diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h
index 033c306..ba8d165 100644
--- a/lib/libtle_l4p/stream_table.h
+++ b/lib/libtle_l4p/stream_table.h
@@ -16,199 +16,415 @@
#ifndef _STREAM_TABLE_H_
#define _STREAM_TABLE_H_
+#include <string.h>
#include <rte_hash.h>
-#include "tcp_misc.h"
+#include "stream.h"
+#include "misc.h"
#ifdef __cplusplus
extern "C" {
#endif
+#define HASH_SIZE_32K 32771
+#define HASH_SIZE_64K 65537
+#define HASH_SIZE_128K 131071
+
+#define HASH_SIZE HASH_SIZE_64K
+
struct stbl_entry {
void *data;
};
-struct shtbl {
- uint32_t nb_ent; /* max number of entries in the table. */
- rte_spinlock_t l; /* lock to protect the hash table */
- struct rte_hash *t;
- struct stbl_entry *ent;
+struct stbl {
+ rte_spinlock_t l;
+ uint32_t need_lock;
+ struct stbl_entry head[HASH_SIZE];
} __rte_cache_aligned;
-struct stbl {
- struct shtbl ht[TLE_VNUM];
-};
+static inline int
+stbl_init(struct stbl *st, uint32_t lock)
+{
+ st->need_lock = lock;
+ return 0;
+}
-struct stbl4_key {
- union l4_ports port;
- union ipv4_addrs addr;
-} __attribute__((__packed__));
+static inline int
+stbl_fini(struct stbl *st)
+{
+ st->need_lock = 0;
+ return 0;
+}
-struct stbl6_key {
- union l4_ports port;
- union ipv6_addrs addr;
-} __attribute__((__packed__));
+static inline uint8_t
+compare_pkt(const struct tle_stream *s, const union pkt_info *pi)
+{
+ if (s->type != pi->tf.type)
+ return -1;
-struct stbl_key {
- union l4_ports port;
- union {
- union ipv4_addrs addr4;
- union ipv6_addrs addr6;
- };
-} __attribute__((__packed__));
+ if (s->port.raw != pi->port.raw)
+ return -1;
-extern void stbl_fini(struct stbl *st);
+ if (s->type == TLE_V4) {
+ if (s->ipv4.addr.raw != pi->addr4.raw)
+ return -1;
+ } else {
+ if (memcmp(&s->ipv6.addr, pi->addr6, sizeof(union ipv6_addrs)))
+ return -1;
+ }
-extern int stbl_init(struct stbl *st, uint32_t num, int32_t socket);
+ return 0;
+}
-static inline void
-stbl_pkt_fill_key(struct stbl_key *k, const union pkt_info *pi, uint32_t type)
+static inline uint32_t
+stbl_hash_stream(const struct tle_stream *s)
{
- static const struct stbl_key zero = {
- .port.raw = 0,
- };
-
- k->port = pi->port;
- if (type == TLE_V4)
- k->addr4 = pi->addr4;
- else if (type == TLE_V6)
- k->addr6 = *pi->addr6;
- else
- *k = zero;
+ int i;
+ unsigned int hash;
+
+ if (s->type == TLE_V4) {
+ hash = s->ipv4.addr.src ^ s->ipv4.addr.dst
+ ^ s->port.src ^ s->port.dst;
+ } else {
+ hash = s->port.src ^ s->port.dst;
+ for (i = 0; i < 4; i++) {
+ hash ^= s->ipv6.addr.src.u32[i];
+ hash ^= s->ipv6.addr.dst.u32[i];
+ }
+ }
+
+ return hash % HASH_SIZE;
}
-static inline void
-stbl_lock(struct stbl *st, uint32_t type)
+static inline uint32_t
+stbl_hash_pkt(const union pkt_info* pi)
{
- rte_spinlock_lock(&st->ht[type].l);
+ int i;
+ unsigned int hash;
+
+ if (pi->tf.type == TLE_V4) {
+ hash = pi->addr4.src ^ pi->addr4.dst ^ pi->port.src ^ pi->port.dst;
+ } else {
+ hash = pi->port.src ^ pi->port.dst;
+ for (i = 0; i < 4; i++) {
+ hash ^= pi->addr6->src.u32[i];
+ hash ^= pi->addr6->dst.u32[i];
+ }
+ }
+
+ return hash % HASH_SIZE;
}
-static inline void
-stbl_unlock(struct stbl *st, uint32_t type)
+static inline struct stbl_entry*
+stbl_add_stream(struct stbl *st, struct tle_stream *s)
{
- rte_spinlock_unlock(&st->ht[type].l);
+ struct stbl_entry* entry;
+
+ if (st->need_lock)
+ rte_spinlock_lock(&st->l);
+ entry = &st->head[stbl_hash_stream(s)];
+ s->link.stqe_next = (struct tle_stream*)entry->data;
+ entry->data = s;
+ if (st->need_lock)
+ rte_spinlock_unlock(&st->l);
+
+ return entry;
}
-static inline struct stbl_entry *
-stbl_add_entry(struct stbl *st, const union pkt_info *pi)
+static inline struct tle_stream *
+stbl_find_stream(struct stbl *st, const union pkt_info *pi)
{
- int32_t rc;
- uint32_t type;
- struct shtbl *ht;
- struct stbl_key k;
-
- type = pi->tf.type;
- stbl_pkt_fill_key(&k, pi, type);
- ht = st->ht + type;
-
- rc = rte_hash_add_key(ht->t, &k);
- if ((uint32_t)rc >= ht->nb_ent)
- return NULL;
- return ht->ent + rc;
+ struct tle_stream* head;
+
+ if (st->need_lock)
+ rte_spinlock_lock(&st->l);
+ head = (struct tle_stream*)st->head[stbl_hash_pkt(pi)].data;
+ while (head != NULL) {
+ if (compare_pkt(head, pi) == 0)
+ break;
+
+ head = head->link.stqe_next;
+ }
+ if (st->need_lock)
+ rte_spinlock_unlock(&st->l);
+ return head;
}
-static inline struct stbl_entry *
-stbl_add_stream(struct stbl *st, const union pkt_info *pi, const void *s)
+static inline void
+stbl_del_stream(struct stbl *st, struct stbl_entry *se,
+ struct tle_stream *s)
{
- struct stbl_entry *se;
+ struct tle_stream *prev, *current;
- se = stbl_add_entry(st, pi);
- if (se != NULL)
- se->data = (void *)(uintptr_t)s;
- return se;
+ if (st->need_lock)
+ rte_spinlock_lock(&st->l);
+ if (se == NULL)
+ se = &st->head[stbl_hash_stream(s)];
+ prev = NULL;
+ current = (struct tle_stream*)se->data;
+ while (current != NULL) {
+ if (current != s) {
+ prev = current;
+ current = current->link.stqe_next;
+ continue;
+ }
+
+ if (prev)
+ prev->link.stqe_next = current->link.stqe_next;
+ else
+ se->data = current->link.stqe_next;
+ break;
+ }
+ if (st->need_lock)
+ rte_spinlock_unlock(&st->l);
+
+ s->link.stqe_next = NULL;
}
-static inline struct stbl_entry *
-stbl_find_entry(struct stbl *st, const union pkt_info *pi)
+struct bhash4_key {
+ uint16_t port;
+ uint32_t addr;
+} __attribute__((__packed__));
+
+struct bhash6_key {
+ uint16_t port;
+ rte_xmm_t addr;
+} __attribute__((__packed__));
+
+struct bhash_key {
+ uint16_t port;
+ union {
+ uint32_t addr4;
+ rte_xmm_t addr6;
+ };
+} __attribute__((__packed__));
+
+void bhash_fini(struct tle_ctx *ctx);
+
+int bhash_init(struct tle_ctx *ctx);
+
+static inline int
+bhash_sockaddr2key(const struct sockaddr *addr, struct bhash_key *key)
{
- int32_t rc;
- uint32_t type;
- struct shtbl *ht;
- struct stbl_key k;
-
- type = pi->tf.type;
- stbl_pkt_fill_key(&k, pi, type);
- ht = st->ht + type;
-
- rc = rte_hash_lookup(ht->t, &k);
- if ((uint32_t)rc >= ht->nb_ent)
- return NULL;
- return ht->ent + rc;
+ int t;
+ const struct sockaddr_in *lin4;
+ const struct sockaddr_in6 *lin6;
+
+ if (addr->sa_family == AF_INET) {
+ lin4 = (const struct sockaddr_in *)addr;
+ key->port = lin4->sin_port;
+ key->addr4 = lin4->sin_addr.s_addr;
+ t = TLE_V4;
+ } else {
+ lin6 = (const struct sockaddr_in6 *)addr;
+ memcpy(&key->addr6, &lin6->sin6_addr, sizeof(key->addr6));
+ key->port = lin6->sin6_port;
+ t = TLE_V6;
+ }
+
+ return t;
}
-static inline void *
-stbl_find_data(struct stbl *st, const union pkt_info *pi)
+/* Return 0 on success;
+ * Return errno on failure.
+ */
+static inline int
+bhash_add_entry(struct tle_ctx *ctx, const struct sockaddr *addr,
+ struct tle_stream *s)
{
- struct stbl_entry *ent;
-
- ent = stbl_find_entry(st, pi);
- return (ent == NULL) ? NULL : ent->data;
+ int t;
+ int rc;
+ int is_first;
+ struct bhash_key key;
+ struct rte_hash *bhash;
+ struct tle_stream *old, *tmp;
+
+ is_first = 0;
+ t = bhash_sockaddr2key(addr, &key);
+
+ rte_spinlock_lock(&ctx->bhash_lock[t]);
+ bhash = ctx->bhash[t];
+ rc = rte_hash_lookup_data(bhash, &key, (void **)&old);
+ if (rc == -ENOENT) {
+ is_first = 1;
+ s->link.stqe_next = NULL; /* avoid following a stale link */
+ rc = rte_hash_add_key_data(bhash, &key, s);
+ } else if (rc >= 0) {
+ if (t == TLE_V4 && old->type == TLE_V6) {
+ /* a V6 stream may listen on a V4 address; ensure the
+ * V4 stream stays ahead of V6 streams in the list
+ */
+ s->link.stqe_next = old;
+ rte_hash_add_key_data(bhash, &key, s);
+ } else {
+ tmp = old->link.stqe_next;
+ old->link.stqe_next = s;
+ s->link.stqe_next = tmp;
+ }
+ }
+ rte_spinlock_unlock(&ctx->bhash_lock[t]);
+
+ /* IPv6 socket with unspecified address could receive IPv4 packets.
+ * So the stream should also be recorded in IPv4 table.
+ * Only the first stream needs to be inserted into the V4 list;
+ * otherwise the V6 list already follows the V4 list.
+ */
+ if (t == TLE_V6 && !s->option.ipv6only && is_first &&
+ IN6_IS_ADDR_UNSPECIFIED(&key.addr6)) {
+ t = TLE_V4;
+ rte_spinlock_lock(&ctx->bhash_lock[t]);
+ bhash = ctx->bhash[t];
+ rc = rte_hash_lookup_data(bhash, &key, (void **)&old);
+ if (rc == -ENOENT)
+ rc = rte_hash_add_key_data(bhash, &key, s);
+ else if (rc >= 0) {
+ while(old->link.stqe_next != NULL)
+ old = old->link.stqe_next;
+ old->link.stqe_next = s;
+ s->link.stqe_next = NULL;
+ }
+ rte_spinlock_unlock(&ctx->bhash_lock[t]);
+ }
+
+ return (rc >= 0) ? 0 : (-rc);
}
-#include "tcp_stream.h"
-
static inline void
-stbl_stream_fill_key(struct stbl_key *k, const struct tle_stream *s,
- uint32_t type)
+bhash_del_entry(struct tle_ctx *ctx, struct tle_stream *s,
+ const struct sockaddr *addr)
{
- static const struct stbl_key zero = {
- .port.raw = 0,
- };
+ int t;
+ int rc;
+ struct bhash_key key;
+ struct tle_stream *f, *cur, *pre = NULL;
+
+ t = bhash_sockaddr2key(addr, &key);
+
+ rte_spinlock_lock(&ctx->bhash_lock[t]);
+ rc = rte_hash_lookup_data(ctx->bhash[t], &key, (void **)&f);
+ if (rc >= 0) {
+ cur = f;
+ pre = NULL;
+ while (cur != s) {
+ pre = cur;
+ cur = cur->link.stqe_next;
+ }
+
+ if (pre == NULL) {
+ cur = cur->link.stqe_next;
+ if (cur == NULL)
+ rte_hash_del_key(ctx->bhash[t], &key);
+ else /* change data */
+ rte_hash_add_key_data(ctx->bhash[t], &key, cur);
+ } else
+ pre->link.stqe_next = cur->link.stqe_next;
+ }
+
+ rte_spinlock_unlock(&ctx->bhash_lock[t]);
+
+ if (rc < 0)
+ return;
+
+ s->link.stqe_next = NULL;
+
+ /* IPv6 socket with unspecified address could receive IPv4 packets.
+ * So the stream should also be recorded in the IPv4 table. */
+ if (t == TLE_V6 && !s->option.ipv6only && pre == NULL &&
+ IN6_IS_ADDR_UNSPECIFIED(&key.addr6)) {
+ t = TLE_V4;
+ rte_spinlock_lock(&ctx->bhash_lock[t]);
+ rc = rte_hash_lookup_data(ctx->bhash[t], &key, (void **)&f);
+ if (rc >= 0) {
+ cur = f;
+ pre = NULL;
+ while (cur != s) {
+ pre = cur;
+ cur = cur->link.stqe_next;
+ }
+
+ if (pre == NULL) {
+ cur = cur->link.stqe_next;
+ if (cur == NULL)
+ rte_hash_del_key(ctx->bhash[t], &key);
+ else /* change data */
+ rte_hash_add_key_data(ctx->bhash[t], &key, cur);
+ } else
+ pre->link.stqe_next = cur->link.stqe_next;
+ }
+
+ rte_spinlock_unlock(&ctx->bhash_lock[t]);
+ }
- k->port = s->port;
- if (type == TLE_V4)
- k->addr4 = s->ipv4.addr;
- else if (type == TLE_V6)
- k->addr6 = s->ipv6.addr;
- else
- *k = zero;
}
-static inline struct stbl_entry *
-stbl_add_stream_lock(struct stbl *st, const struct tle_tcp_stream *s)
+static inline void *
+bhash_reuseport_get_stream(struct tle_stream *s)
{
- uint32_t type;
- struct stbl_key k;
- struct stbl_entry *se;
- struct shtbl *ht;
- int32_t rc;
-
- type = s->s.type;
- stbl_stream_fill_key(&k, &s->s, type);
- ht = st->ht + type;
+ int n = 0;
+ struct tle_stream *e, *all[32];
+
+ e = s;
+ while(e && n < 32) {
+ all[n++] = e;
+ e = e->link.stqe_next;
+ }
+
+ /* for each connection this function is called twice:
+ * 1st time on the first step of the handshake (SYN),
+ * 2nd time on the third step (the final ACK)
+ */
+ return all[(s->reuseport_seed++) % n];
+}
- stbl_lock(st, type);
- rc = rte_hash_add_key(ht->t, &k);
- stbl_unlock(st, type);
+static inline void *
+bhash_lookup4(struct rte_hash *t, uint32_t addr, uint16_t port, uint8_t reuse)
+{
+ int rc;
+ void *s = NULL;
+ struct bhash_key key = {
+ .port = port,
+ .addr4 = addr,
+ };
- if ((uint32_t)rc >= ht->nb_ent)
- return NULL;
+ rc = rte_hash_lookup_data(t, &key, &s);
+ if (rc == -ENOENT) {
+ key.addr4 = INADDR_ANY;
+ rc = rte_hash_lookup_data(t, &key, &s);
+ }
- se = ht->ent + rc;
- if (se != NULL)
- se->data = (void *)(uintptr_t)s;
+ if (rc >= 0) {
+ if (reuse)
+ return bhash_reuseport_get_stream(s);
+ else
+ return s;
+ }
- return se;
+ return NULL;
}
-static inline void
-stbl_del_stream(struct stbl *st, struct stbl_entry *se,
- const struct tle_tcp_stream *s, uint32_t lock)
+static inline void *
+bhash_lookup6(struct rte_hash *t, rte_xmm_t addr, uint16_t port, uint8_t reuse)
{
- uint32_t type;
- struct stbl_key k;
+ int rc;
+ void *s = NULL;
+ struct bhash_key key = {
+ .port = port,
+ .addr6 = addr,
+ };
- if (se == NULL)
- return;
+ rc = rte_hash_lookup_data(t, &key, &s);
+ if (rc == -ENOENT) {
+ memcpy(&key.addr6, &tle_ipv6_any, sizeof(key.addr6));
+ rc = rte_hash_lookup_data(t, &key, &s);
+ }
- se->data = NULL;
+ if (rc >= 0) {
+ if (reuse)
+ return bhash_reuseport_get_stream(s);
+ else
+ return s;
+ }
- type = s->s.type;
- stbl_stream_fill_key(&k, &s->s, type);
- if (lock != 0)
- stbl_lock(st, type);
- rte_hash_del_key(st->ht[type].t, &k);
- if (lock != 0)
- stbl_unlock(st, type);
+ return NULL;
}
#ifdef __cplusplus
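
A sketch of the listener lookup order implied by bhash_lookup4() above (calling context assumed): the exact (addr, port) key is tried first, then the wildcard (INADDR_ANY, port) key; with reuse set, the round-robin picker chooses among the chained streams:

    struct tle_stream *ls;

    /* dst fields taken from an incoming SYN, network byte order */
    ls = bhash_lookup4(ctx->bhash[TLE_V4], pkt_dst_addr, pkt_dst_port, 1);
    if (ls == NULL) {
        /* no listener on the exact address nor on 0.0.0.0 */
    }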
diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h
index 61bfce4..bf01e78 100644
--- a/lib/libtle_l4p/syncookie.h
+++ b/lib/libtle_l4p/syncookie.h
@@ -182,9 +182,12 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si, const union tsopt *to)
{
uint32_t ack, mss, seq, wscale;
+ tcb->err = 0;
+
seq = si->seq;
tcb->rcv.nxt = seq;
+ tcb->rcv.cpy = seq;
tcb->rcv.irs = seq - 1;
tcb->snd.wu.wl1 = seq;
@@ -202,6 +205,7 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si, const union tsopt *to)
tcb->so.mss = mss;
tcb->snd.ts = to->ecr;
+ tcb->snd.cork_ts = 0;
tcb->rcv.ts = to->val;
tcb->so.ts.raw = to->raw;
diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h
index bec1e76..3196470 100644
--- a/lib/libtle_l4p/tcp_ctl.h
+++ b/lib/libtle_l4p/tcp_ctl.h
@@ -22,6 +22,7 @@
#include "tcp_stream.h"
#include "tcp_ofo.h"
+#include "tcp_timer.h"
#ifdef __cplusplus
extern "C" {
@@ -97,10 +98,10 @@ calc_rx_wnd(const struct tle_tcp_stream *s, uint32_t scale)
/* peer doesn't support WSCALE option, wnd size is limited to 64K */
if (scale == TCP_WSCALE_NONE) {
- wnd = _rte_ring_get_mask(s->rx.q) << TCP_WSCALE_DEFAULT;
+ wnd = rte_ring_free_count(s->rx.q) << TCP_WSCALE_DEFAULT;
return RTE_MIN(wnd, (uint32_t)UINT16_MAX);
} else
- return _rte_ring_get_mask(s->rx.q) << scale;
+ return rte_ring_free_count(s->rx.q) << scale;
}
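
A quick worked example of the change above (numbers assumed): with 2048 free slots in s->rx.q and wscale == 10, the advertised window is 2048 << 10 = 2 MB, i.e. the buffer space actually available rather than a constant derived from the ring mask; with TCP_WSCALE_NONE the result is still clamped to 65535.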
/* empty stream's send queue */
@@ -144,31 +145,34 @@ static inline void
tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
struct stbl *st;
- uint16_t uop;
+ uint16_t state;
+ uint8_t i;
st = CTX_TCP_STLB(ctx);
- /* reset TX armed */
- rte_atomic32_set(&s->tx.arm, 0);
+ for (i = 0; i < TIMER_NUM; i++)
+ timer_stop(s, i);
/* reset TCB */
- uop = s->tcb.uop & ~TCP_OP_CLOSE;
+ state = s->tcb.state;
memset(&s->tcb, 0, sizeof(s->tcb));
/* reset cached destination */
memset(&s->tx.dst, 0, sizeof(s->tx.dst));
- if (uop != TCP_OP_ACCEPT) {
+ /* state can be ESTABLISHED, CLOSED or LISTEN:
+ * a stream in CLOSED state has already been cleaned up by stream_term;
+ * a stream in ESTABLISHED state is an accepted stream and needs no cleanup
+ */
+ if (state == TCP_ST_LISTEN) {
/* free stream's destination port */
stream_clear_ctx(ctx, &s->s);
- if (uop == TCP_OP_LISTEN)
- empty_lq(s);
+ empty_lq(s);
}
if (s->ste != NULL) {
/* remove entry from RX streams table */
- stbl_del_stream(st, s->ste, s,
- (s->flags & TLE_CTX_FLAG_ST) == 0);
+ stbl_del_stream(st, s->ste, &s->s);
s->ste = NULL;
empty_rq(s);
}
@@ -184,6 +188,48 @@ tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s)
put_stream(ctx, &s->s, TCP_STREAM_TX_FINISHED(s));
}
+static inline void
+stream_term(struct tle_tcp_stream *s)
+{
+ struct sdr *dr;
+
+ /* reached on: 1) receipt of a RST packet; 2) keepalive timeout */
+ if (s->tcb.state == TCP_ST_ESTABLISHED) {
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
+ TCP_INC_STATS(TCP_MIB_ESTABRESETS);
+ }
+
+ s->tcb.state = TCP_ST_CLOSED;
+ rte_smp_wmb();
+
+ /* close() was already invoked, schedule final cleanup */
+ if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
+ if ((s->tcb.uop & TCP_OP_ACCEPT) == 0) {
+ /* free stream's destination port */
+ stream_clear_ctx(s->s.ctx, &s->s);
+ if ((s->tcb.uop & TCP_OP_LISTEN) != 0)
+ empty_lq(s);
+ }
+
+ if (s->ste != NULL) {
+ /* remove entry from RX streams table */
+ stbl_del_stream(CTX_TCP_STLB(s->s.ctx), s->ste, &s->s);
+ s->ste = NULL;
+ empty_rq(s);
+ }
+
+ dr = CTX_TCP_SDR(s->s.ctx);
+ rte_spinlock_lock(&dr->lock);
+ STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
+ rte_spinlock_unlock(&dr->lock);
+
+ /* notify user that the stream needs to be closed */
+ } else if (s->err.ev != NULL)
+ tle_event_raise(s->err.ev);
+ else if (s->err.cb.func != NULL)
+ s->err.cb.func(s->err.cb.data, &s->s);
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/libtle_l4p/tcp_misc.h b/lib/libtle_l4p/tcp_misc.h
index 0cef8b2..1f7974e 100644
--- a/lib/libtle_l4p/tcp_misc.h
+++ b/lib/libtle_l4p/tcp_misc.h
@@ -30,7 +30,7 @@ extern "C" {
* of protocol related data.
*/
-#define TCP_WSCALE_DEFAULT 7
+#define TCP_WSCALE_DEFAULT 10
#define TCP_WSCALE_NONE 0
#define TCP_TX_HDR_MAX (sizeof(struct tcp_hdr) + TCP_TX_OPT_LEN_MAX)
@@ -71,27 +71,6 @@ extern "C" {
/* TCP flags mask. */
#define TCP_FLAG_MASK UINT8_MAX
-union typflg {
- uint16_t raw;
- struct {
- uint8_t type; /* TLE_V4/TLE_V6 */
- uint8_t flags; /* TCP header flags */
- };
-};
-
-union pkt_info {
- rte_xmm_t raw;
- struct {
- union typflg tf;
- uint16_t csf; /* checksum flags */
- union l4_ports port;
- union {
- union ipv4_addrs addr4;
- const union ipv6_addrs *addr6;
- };
- };
-};
-
union seg_info {
rte_xmm_t raw;
struct {
@@ -226,7 +205,7 @@ struct dack_info {
};
/* get current timestamp in ms */
-static inline uint32_t
+static inline uint64_t
tcp_get_tms(uint32_t mshift)
{
uint64_t ts;
@@ -344,7 +323,9 @@ fill_syn_opts(void *p, const struct syn_opts *so)
opt = (struct tcpopt *)to;
}
- to[0] = TCP_OPT_KIND_EOL;
+ to[0] = TCP_OPT_KIND_NOP;
+ to[1] = TCP_OPT_KIND_NOP;
+ to[2] = TCP_OPT_KIND_NOP;
}
/*
@@ -390,6 +371,8 @@ get_tms_opts(uintptr_t p, uint32_t len)
else if (kind == TCP_OPT_KIND_NOP)
i += sizeof(to->kl.kind);
else {
+ if (to->kl.len == 0)
+ break;
i += to->kl.len;
if (i <= len && to->kl.raw == TCP_OPT_KL_TMS) {
ts.val = rte_be_to_cpu_32(to->ts.val);
@@ -449,7 +432,6 @@ get_pkt_info(const struct rte_mbuf *m, union pkt_info *pi, union seg_info *si)
((uintptr_t)tcph + offsetof(struct tcp_hdr, src_port));
pi->tf.flags = tcph->tcp_flags;
pi->tf.type = type;
- pi->csf = m->ol_flags & (PKT_RX_IP_CKSUM_MASK | PKT_RX_L4_CKSUM_MASK);
pi->port.raw = prt->raw;
get_seg_info(tcph, si);
@@ -462,7 +444,7 @@ tcp_mbuf_seq_free(struct rte_mbuf *mb[], uint32_t num)
len = 0;
for (i = 0; i != num; i++) {
- len += mb[i]->pkt_len;
+ len += PKT_L4_PLEN(mb[i]);
rte_pktmbuf_free(mb[i]);
}
diff --git a/lib/libtle_l4p/tcp_ofo.c b/lib/libtle_l4p/tcp_ofo.c
index 1565445..b31f2b5 100644
--- a/lib/libtle_l4p/tcp_ofo.c
+++ b/lib/libtle_l4p/tcp_ofo.c
@@ -12,7 +12,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <rte_malloc.h>
#include <rte_errno.h>
#include "tcp_stream.h"
@@ -28,12 +27,6 @@
#define OFO_OBJ_MAX (OFODB_OBJ_MAX * OFO_DB_MAX)
void
-tcp_ofo_free(struct ofo *ofo)
-{
- rte_free(ofo);
-}
-
-static void
calc_ofo_elems(uint32_t nbufs, uint32_t *nobj, uint32_t *ndb)
{
uint32_t n, nd, no;
@@ -51,35 +44,3 @@ calc_ofo_elems(uint32_t nbufs, uint32_t *nobj, uint32_t *ndb)
*nobj = no;
*ndb = nd;
}
-
-struct ofo *
-tcp_ofo_alloc(uint32_t nbufs, int32_t socket)
-{
- uint32_t i, ndb, nobj;
- size_t dsz, osz, sz;
- struct ofo *ofo;
- struct rte_mbuf **obj;
-
- calc_ofo_elems(nbufs, &nobj, &ndb);
- osz = sizeof(*ofo) + sizeof(ofo->db[0]) * ndb;
- dsz = sizeof(ofo->db[0].obj[0]) * nobj * ndb;
- sz = osz + dsz;
-
- ofo = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
- if (ofo == NULL) {
- TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
- "failed with error code: %d\n",
- __func__, sz, socket, rte_errno);
- return NULL;
- }
-
- obj = (struct rte_mbuf **)&ofo->db[ndb];
- for (i = 0; i != ndb; i++) {
- ofo->db[i].nb_max = nobj;
- ofo->db[i].obj = obj + i * nobj;
- }
-
- ofo->nb_max = ndb;
- return ofo;
-}
-
diff --git a/lib/libtle_l4p/tcp_ofo.h b/lib/libtle_l4p/tcp_ofo.h
index 9d88266..0857f17 100644
--- a/lib/libtle_l4p/tcp_ofo.h
+++ b/lib/libtle_l4p/tcp_ofo.h
@@ -20,8 +20,6 @@
extern "C" {
#endif
-#include <stdbool.h>
-
struct ofodb {
uint32_t nb_elem;
uint32_t nb_max;
@@ -103,7 +101,7 @@ _ofo_insert_mbuf(struct ofo* ofo, uint32_t pos, union seqlen* sl,
db->obj[k + i] = mb[i];
}
if (tcp_seq_lt(end, seq))
- rte_pktmbuf_trim(mb[i - 1], seq - end);
+ _rte_pktmbuf_trim(mb[i - 1], seq - end);
db->nb_elem += i;
db->sl.len += tcp_seq_min(seq, end) - sl->seq;
@@ -157,7 +155,7 @@ _ofo_insert_right(struct ofo *ofo, uint32_t pos, union seqlen *sl,
plen = mb[i]->pkt_len;
if (n < plen) {
/* adjust partially overlapped packet. */
- rte_pktmbuf_adj(mb[i], n);
+ mb[i] = _rte_pktmbuf_adj(mb[i], n);
break;
}
}
@@ -258,7 +256,7 @@ static inline uint32_t
_ofodb_enqueue(struct rte_ring *r, const struct ofodb *db, uint32_t *seq)
{
uint32_t i, n, num, begin, end;
- struct rte_mbuf *pkt;
+ struct rte_mbuf* pkt;
n = 0;
num = db->nb_elem;
@@ -289,11 +287,7 @@ _ofodb_enqueue(struct rte_ring *r, const struct ofodb *db, uint32_t *seq)
return num - n;
}
-struct ofo *
-tcp_ofo_alloc(uint32_t nbufs, int32_t socket);
-
-void
-tcp_ofo_free(struct ofo *ofo);
+void calc_ofo_elems(uint32_t nbufs, uint32_t *nobj, uint32_t *ndb);
#ifdef __cplusplus
}
diff --git a/lib/libtle_l4p/tcp_rxq.h b/lib/libtle_l4p/tcp_rxq.h
index 2351ee6..be092f9 100644
--- a/lib/libtle_l4p/tcp_rxq.h
+++ b/lib/libtle_l4p/tcp_rxq.h
@@ -17,6 +17,7 @@
#define _TCP_RXQ_H_
#include "tcp_ofo.h"
+#include "tcp_ctl.h"
#ifdef __cplusplus
extern "C" {
@@ -74,6 +75,7 @@ rx_ofo_reduce(struct tle_tcp_stream *s)
s->tcb.rcv.nxt = seq;
_ofo_remove(ofo, 0, i);
+
return n;
}
@@ -133,6 +135,8 @@ rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
}
n = rte_ring_count(s->rx.q);
+ /* update receive window from the remaining recv buffer space */
+ s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
if (r != n) {
/* raise RX event */
if (s->rx.ev != NULL)
diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c
index a519645..5d7e0d1 100644
--- a/lib/libtle_l4p/tcp_rxtx.c
+++ b/lib/libtle_l4p/tcp_rxtx.c
@@ -28,8 +28,30 @@
#include "tcp_rxq.h"
#include "tcp_txq.h"
#include "tcp_tx_seg.h"
+#include "tcp_rxtx.h"
-#define TCP_MAX_PKT_SEG 0x20
+/* Uncomment the line below to debug cwnd */
+// #define DEBUG_CWND
+
+#ifdef DEBUG_CWND
+#define CWND_INFO(msg, value) printf("CWND: %s: %d\n", msg, value)
+#else
+#define CWND_INFO(msg, value) do {} while (0)
+#endif
+
+#define TCP_MAX_PKT_SEG 0x20
+#define DELAY_ACK_CHECK_INTERVAL 100
+
+/* must be larger than l2_len(14) + l3_len(20) + l4_len(20) + tms_option(12) */
+#define RESERVE_HEADER_LEN 128
+
+/* When the recv window has been exhausted, this threshold decides when
+ * the reopened window is advertised to the remote. It is deliberately
+ * larger than 1 to avoid too-frequent window updates.
+ */
+#define RECV_WIN_NOTIFY_THRESH 64
+
+static inline int stream_fill_dest(struct tle_tcp_stream *s);
/*
* checks if input TCP ports and IP addresses match given stream.
@@ -54,11 +76,17 @@ rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
static inline struct tle_tcp_stream *
rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
- uint32_t type)
+ uint32_t type, uint8_t reuse)
{
struct tle_tcp_stream *s;
- s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
+ if (type == TLE_V4)
+ s = bhash_lookup4(dev->ctx->bhash[type],
+ pi->addr4.dst, pi->port.dst, reuse);
+ else
+ s = bhash_lookup6(dev->ctx->bhash[type],
+ pi->addr6->dst, pi->port.dst, reuse);
+
if (s == NULL || tcp_stream_acquire(s) < 0)
return NULL;
@@ -77,10 +105,10 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
{
struct tle_tcp_stream *s;
- s = stbl_find_data(st, pi);
+ s = TCP_STREAM(stbl_find_stream(st, pi));
if (s == NULL) {
- if (pi->tf.flags == TCP_FLAG_ACK)
- return rx_obtain_listen_stream(dev, pi, type);
+ if (pi->tf.flags & TCP_FLAG_ACK)
+ return rx_obtain_listen_stream(dev, pi, type, 1);
return NULL;
}
@@ -150,131 +178,6 @@ pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
return i;
}
-static inline void
-stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
- uint32_t nb_drb)
-{
- _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
-}
-
-static inline uint32_t
-stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
- uint32_t nb_drb)
-{
- return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
-}
-
-static inline uint32_t
-get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
-{
- uint32_t pid;
- rte_atomic32_t *pa;
-
- pa = &dev->tx.packet_id[type];
-
- if (st == 0) {
- pid = rte_atomic32_add_return(pa, num);
- return pid - num;
- } else {
- pid = rte_atomic32_read(pa);
- rte_atomic32_set(pa, pid + num);
- return pid;
- }
-}
-
-static inline void
-fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
- uint32_t seq, uint8_t hlen, uint8_t flags)
-{
- uint16_t wnd;
-
- l4h->src_port = port.dst;
- l4h->dst_port = port.src;
-
- wnd = (flags & TCP_FLAG_SYN) ?
- RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
- tcb->rcv.wnd >> tcb->rcv.wscale;
-
- /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
- l4h->sent_seq = rte_cpu_to_be_32(seq);
- l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
- l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
- l4h->tcp_flags = flags;
- l4h->rx_win = rte_cpu_to_be_16(wnd);
- l4h->cksum = 0;
- l4h->tcp_urp = 0;
-
- if (flags & TCP_FLAG_SYN)
- fill_syn_opts(l4h + 1, &tcb->so);
- else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
- fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
-}
-
-static inline int
-tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
- const struct tle_dest *dst, uint64_t ol_flags,
- union l4_ports port, uint32_t seq, uint32_t flags,
- uint32_t pid, uint32_t swcsm)
-{
- uint32_t l4, len, plen;
- struct tcp_hdr *l4h;
- char *l2h;
-
- len = dst->l2_len + dst->l3_len;
- plen = m->pkt_len;
-
- if (flags & TCP_FLAG_SYN)
- l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
- else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
- l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
- else
- l4 = sizeof(*l4h);
-
- /* adjust mbuf to put L2/L3/L4 headers into it. */
- l2h = rte_pktmbuf_prepend(m, len + l4);
- if (l2h == NULL)
- return -EINVAL;
-
- /* copy L2/L3 header */
- rte_memcpy(l2h, dst->hdr, len);
-
- /* setup TCP header & options */
- l4h = (struct tcp_hdr *)(l2h + len);
- fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
-
- /* setup mbuf TX offload related fields. */
- m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
- m->ol_flags |= ol_flags;
-
- /* update proto specific fields. */
-
- if (s->s.type == TLE_V4) {
- struct ipv4_hdr *l3h;
- l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
- l3h->packet_id = rte_cpu_to_be_16(pid);
- l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
-
- if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
- l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
- ol_flags);
- else if (swcsm != 0)
- l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
-
- if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
- l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
- } else {
- struct ipv6_hdr *l3h;
- l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
- l3h->payload_len = rte_cpu_to_be_16(plen + l4);
- if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
- l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
- else if (swcsm != 0)
- l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
- }
-
- return 0;
-}
-
/*
* That function supposed to be used only for data packets.
* Assumes that L2/L3/L4 headers and mbuf fields already setup properly.
@@ -355,6 +258,9 @@ tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
num, drb, &nb);
+ if (i > 0)
+ timer_stop(s, TIMER_DACK);
+
/* free unused drbs. */
if (nb != 0)
stream_drb_free(s, drb + nbm - nb, nb);
@@ -362,6 +268,113 @@ tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
return i;
}
+/*
+ * case 0: pkt is not split yet (implies plen > sl->len)
+ * case 1: pkt is already split, and the remaining part > sl->len
+ * case 2: pkt is already split, and the remaining part <= sl->len
+ */
+static inline struct rte_mbuf *
+get_indirect_mbuf(struct tle_tcp_stream *s,
+ struct rte_mbuf *m, uint32_t *p_plen,
+ union seqlen *sl, uint32_t type,
+ uint32_t mss)
+{
+ uint32_t hdr_len = PKT_L234_HLEN(m), plen, left;
+ struct rte_mbuf *f, *t;
+ uint16_t i, nb_segs, adj;
+ void *hdr;
+
+ if (s->tcb.snd.nxt_pkt) {
+ f = s->tcb.snd.nxt_pkt;
+ plen = f->data_len - s->tcb.snd.nxt_offset;
+ if (f == m) /* 1st segment contains net headers */
+ plen -= hdr_len;
+ } else {
+ f = m;
+ plen = f->data_len - hdr_len;
+ }
+
+ TCP_LOG(DEBUG, "m(%p): pkt_len=%u, nb_segs=%u, sl->len = %u\n",
+ m, m->pkt_len, m->nb_segs, sl->len);
+
+ nb_segs = 1;
+ if (sl->len < plen) {
+ /* Segment split needed: sometimes cwnd is reset to
+ * 1 or 2 mss. In that case we send only part of this
+ * segment, and record in the tcb which segment we have
+ * sent and the offset of the data already sent.
+ */
+ left = plen - sl->len;
+ plen = sl->len;
+ s->tcb.snd.nxt_pkt = f;
+ } else {
+ left = 0;
+ t = f->next;
+ while (t && plen + t->data_len <= sl->len) {
+ plen += t->data_len;
+ t = t->next;
+ nb_segs++;
+ }
+ s->tcb.snd.nxt_pkt = t;
+ }
+
+ struct rte_mbuf *pkts[1 + nb_segs];
+ if (rte_pktmbuf_alloc_bulk(s->tx.dst.head_mp, pkts, 1 + nb_segs) < 0)
+ return NULL;
+
+ rte_pktmbuf_attach(pkts[1], f);
+
+ /* remove bytes in the beginning */
+ adj = s->tcb.snd.nxt_offset;
+ if (f == m)
+ adj += hdr_len;
+ if (adj)
+ rte_pktmbuf_adj(pkts[1], adj);
+
+ /* remove bytes in the end */
+ if (left > 0) {
+ rte_pktmbuf_trim(pkts[1], left);
+ s->tcb.snd.nxt_offset += plen;
+ } else
+ s->tcb.snd.nxt_offset = 0;
+
+ /* attach the chained segments, if any */
+ for (i = 1, t = f->next; i < nb_segs; ++i) {
+ rte_pktmbuf_attach(pkts[i+1], t);
+ pkts[i]->next = pkts[i+1];
+ t = t->next;
+ }
+
+ /* prepare l2/l3/l4 header */
+ hdr = rte_pktmbuf_append(pkts[0], hdr_len);
+ rte_memcpy(hdr, rte_pktmbuf_mtod(m, void *), hdr_len);
+ pkts[0]->nb_segs = nb_segs + 1;
+ pkts[0]->pkt_len = plen + hdr_len;
+ pkts[0]->ol_flags = m->ol_flags;
+ pkts[0]->tx_offload = m->tx_offload;
+ if (type == TLE_V4) {
+ struct ipv4_hdr *l3h;
+
+ l3h = rte_pktmbuf_mtod_offset(pkts[0],
+ struct ipv4_hdr *, m->l2_len);
+ l3h->total_length =
+ rte_cpu_to_be_16(plen + m->l3_len + m->l4_len);
+ } else {
+ struct ipv6_hdr *l3h;
+
+ l3h = rte_pktmbuf_mtod_offset(pkts[0],
+ struct ipv6_hdr *, m->l2_len);
+ l3h->payload_len =
+ rte_cpu_to_be_16(plen + m->l4_len);
+ }
+ if (plen <= mss)
+ pkts[0]->ol_flags &= ~PKT_TX_TCP_SEG;
+ pkts[0]->next = pkts[1];
+
+ *p_plen = plen;
+ return pkts[0];
+}
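/*
 * Illustrative sketch (not part of this patch; names are hypothetical):
 * the same attach/adj/trim pattern get_indirect_mbuf() uses above,
 * reduced to carving "len" bytes at offset "off" out of one stored data
 * segment via an indirect mbuf. Assumes off + len <= seg->data_len.
 */
static struct rte_mbuf *
clone_slice(struct rte_mempool *mp, struct rte_mbuf *seg,
	uint16_t off, uint16_t len)
{
	struct rte_mbuf *c;

	c = rte_pktmbuf_alloc(mp);
	if (c == NULL)
		return NULL;

	/* share seg's data buffer and bump its refcnt */
	rte_pktmbuf_attach(c, seg);
	/* drop the bytes before the slice ... */
	rte_pktmbuf_adj(c, off);
	/* ... and the bytes after it */
	rte_pktmbuf_trim(c, c->data_len - len);
	return c;
}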
+
static inline uint32_t
tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
uint32_t num)
@@ -371,11 +384,13 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
struct rte_mbuf *mb;
struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
+ /* check that the stream has drbs available to send pkts */
+ if (stream_drb_empty(s))
+ return 0;
+
mss = s->tcb.snd.mss;
type = s->s.type;
-
dev = s->tx.dst.dev;
- pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
k = 0;
tn = 0;
@@ -383,26 +398,64 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
mb = mi[i];
- sz = RTE_MIN(sl->len, mss);
plen = PKT_L4_PLEN(mb);
/*fast path, no need to use indirect mbufs. */
- if (plen <= sz) {
-
+ if (s->tcb.snd.nxt_pkt == NULL && plen <= sl->len) {
+ pid = get_ip_pid(dev, calc_seg_cnt(plen, s->tcb.snd.mss),
+ type, (s->flags & TLE_CTX_FLAG_ST) != 0);
/* update pkt TCP header */
- tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
+ tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid);
/* keep mbuf till ACK is received. */
rte_pktmbuf_refcnt_update(mb, 1);
sl->len -= plen;
sl->seq += plen;
mo[k++] = mb;
- /* remaining snd.wnd is less them MSS, send nothing */
- } else if (sz < mss)
+ if (sl->seq <= s->tcb.snd.rcvr)
+ TCP_INC_STATS(TCP_MIB_RETRANSSEGS);
+ /* remaining snd.wnd is less than MSS, send nothing */
+ } else if (sl->len < mss) {
+ break;
+ /* some data to send already */
+ } else if (k != 0 || tn != 0) {
break;
/* packet indirection needed */
- else
- RTE_VERIFY(0);
+ } else {
+ struct rte_mbuf *out;
+
+ out = get_indirect_mbuf(s, mb, &plen, sl, type, mss);
+ if (out == NULL)
+ return 0;
+
+ pid = get_ip_pid(dev, calc_seg_cnt(plen, s->tcb.snd.mss),
+ type, (s->flags & TLE_CTX_FLAG_ST) != 0);
+ /* update pkt TCP header */
+ tcp_update_mbuf(out, type, &s->tcb, sl->seq, pid);
+
+ /* no need to bump refcnt !!! */
+
+ sl->len -= plen;
+ sl->seq += plen;
+
+ if (tx_data_pkts(s, &out, 1) == 0) {
+ /* should not happen: we have checked that at least
+ * one drb is available to send this mbuf
+ */
+ rte_pktmbuf_free(out);
+ return 0;
+ }
+
+ if (sl->seq <= s->tcb.snd.rcvr)
+ TCP_INC_STATS(TCP_MIB_RETRANSSEGS);
+
+ if (s->tcb.snd.nxt_pkt)
+ return 0;
+ else {
+ tn = 1;
+ continue;
+ }
+ }
if (k >= MAX_PKT_BURST) {
n = tx_data_pkts(s, mo, k);
@@ -466,14 +519,17 @@ tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
tcp_txq_set_nxt_head(s, n);
} while (n == num);
- s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
+ if (sl.seq != (uint32_t)s->tcb.snd.nxt) {
+ s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
+ s->tcb.snd.ack = s->tcb.rcv.nxt;
+ }
return tn;
}
static inline void
free_una_data(struct tle_tcp_stream *s, uint32_t len)
{
- uint32_t i, num, plen;
+ uint32_t i, num, plen, una_data;
struct rte_mbuf **mi;
plen = 0;
@@ -487,14 +543,18 @@ free_una_data(struct tle_tcp_stream *s, uint32_t len)
/* free acked data */
for (i = 0; i != num && plen != len; i++) {
- uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]);
- if (plen + next_pkt_len > len) {
- /* keep SND.UNA at the start of the packet */
- len = plen;
+ una_data = PKT_L4_PLEN(mi[i]) - s->tcb.snd.una_offset;
+
+ /* partial ack */
+ if (plen + una_data > len) {
+ s->tcb.snd.una_offset += len - plen;
+ plen = len;
break;
- } else {
- plen += next_pkt_len;
}
+
+ /* monolithic ack */
+ s->tcb.snd.una_offset = 0;
+ plen += una_data;
rte_pktmbuf_free(mi[i]);
}
@@ -503,6 +563,7 @@ free_una_data(struct tle_tcp_stream *s, uint32_t len)
} while (plen < len);
s->tcb.snd.una += len;
+ s->tcb.snd.waitlen -= len;
/*
* that could happen in case of retransmit,
@@ -519,7 +580,7 @@ calc_smss(uint16_t mss, const struct tle_dest *dst)
{
uint16_t n;
- n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
+ n = dst->mtu - dst->l3_len - sizeof(struct tcp_hdr);
mss = RTE_MIN(n, mss);
return mss;
}
@@ -537,71 +598,53 @@ initial_cwnd(uint32_t smss, uint32_t icw)
return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
}
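/*
 * Worked example for the two helpers above (values are assumptions):
 * with mtu = 1500 and an IPv4 l3_len of 20, calc_smss() yields
 * 1500 - 20 - 20 = 1460 bytes; initial_cwnd(1460, 65535) then gives
 * RTE_MIN(14600, RTE_MAX(2920, 65535)) = 14600, i.e. the 10 * SMSS
 * cap suggested by RFC 6928.
 */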
-/*
- * queue standalone packet to he particular output device
- * It assumes that:
- * - L2/L3/L4 headers should be already set.
- * - packet fits into one segment.
- */
-static inline int
-send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
+void
+tle_tcp_stream_kill(struct tle_stream *ts)
{
- uint32_t n, nb;
- struct tle_drb *drb;
-
- if (stream_drb_alloc(s, &drb, 1) == 0)
- return -ENOBUFS;
-
- /* enqueue pkt for TX. */
- nb = 1;
- n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
- &drb, &nb);
-
- /* free unused drbs. */
- if (nb != 0)
- stream_drb_free(s, &drb, 1);
-
- return (n == 1) ? 0 : -ENOBUFS;
-}
+ struct tle_tcp_stream *s;
-static inline int
-send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
- uint32_t flags)
-{
- const struct tle_dest *dst;
- uint32_t pid, type;
- int32_t rc;
+ s = TCP_STREAM(ts);
+ if (ts == NULL || s->s.type >= TLE_VNUM)
+ return;
- dst = &s->tx.dst;
- type = s->s.type;
- pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
+ if (s->tcb.state > TCP_ST_LISTEN)
+ send_rst(s, s->tcb.snd.nxt);
- rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
- if (rc == 0)
- rc = send_pkt(s, dst->dev, m);
+ if (s->tcb.state == TCP_ST_ESTABLISHED)
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
- return rc;
+ s->tcb.state = TCP_ST_CLOSED;
+ rte_smp_wmb();
+ timer_stop(s, TIMER_RTO);
}
static inline int
-send_rst(struct tle_tcp_stream *s, uint32_t seq)
+send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
{
struct rte_mbuf *m;
+ uint32_t seq;
int32_t rc;
m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
if (m == NULL)
return -ENOMEM;
- rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
- if (rc != 0)
+ seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
+ s->tcb.snd.ts = tms;
+
+ rc = send_ctrl_pkt(s, m, seq, flags);
+ if (rc != 0) {
rte_pktmbuf_free(m);
+ return rc;
+ }
- return rc;
+ timer_stop(s, TIMER_DACK);
+ s->tcb.snd.ack = s->tcb.rcv.nxt;
+ return 0;
}
static inline int
-send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
+send_keepalive(struct tle_tcp_stream *s)
{
struct rte_mbuf *m;
uint32_t seq;
@@ -611,20 +654,16 @@ send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
if (m == NULL)
return -ENOMEM;
- seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
- s->tcb.snd.ts = tms;
+ seq = s->tcb.snd.una - 1;
- rc = send_ctrl_pkt(s, m, seq, flags);
+ rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_ACK);
if (rc != 0) {
rte_pktmbuf_free(m);
return rc;
}
-
- s->tcb.snd.ack = s->tcb.rcv.nxt;
return 0;
}
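/*
 * Note on send_keepalive() above: the probe carries SEQ = SND.UNA - 1,
 * one byte to the left of the data the peer has already acknowledged.
 * The peer cannot accept that byte as new data and is expected to
 * answer with a bare ACK, which is all a keepalive probe needs.
 */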
-
static int
sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
@@ -633,19 +672,23 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
int32_t rc;
uint32_t pid, seq, type;
struct tle_dev *dev;
- const void *da;
+ const void *sa, *da;
struct tle_dest dst;
const struct tcp_hdr *th;
- type = s->s.type;
+ type = pi->tf.type;
/* get destination information. */
- if (type == TLE_V4)
+ if (type == TLE_V4) {
da = &pi->addr4.src;
- else
+ sa = &pi->addr4.dst;
+ } else {
da = &pi->addr6->src;
+ sa = &pi->addr6->dst;
+ }
- rc = stream_get_dest(&s->s, da, &dst);
+ rc = stream_get_dest(type, &s->s, sa, da, &dst);
if (rc < 0)
return rc;
@@ -654,11 +697,16 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
s->tcb.rcv.nxt = si->seq + 1;
+ s->tcb.rcv.cpy = si->seq + 1;
seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
s->s.ctx->prm.hash_alg,
&s->s.ctx->prm.secret_key);
- s->tcb.so.ts.ecr = s->tcb.so.ts.val;
- s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
+
+ if (s->tcb.so.ts.raw) {
+ s->tcb.so.ts.ecr = s->tcb.so.ts.val;
+ s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
+ }
+
s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
s->tcb.so.mss = calc_smss(dst.mtu, &dst);
@@ -672,11 +720,13 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
dev = dst.dev;
pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
- rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
- TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
+ rc = tcp_fill_mbuf(m, s, &dst, TCP_OLFLAGS_CKSUM(dst.ol_flags),
+ pi->port, seq, TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
if (rc == 0)
rc = send_pkt(s, dev, m);
+ TCP_INC_STATS(TCP_MIB_PASSIVEOPENS);
+
return rc;
}
@@ -800,43 +850,24 @@ restore_syn_opt(union seg_info *si, union tsopt *to,
return 0;
}
-static inline void
-stream_term(struct tle_tcp_stream *s)
-{
- struct sdr *dr;
-
- s->tcb.state = TCP_ST_CLOSED;
- rte_smp_wmb();
-
- timer_stop(s);
-
- /* close() was already invoked, schedule final cleanup */
- if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
-
- dr = CTX_TCP_SDR(s->s.ctx);
- STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
-
- /* notify user that stream need to be closed */
- } else if (s->err.ev != NULL)
- tle_event_raise(s->err.ev);
- else if (s->err.cb.func != NULL)
- s->err.cb.func(s->err.cb.data, &s->s);
-}
-
static inline int
stream_fill_dest(struct tle_tcp_stream *s)
{
int32_t rc;
uint32_t type;
- const void *da;
+ const void *sa, *da;
- type = s->s.type;
- if (type == TLE_V4)
+ type = s->s.type;
+ if (type == TLE_V4) {
+ sa = &s->s.ipv4.addr.dst;
da = &s->s.ipv4.addr.src;
- else
+ } else {
+ sa = &s->s.ipv6.addr.dst;
da = &s->s.ipv6.addr.src;
+ }
- rc = stream_get_dest(&s->s, da, &s->tx.dst);
+ rc = stream_get_dest(type, &s->s, sa, da, &s->tx.dst);
return (rc < 0) ? rc : 0;
}
@@ -851,19 +882,17 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
int32_t rc;
uint32_t rtt;
- /* some TX still pending for that stream. */
- if (TCP_STREAM_TX_PENDING(cs))
- return -EAGAIN;
-
/* setup L4 ports and L3 addresses fields. */
cs->s.port.raw = pi->port.raw;
cs->s.pmsk.raw = UINT32_MAX;
if (pi->tf.type == TLE_V4) {
+ cs->s.type = TLE_V4;
cs->s.ipv4.addr = pi->addr4;
cs->s.ipv4.mask.src = INADDR_NONE;
cs->s.ipv4.mask.dst = INADDR_NONE;
} else if (pi->tf.type == TLE_V6) {
+ cs->s.type = TLE_V6;
cs->s.ipv6.addr = *pi->addr6;
rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
sizeof(cs->s.ipv6.mask.src));
@@ -887,7 +916,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
cs->tcb.snd.rto = TCP_RTO_DEFAULT;
/* copy streams type & flags. */
- cs->s.type = ps->s.type;
+ cs->s.type = pi->tf.type;
cs->flags = ps->flags;
/* retrieve and cache destination information. */
@@ -897,16 +926,23 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
/* update snd.mss with SMSS value */
cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
+ if (cs->tcb.so.ts.raw != 0) {
+ cs->tcb.snd.mss -= TCP_TX_OPT_LEN_TMS;
+ }
/* setup congestion variables */
cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
+ CWND_INFO("accept", cs->tcb.snd.cwnd);
+
cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
+ cs->tcb.snd.rto_fw = ps->tcb.snd.rto_fw;
cs->tcb.state = TCP_ST_ESTABLISHED;
+ TCP_INC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
/* add stream to the table */
- cs->ste = stbl_add_stream(st, pi, cs);
+ cs->ste = stbl_add_stream(st, &cs->s);
if (cs->ste == NULL)
return -ENOBUFS;
@@ -937,7 +973,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
*csp = NULL;
- if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
+ if ((pi->tf.flags & TCP_FLAG_ACK) == 0 || rx_check_stream(s, pi) != 0)
return -EINVAL;
ctx = s->s.ctx;
@@ -964,7 +1000,8 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
/* cleanup on failure */
tcp_stream_down(cs);
- stbl_del_stream(st, cs->ste, cs, 0);
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
+ stbl_del_stream(st, cs->ste, &cs->s);
cs->ste = NULL;
}
@@ -982,6 +1019,10 @@ data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf **mb, uint32_t hlen,
len = *plen;
rte_pktmbuf_adj(*mb, hlen);
+ /* the header is removed, so clear tx_offload here to make sure
+ * that PKT_L4_PLEN returns the correct payload length.
+ */
+ (*mb)->tx_offload = 0;
if (len == 0)
return -ENODATA;
/* cut off the start of the packet */
@@ -1018,7 +1059,8 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
tle_event_raise(s->tx.ev);
else if (k == 0 && s->tx.cb.func != NULL)
s->tx.cb.func(s->tx.cb.data, &s->s);
- }
+ } else
+ txs_enqueue(s->s.ctx, s);
}
return n;
@@ -1029,8 +1071,7 @@ stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
{
if (rto != 0) {
s->tcb.state = TCP_ST_TIME_WAIT;
- s->tcb.snd.rto = rto;
- timer_reset(s);
+ timer_reset(s, TIMER_RTO, rto);
} else
stream_term(s);
}
@@ -1041,20 +1082,30 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
uint32_t state;
int32_t ackfin;
+ s->tcb.rcv.frs.on = 2;
s->tcb.rcv.nxt += 1;
ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
state = s->tcb.state;
if (state == TCP_ST_ESTABLISHED) {
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
s->tcb.state = TCP_ST_CLOSE_WAIT;
/* raise err.ev & err.cb */
- if (s->err.ev != NULL)
+ /* raise the error event only when the recvbuf is empty, to
+ * inform the user that the stream will receive no more data.
+ */
+ if (rte_ring_count(s->rx.q) == 0 && s->err.ev != NULL)
tle_event_raise(s->err.ev);
else if (s->err.cb.func != NULL)
s->err.cb.func(s->err.cb.data, &s->s);
} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
rsp->flags |= TCP_FLAG_ACK;
+
+ /* shutdown, rather than close, has happened */
+ if (rte_ring_count(s->rx.q) == 0 && s->err.ev != NULL)
+ tle_event_raise(s->err.ev);
+
if (ackfin != 0)
stream_timewait(s, s->tcb.snd.rto_tw);
else
@@ -1089,8 +1140,10 @@ rx_fin(struct tle_tcp_stream *s, uint32_t state,
ts = rx_tms_opt(&s->tcb, mb);
ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
- if (ret != 0)
+ if (ret != 0) {
+ rsp->flags |= TCP_FLAG_ACK;
return ret;
+ }
if (state < TCP_ST_ESTABLISHED)
return -EINVAL;
@@ -1108,9 +1161,10 @@ rx_fin(struct tle_tcp_stream *s, uint32_t state,
* fast-path: all data & FIN was already sent out
* and now is acknowledged.
*/
- if (s->tcb.snd.fss == s->tcb.snd.nxt &&
- si->ack == (uint32_t)s->tcb.snd.nxt) {
+ if (s->tcb.snd.fss >= s->tcb.snd.nxt &&
+ si->ack == (uint32_t)s->tcb.snd.fss) {
s->tcb.snd.una = s->tcb.snd.fss;
+ s->tcb.snd.nxt = s->tcb.snd.una;
empty_tq(s);
/* conventional ACK processing */
} else
@@ -1148,8 +1202,25 @@ rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
else
rc = check_seqn(&s->tcb, si->seq, 0);
- if (rc == 0)
+ if (rc == 0) {
+ /* received rst: the connection is closed abnormally
+ * and later operations should return an errno.
+ */
+ switch (state) {
+ case TCP_ST_SYN_SENT:
+ TCP_INC_STATS(TCP_MIB_ATTEMPTFAILS);
+ s->tcb.err = ECONNREFUSED;
+ break;
+ case TCP_ST_CLOSE_WAIT:
+ s->tcb.err = EPIPE;
+ break;
+ case TCP_ST_CLOSED:
+ return rc;
+ default:
+ s->tcb.err = ECONNRESET;
+ }
stream_term(s);
+ }
return rc;
}
@@ -1222,6 +1293,7 @@ rto_cwnd_update(struct tcb *tcb)
* no more than 1 full-sized segment.
*/
tcb->snd.cwnd = tcb->snd.mss;
+ CWND_INFO("update", tcb->snd.cwnd);
}
static inline void
@@ -1330,13 +1402,17 @@ rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
plen, ts);
- if (ret != 0)
- break;
-
/* account for segment received */
ack_info_update(tack, &si[j], ret != 0, plen, ts);
+ if (ret != 0)
+ break;
+
rte_pktmbuf_adj(mb[j], hlen);
+ /* the header is removed, so clear tx_offload here to make sure
+ * that PKT_L4_PLEN returns the correct payload length.
+ */
+ mb[j]->tx_offload = 0;
}
n = j - i;
@@ -1377,6 +1453,7 @@ start_fast_retransmit(struct tle_tcp_stream *s)
tcp_txq_rst_nxt_head(s);
tcb->snd.nxt = tcb->snd.una;
tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
+ CWND_INFO("start fast retrans", tcb->snd.cwnd);
}
static inline void
@@ -1389,6 +1466,7 @@ stop_fast_retransmit(struct tle_tcp_stream *s)
n = tcb->snd.nxt - tcb->snd.una;
tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
+ CWND_INFO("stop fast retrans", tcb->snd.cwnd);
tcb->snd.fastack = 0;
}
@@ -1415,8 +1493,10 @@ in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
* during fast recovery, also reset the
* retransmit timer.
*/
- if (tcb->snd.fastack == 1)
- timer_reset(s);
+ if (tcb->snd.fastack == 1) {
+ timer_reset(s, TIMER_RTO, s->tcb.snd.rto);
+ s->tcb.snd.nb_retx = 0;
+ }
tcb->snd.fastack += ack_num;
return 1;
@@ -1456,7 +1536,8 @@ process_ack(struct tle_tcp_stream *s, uint32_t acked,
/* remain in normal mode */
} else if (acked != 0) {
ack_cwnd_update(&s->tcb, acked, tack);
- timer_stop(s);
+ timer_stop(s, TIMER_RTO);
+ s->tcb.snd.nb_retx = 0;
}
/* fast retransmit mode */
@@ -1470,7 +1551,7 @@ process_ack(struct tle_tcp_stream *s, uint32_t acked,
} else {
/* RFC 5682 3.2.3 full ACK */
stop_fast_retransmit(s);
- timer_stop(s);
+ timer_stop(s, TIMER_RTO);
/* if we have another series of dup ACKs */
if (tack->dup3.seg != 0 &&
@@ -1501,17 +1582,22 @@ rx_ackfin(struct tle_tcp_stream *s)
uint32_t state;
s->tcb.snd.una = s->tcb.snd.fss;
+ s->tcb.snd.nxt = s->tcb.snd.una;
empty_tq(s);
state = s->tcb.state;
if (state == TCP_ST_LAST_ACK)
stream_term(s);
else if (state == TCP_ST_FIN_WAIT_1) {
- timer_stop(s);
+ timer_stop(s, TIMER_RTO);
s->tcb.state = TCP_ST_FIN_WAIT_2;
- } else if (state == TCP_ST_CLOSING) {
+ /* if the stream is closed, it should be released
+ * before the timeout even without a fin from the peer
+ */
+ if (s->tcb.uop & TCP_OP_CLOSE)
+ timer_start(s, TIMER_RTO, s->tcb.snd.rto_fw);
+ } else if (state == TCP_ST_CLOSING)
stream_timewait(s, s->tcb.snd.rto_tw);
- }
}
static inline void
@@ -1532,7 +1618,7 @@ rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
/* restart RTO timer. */
if (s->tcb.snd.nxt != s->tcb.snd.una)
- timer_start(s);
+ timer_start(s, TIMER_RTO, s->tcb.snd.rto);
/* update rto, if fresh packet is here then calculate rtt */
if (tack->ts.ecr != 0)
@@ -1554,15 +1640,9 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
if (state != TCP_ST_SYN_SENT)
return -EINVAL;
- /*
- * RFC 793 3.9: in the SYN-SENT state
- * If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset
- * <SEQ=SEG.ACK><CTL=RST>
- * and discard the segment.
- * The connection remains in the same state.
- */
+ /* invalid SEG.ACK */
if (si->ack != (uint32_t)s->tcb.snd.nxt) {
- send_rst(s, si->ack);
+ rsp->flags = TCP_FLAG_RST;
return 0;
}
@@ -1574,18 +1654,25 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
s->tcb.snd.una = s->tcb.snd.nxt;
s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
+ if (s->tcb.so.ts.raw != 0) {
+ s->tcb.snd.mss -= TCP_TX_OPT_LEN_TMS;
+ }
s->tcb.snd.wnd = si->wnd << so.wscale;
s->tcb.snd.wu.wl1 = si->seq;
s->tcb.snd.wu.wl2 = si->ack;
s->tcb.snd.wscale = so.wscale;
+ s->tcb.snd.cork_ts = 0;
/* setup congestion variables */
s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
+ CWND_INFO("synack", s->tcb.snd.cwnd);
+
s->tcb.snd.ssthresh = s->tcb.snd.wnd;
s->tcb.rcv.ts = so.ts.val;
s->tcb.rcv.irs = si->seq;
s->tcb.rcv.nxt = si->seq + 1;
+ s->tcb.rcv.cpy = si->seq + 1;
/* if peer doesn't support WSCALE opt, recalculate RCV.WND */
s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
@@ -1597,9 +1684,14 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
rsp->flags |= TCP_FLAG_ACK;
- timer_stop(s);
+ timer_stop(s, TIMER_RTO);
+ s->tcb.snd.nb_retx = 0;
s->tcb.state = TCP_ST_ESTABLISHED;
rte_smp_wmb();
+ TCP_INC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
+
+ if (s->s.option.keepalive)
+ timer_start(s, TIMER_KEEPALIVE, s->s.option.keepidle * MS_PER_S);
if (s->tx.ev != NULL)
tle_event_raise(s->tx.ev);
@@ -1689,8 +1781,8 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts,
* fast-path: all data & FIN was already sent out
* and now is acknowledged.
*/
- if (s->tcb.snd.fss == s->tcb.snd.nxt &&
- tack.ack == (uint32_t)s->tcb.snd.nxt)
+ if (s->tcb.snd.fss >= s->tcb.snd.nxt &&
+ tack.ack == (uint32_t)s->tcb.snd.fss)
rx_ackfin(s);
else
rx_process_ack(s, ts, &tack);
@@ -1702,27 +1794,44 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts,
* - received segment with INO data and no TX is scheduled
* for that stream.
*/
- if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
- (tack.segs.data != 0 &&
- rte_atomic32_read(&s->tx.arm) == 0))
+ if (tack.segs.badseq != 0 || tack.segs.ofo != 0)
+ rsp.flags |= TCP_FLAG_ACK;
+ else if (tack.segs.data != 0 &&
+ rte_atomic32_read(&s->tx.arm) == 0 &&
+ (s->s.option.tcpquickack ||
+ s->tcb.rcv.nxt - s->tcb.snd.ack > 8 * s->tcb.so.mss)) {
rsp.flags |= TCP_FLAG_ACK;
+ if (s->s.option.tcpquickack > 0)
+ s->s.option.tcpquickack--;
+ }
+ else if (tack.segs.data && rsp.flags == 0)
+ timer_start(s, TIMER_DACK, DELAY_ACK_CHECK_INTERVAL);
rx_ofo_fin(s, &rsp);
k += num - n;
i = num;
+ if (s->s.option.keepalive) {
+ s->tcb.snd.nb_keepalive = 0;
+ timer_reset(s, TIMER_KEEPALIVE, s->s.option.keepidle * MS_PER_S);
+ }
/* unhandled state, drop all packets. */
} else
i = 0;
/* we have a response packet to send. */
- if (rsp.flags != 0) {
+ if (rsp.flags == TCP_FLAG_RST) {
+ send_rst(s, si[i].ack);
+ stream_term(s);
+ } else if (rsp.flags != 0) {
send_ack(s, ts, rsp.flags);
/* start the timer for FIN packet */
- if ((rsp.flags & TCP_FLAG_FIN) != 0)
- timer_reset(s);
+ if ((rsp.flags & TCP_FLAG_FIN) != 0) {
+ timer_reset(s, TIMER_RTO, s->tcb.snd.rto);
+ s->tcb.snd.nb_retx = 0;
+ }
}
/* unprocessed packets */
@@ -1778,7 +1887,6 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
state = s->tcb.state;
if (state == TCP_ST_LISTEN) {
-
/* one connection per flow */
cs = NULL;
ret = -EINVAL;
@@ -1835,6 +1943,74 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
return num - k;
}
+static inline void
+sync_refuse(struct tle_tcp_stream *s, struct tle_dev *dev,
+ const union pkt_info *pi, struct rte_mbuf *m)
+{
+ struct ether_hdr *eth_h;
+ struct ether_addr eth_addr;
+ struct ipv4_hdr *ip_h;
+ uint32_t ip_addr;
+ struct ipv6_hdr *ipv6_h;
+ struct in6_addr ipv6_addr;
+ struct tcp_hdr *th;
+ uint16_t port;
+
+ /* the rst pkt should not carry the options of the syn */
+ rte_pktmbuf_trim(m, m->l4_len - sizeof(*th));
+
+ eth_h = rte_pktmbuf_mtod(m, struct ether_hdr*);
+ ether_addr_copy(&eth_h->s_addr, &eth_addr);
+ ether_addr_copy(&eth_h->d_addr, &eth_h->s_addr);
+ ether_addr_copy(&eth_addr, &eth_h->d_addr);
+
+ th = rte_pktmbuf_mtod_offset(m, struct tcp_hdr*,
+ m->l2_len + m->l3_len);
+ port = th->src_port;
+ th->src_port = th->dst_port;
+ th->dst_port = port;
+ th->tcp_flags = TCP_FLAG_RST | TCP_FLAG_ACK;
+ th->recv_ack = rte_cpu_to_be_32(rte_be_to_cpu_32(th->sent_seq) + 1);
+ th->sent_seq = 0;
+ th->data_off &= 0x0f;
+ th->data_off |= (sizeof(*th) / 4) << 4;
+ th->cksum = 0;
+
+ if (pi->tf.type == TLE_V4) {
+ ip_h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr*,
+ m->l2_len);
+ ip_addr = ip_h->src_addr;
+ ip_h->src_addr = ip_h->dst_addr;
+ ip_h->dst_addr = ip_addr;
+ ip_h->total_length = rte_cpu_to_be_16(
+ rte_be_to_cpu_16(ip_h->total_length) -
+ (m->l4_len - sizeof(*th)));
+ ip_h->hdr_checksum = 0;
+ th->cksum = rte_ipv4_udptcp_cksum(ip_h, th);
+ ip_h->hdr_checksum = rte_ipv4_cksum(ip_h);
+ } else {
+ ipv6_h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr*,
+ m->l2_len);
+ rte_memcpy(&ipv6_addr, ipv6_h->src_addr,
+ sizeof(struct in6_addr));
+ rte_memcpy(ipv6_h->src_addr, ipv6_h->dst_addr,
+ sizeof(struct in6_addr));
+ rte_memcpy(ipv6_h->dst_addr, &ipv6_addr,
+ sizeof(struct in6_addr));
+ ipv6_h->payload_len = rte_cpu_to_be_16(
+ rte_be_to_cpu_16(ipv6_h->payload_len) -
+ (m->l4_len - sizeof(*th)));
+ th->cksum = rte_ipv6_udptcp_cksum(ipv6_h, th);
+ }
+
+ if (m->pkt_len < ETHER_MIN_LEN)
+ rte_pktmbuf_append(m, ETHER_MIN_LEN - m->pkt_len);
+
+ if (send_pkt(s, dev, m) != 0)
+ rte_pktmbuf_free(m);
+ else
+ TCP_INC_STATS(TCP_MIB_OUTRSTS);
+}
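/*
 * The RST|ACK field math used by sync_refuse() above, as a minimal
 * sketch (illustrative helper, host-order seq as input): for a refused
 * SYN with SEG.SEQ = x, the reply carries SEQ = 0 and ACK = x + 1,
 * matching RFC 793 reset generation for a non-existent connection.
 */
static inline void
fill_refuse_fields(struct tcp_hdr *th, uint32_t syn_seq)
{
	th->sent_seq = rte_cpu_to_be_32(0);
	th->recv_ack = rte_cpu_to_be_32(syn_seq + 1);
	th->tcp_flags = TCP_FLAG_RST | TCP_FLAG_ACK;
}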
static inline uint32_t
rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
@@ -1846,20 +2022,35 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
uint32_t i, k;
int32_t ret;
- s = rx_obtain_listen_stream(dev, &pi[0], type);
+ s = rx_obtain_listen_stream(dev, &pi[0], type, 0);
if (s == NULL) {
- for (i = 0; i != num; i++) {
- rc[i] = ENOENT;
- rp[i] = mb[i];
+ /* no stream is listening for this syn; send rst to refuse the connection */
+ s = TCP_STREAM(get_stream(dev->ctx));
+ if (s != NULL) {
+ sync_refuse(s, dev, &pi[0], mb[0]);
+ put_stream(dev->ctx, &s->s, 0);
+ i = 1;
+ } else {
+ i = 0;
}
- return 0;
+ k = 0;
+ for (; i != num; i++) {
+ rc[k] = ENOENT;
+ rp[k] = mb[i];
+ k++;
+ }
+ return num - k;
}
k = 0;
for (i = 0; i != num; i++) {
-
+ /* check whether the stream has space for a new connection */
+ if (rte_ring_free_count(s->rx.q) == 0 ||
+ (s->s.ctx->streams.nb_free == 0 &&
+ s->s.ctx->streams.nb_cur >= s->s.ctx->prm.max_streams - 1))
+ ret = -ENOSPC;
/* check that this remote is allowed to connect */
- if (rx_check_stream(s, &pi[i]) != 0)
+ else if (rx_check_stream(s, &pi[i]) != 0)
ret = -ENOENT;
else
/* syncokie: reply with <SYN,ACK> */
@@ -1882,43 +2073,34 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
{
struct stbl *st;
struct tle_ctx *ctx;
- uint32_t i, j, k, mt, n, t, ts;
+ uint32_t i, j, k, n, t;
+ uint64_t ts;
union pkt_info pi[num];
union seg_info si[num];
- union {
- uint8_t t[TLE_VNUM];
- uint32_t raw;
- } stu;
+
+ TCP_ADD_STATS(TCP_MIB_INSEGS, num);
ctx = dev->ctx;
ts = tcp_get_tms(ctx->cycles_ms_shift);
st = CTX_TCP_STLB(ctx);
- mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
-
- stu.raw = 0;
/* extract packet info and check the L3/L4 csums */
for (i = 0; i != num; i++) {
get_pkt_info(pkt[i], &pi[i], &si[i]);
-
t = pi[i].tf.type;
- pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, t, IPPROTO_TCP);
- stu.t[t] = mt;
+ pi[i].csf = check_pkt_csum(pkt[i], t, IPPROTO_TCP);
}
- if (stu.t[TLE_V4] != 0)
- stbl_lock(st, TLE_V4);
- if (stu.t[TLE_V6] != 0)
- stbl_lock(st, TLE_V6);
-
k = 0;
for (i = 0; i != num; i += j) {
-
t = pi[i].tf.type;
/*basic checks for incoming packet */
- if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
+ if (t >= TLE_VNUM || pi[i].csf != 0) {
+ TCP_INC_STATS(TCP_MIB_INERRS);
+ if (t < TLE_VNUM)
+ TCP_INC_STATS(TCP_MIB_CSUMERRORS);
rc[k] = EINVAL;
rp[k] = pkt[i];
j = 1;
@@ -1937,11 +2119,6 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
}
}
- if (stu.t[TLE_V4] != 0)
- stbl_unlock(st, TLE_V4);
- if (stu.t[TLE_V6] != 0)
- stbl_unlock(st, TLE_V6);
-
return num - k;
}
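/*
 * Illustrative RX loop (port/queue values and helper name are
 * hypothetical): feeding an ethdev burst into the context through
 * tle_tcp_rx_bulk() above and dropping the packets it rejects
 * (returned in rp[], with reasons in rc[]).
 */
static void
rx_burst_once(uint16_t port, struct tle_dev *dev)
{
	struct rte_mbuf *pkt[MAX_PKT_BURST], *rp[MAX_PKT_BURST];
	int32_t rc[MAX_PKT_BURST];
	uint16_t i, n, k;

	n = rte_eth_rx_burst(port, 0, pkt, RTE_DIM(pkt));
	if (n == 0)
		return;

	/* k = number of packets accepted by the stack */
	k = tle_tcp_rx_bulk(dev, pkt, rp, rc, n);
	for (i = 0; i != n - k; i++)
		rte_pktmbuf_free(rp[i]);
}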
@@ -1953,21 +2130,37 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
struct tle_tcp_stream *s;
s = TCP_STREAM(ts);
- n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
- if (n == 0)
- return 0;
- /*
- * if we still have packets to read,
- * then rearm stream RX event.
- */
- if (n == num && rte_ring_count(s->rx.q) != 0) {
- if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
- tle_event_raise(s->rx.ev);
+ if (tcp_stream_try_acquire(s) > 0) {
+ if (s->tcb.state != TCP_ST_LISTEN) {
+ tcp_stream_release(s);
+ rte_errno = EINVAL;
+ return 0;
+ }
+
+ n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
+ if (n == 0) {
+ tcp_stream_release(s);
+ rte_errno = EAGAIN;
+ return 0;
+ }
+
+ /*
+ * if we still have packets to read,
+ * then rearm stream RX event.
+ */
+ if (n == num && rte_ring_count(s->rx.q) != 0) {
+ if (s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ }
+ tcp_stream_release(s);
+ return n;
+ } else {
tcp_stream_release(s);
+ rte_errno = EINVAL;
+ return 0;
}
-
- return n;
}
uint16_t
@@ -1995,6 +2188,7 @@ tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
stream_drb_free(s, drb + i, j - i);
}
+ TCP_ADD_STATS(TCP_MIB_OUTSEGS, n);
return n;
}
@@ -2010,73 +2204,17 @@ stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
pi->tf.type = s->s.type;
}
-static int
-stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
-{
- const struct sockaddr_in *in4;
- const struct sockaddr_in6 *in6;
- const struct tle_dev_param *prm;
- int32_t rc;
-
- rc = 0;
- s->s.pmsk.raw = UINT32_MAX;
-
- /* setup L4 src ports and src address fields. */
- if (s->s.type == TLE_V4) {
- in4 = (const struct sockaddr_in *)addr;
- if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
- return -EINVAL;
-
- s->s.port.src = in4->sin_port;
- s->s.ipv4.addr.src = in4->sin_addr.s_addr;
- s->s.ipv4.mask.src = INADDR_NONE;
- s->s.ipv4.mask.dst = INADDR_NONE;
-
- } else if (s->s.type == TLE_V6) {
- in6 = (const struct sockaddr_in6 *)addr;
- if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
- sizeof(tle_ipv6_any)) == 0 ||
- in6->sin6_port == 0)
- return -EINVAL;
-
- s->s.port.src = in6->sin6_port;
- rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
- sizeof(s->s.ipv6.addr.src));
- rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
- sizeof(s->s.ipv6.mask.src));
- rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
- sizeof(s->s.ipv6.mask.dst));
- }
-
- /* setup the destination device. */
- rc = stream_fill_dest(s);
- if (rc != 0)
- return rc;
-
- /* setup L4 dst address from device param */
- prm = &s->tx.dst.dev->prm;
- if (s->s.type == TLE_V4) {
- if (s->s.ipv4.addr.dst == INADDR_ANY)
- s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
- } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
- sizeof(tle_ipv6_any)) == 0)
- memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
- sizeof(s->s.ipv6.addr.dst));
-
- return rc;
-}
-
static inline int
-tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
+tx_syn(struct tle_tcp_stream *s)
{
int32_t rc;
- uint32_t tms, seq;
+ uint32_t seq;
+ uint64_t tms;
union pkt_info pi;
struct stbl *st;
struct stbl_entry *se;
- /* fill stream address */
- rc = stream_fill_addr(s, addr);
+ rc = stream_fill_dest(s);
if (rc != 0)
return rc;
@@ -2107,7 +2245,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
/* add the stream in stream table */
st = CTX_TCP_STLB(s->s.ctx);
- se = stbl_add_stream_lock(st, s);
+ se = stbl_add_stream(st, &s->s);
if (se == NULL)
return -ENOBUFS;
s->ste = se;
@@ -2115,6 +2253,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
/* put stream into the to-send queue */
txs_enqueue(s->s.ctx, s);
+ TCP_INC_STATS(TCP_MIB_ACTIVEOPENS);
return 0;
}
@@ -2147,7 +2286,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
/* fill stream, prepare and transmit syn pkt */
s->tcb.uop |= TCP_OP_CONNECT;
- rc = tx_syn(s, addr);
+ rc = tx_syn(s);
tcp_stream_release(s);
/* error happened, do a cleanup */
@@ -2160,13 +2299,29 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
uint16_t
tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
{
- uint32_t n;
+ uint32_t n, i;
+ uint32_t free_slots;
struct tle_tcp_stream *s;
s = TCP_STREAM(ts);
+
+ free_slots = rte_ring_free_count(s->rx.q);
+
n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
- if (n == 0)
+ if (n == 0) {
+ if (s->tcb.err != 0) {
+ rte_errno = s->tcb.err;
+ } else {
+ rte_errno = EAGAIN;
+ }
return 0;
+ }
+
+ for (i = 0; i < n; ++i)
+ s->tcb.rcv.cpy += rte_pktmbuf_pkt_len(pkt[i]);
+
+ /* update the receive window with the remaining recv buffer space */
+ s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
/*
* if we still have packets to read,
@@ -2176,28 +2331,99 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
tle_event_raise(s->rx.ev);
tcp_stream_release(s);
+ /* if we have received a fin, no more data will come; raise the err event. */
+ } else if (s->tcb.rcv.frs.on == 2) {
+ if (tcp_stream_try_acquire(s) > 0 && s->err.ev != NULL)
+ tle_event_raise(s->err.ev);
+ tcp_stream_release(s);
+ }
+
+ /* advertise the updated recv window to the remote */
+ if (free_slots < RECV_WIN_NOTIFY_THRESH &&
+ rte_ring_free_count(s->rx.q) >= RECV_WIN_NOTIFY_THRESH) {
+ s->tcb.snd.update_rcv = true;
+ txs_enqueue(s->s.ctx, s);
}
return n;
}
+uint16_t
+tle_tcp_stream_inq(struct tle_stream *ts)
+{
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+ return s->tcb.rcv.nxt - s->tcb.rcv.cpy;
+}
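/*
 * Note: rcv.nxt - rcv.cpy above is the count of received but not yet
 * copied-out bytes, i.e. what ioctl(FIONREAD) reports on a BSD socket;
 * the uint16_t return appears to assume the backlog fits in 64 KB.
 */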
+
+#define DECONST(type, var) ((type)(uintptr_t)(const void *)(var))
+
+ssize_t
+tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, int iovcnt)
+{
+ struct msghdr msg = {0};
+
+ msg.msg_iov = DECONST(struct iovec *, iov); /* const cast away; iov is not modified */
+ msg.msg_iovlen = iovcnt;
+ return tle_tcp_stream_recvmsg(ts, &msg);
+}
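/*
 * Illustrative caller (buffer names hypothetical): a scatter read into
 * two application buffers through the wrapper above.
 */
static ssize_t
read_two_bufs(struct tle_stream *ts, void *hdr, size_t hlen,
	void *body, size_t blen)
{
	struct iovec vec[2] = {
		{ .iov_base = hdr, .iov_len = hlen },
		{ .iov_base = body, .iov_len = blen },
	};

	return tle_tcp_stream_readv(ts, vec, 2);
}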
+
ssize_t
-tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
- int iovcnt)
+tle_tcp_stream_recvmsg(struct tle_stream *ts, struct msghdr *msg)
{
+ size_t sz;
int32_t i;
uint32_t mn, n, tn;
- size_t sz;
+ uint32_t free_slots;
struct tle_tcp_stream *s;
struct iovec iv;
struct rxq_objs mo[2];
+ struct sockaddr_in *addr;
+ struct sockaddr_in6 *addr6;
+ const struct iovec *iov = msg->msg_iov;
+ int iovcnt = msg->msg_iovlen;
s = TCP_STREAM(ts);
+ free_slots = rte_ring_free_count(s->rx.q);
+
/* get group of packets */
mn = tcp_rxq_get_objs(s, mo);
- if (mn == 0)
- return 0;
+ if (mn == 0) {
+ if (s->tcb.err != 0)
+ rte_errno = s->tcb.err;
+ else
+ rte_errno = EAGAIN;
+ return -1;
+ }
+
+ if (!ts->option.timestamp)
+ ts->timestamp = mo[0].mb[0]->timestamp;
+
+ if (msg->msg_control != NULL) {
+ if (ts->option.timestamp)
+ tle_set_timestamp(msg, mo[0].mb[0]);
+ else
+ msg->msg_controllen = 0;
+ }
+
+ if (msg->msg_name != NULL) {
+ if (s->s.type == TLE_V4) {
+ addr = (struct sockaddr_in*)msg->msg_name;
+ addr->sin_family = AF_INET;
+ addr->sin_addr.s_addr = s->s.ipv4.addr.src;
+ addr->sin_port = s->s.port.src;
+ msg->msg_namelen = sizeof(struct sockaddr_in);
+ } else {
+ addr6 = (struct sockaddr_in6*)msg->msg_name;
+ addr6->sin6_family = AF_INET6;
+ rte_memcpy(&addr6->sin6_addr, &s->s.ipv6.addr.src,
+ sizeof(struct in6_addr));
+ addr6->sin6_port = s->s.port.src;
+ msg->msg_namelen = sizeof(struct sockaddr_in6);
+ }
+ }
sz = 0;
n = 0;
@@ -2229,6 +2455,8 @@ tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
}
tcp_rxq_consume(s, tn);
+ /* update the receive window with the remaining recv buffer space */
+ s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
/*
* if we still have packets to read,
@@ -2238,6 +2466,20 @@ tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
tle_event_raise(s->rx.ev);
tcp_stream_release(s);
+ /* if we have received a fin, no more data will come; raise the err event. */
+ } else if (s->tcb.rcv.frs.on == 2) {
+ if (tcp_stream_try_acquire(s) > 0 && s->err.ev != NULL)
+ tle_event_raise(s->err.ev);
+ tcp_stream_release(s);
+ }
+
+ s->tcb.rcv.cpy += sz;
+
+ /* advertise the updated recv window to the remote */
+ if (free_slots < RECV_WIN_NOTIFY_THRESH &&
+ rte_ring_free_count(s->rx.q) >= RECV_WIN_NOTIFY_THRESH) {
+ s->tcb.snd.update_rcv = true;
+ txs_enqueue(s->s.ctx, s);
}
return sz;
@@ -2263,48 +2505,35 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
if (i == num) {
/* queue packets for further transmission. */
rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
- if (rc != 0)
+ if (rc != 0) {
+ rc = -EAGAIN;
free_mbufs(segs, num);
+ }
}
return rc;
}
-uint16_t
-tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
+static inline uint16_t
+stream_send(struct tle_tcp_stream *s, struct rte_mbuf *pkt[],
+ uint16_t num, uint16_t mss, uint64_t ol_flags)
{
- uint32_t i, j, k, mss, n, state;
+ uint16_t i, j, k;
int32_t rc;
- uint64_t ol_flags;
- struct tle_tcp_stream *s;
+ uint32_t n, free_slots;
struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
-
- s = TCP_STREAM(ts);
-
- /* mark stream as not closable. */
- if (tcp_stream_acquire(s) < 0) {
- rte_errno = EAGAIN;
- return 0;
- }
-
- state = s->tcb.state;
- if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
- rte_errno = ENOTCONN;
- tcp_stream_release(s);
- return 0;
- }
-
- mss = s->tcb.snd.mss;
- ol_flags = s->tx.dst.ol_flags;
+ int32_t pkt_len;
k = 0;
rc = 0;
+ pkt_len = 0;
while (k != num) {
/* prepare and check for TX */
for (i = k; i != num; i++) {
if (pkt[i]->pkt_len > mss ||
pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
break;
+ pkt_len += pkt[i]->pkt_len;
rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
s->s.port, 0, TCP_FLAG_ACK, 0, 0);
if (rc != 0)
@@ -2328,6 +2557,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
pkt[j]->l3_len +
pkt[j]->l4_len);
pkt[j]->ol_flags &= ol_flags;
+ pkt_len -= pkt[j]->pkt_len;
}
break;
}
@@ -2339,8 +2569,10 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
/* segment large packet and enqueue for sending */
} else if (i != num) {
+ free_slots = rte_ring_free_count(s->tx.q);
+ free_slots = RTE_MIN(free_slots, RTE_DIM(segs));
/* segment the packet. */
- rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
+ rc = tcp_segmentation(pkt[i], segs, free_slots,
&s->tx.dst, mss);
if (rc < 0) {
rte_errno = -rc;
@@ -2351,19 +2583,161 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
if (rc == 0) {
+ pkt_len += pkt[i]->pkt_len;
/* free the large mbuf */
rte_pktmbuf_free(pkt[i]);
/* set the mbuf as consumed */
k++;
- } else
+ } else {
/* no space left in tx queue */
+ RTE_VERIFY(0);
break;
+ }
}
}
+ s->tcb.snd.waitlen += pkt_len;
+ return k;
+}
+
+static inline uint16_t
+stream_send_tso(struct tle_tcp_stream *s, struct rte_mbuf *pkt[],
+ uint16_t num, uint16_t mss, uint64_t ol_flags)
+{
+ uint16_t i, k, nb_segs;
+ int32_t rc, pkt_len;
+ uint64_t ol_flags1;
+ struct rte_mbuf *pre_tail;
+
+ k = 0;
+ rc = 0;
+ while (k != num) {
+ /* Make sure there is at least one slot available */
+ if (rte_ring_free_count(s->tx.q) == 0)
+ break;
+
+ /* prepare and check for TX */
+ nb_segs = 0;
+ pkt_len = 0;
+ pre_tail = NULL;
+ for (i = k; i != num; i++) {
+ if (pkt[i]->nb_segs != 1)
+ rte_panic("chained mbuf: %p\n", pkt[i]);
+ /* we should also consider cwnd and snd.wnd when limiting the length */
+ if (nb_segs + pkt[i]->nb_segs <= TCP_MAX_PKT_SEG &&
+ pkt_len + pkt[i]->pkt_len <= 65535 - RESERVE_HEADER_LEN) {
+ nb_segs += pkt[i]->nb_segs;
+ pkt_len += pkt[i]->pkt_len;
+ if (pre_tail)
+ pre_tail->next = pkt[i];
+ pre_tail = rte_pktmbuf_lastseg(pkt[i]);
+ } else {
+ /* enqueue this one now */
+ break;
+ }
+ }
+
+ if (unlikely(i == k)) {
+ /* pkt[k] is too big a packet; fall back to
+ * non-tso send. This could be optimized later
+ * by splitting the mbuf.
+ */
+ if (stream_send(s, &pkt[k], 1, mss, ol_flags) == 1) {
+ k++;
+ continue;
+ } else
+ break;
+ }
+
+ pkt[k]->nb_segs = nb_segs;
+ pkt[k]->pkt_len = pkt_len;
+
+ ol_flags1 = ol_flags;
+ if (pkt_len > mss)
+ ol_flags1 |= PKT_TX_TCP_SEG;
+
+ rc = tcp_fill_mbuf(pkt[k], s, &s->tx.dst, ol_flags1,
+ s->s.port, 0, TCP_FLAG_ACK, 0, 0);
+ if (rc != 0) /* hard to recover */
+ rte_panic("failed to fill mbuf: %p\n", pkt[k]);
+
+ /* correct mss */
+ pkt[k]->tso_segsz = mss;
+
+ s->tcb.snd.waitlen += pkt_len;
+ /* we have already made sure there is at least one slot */
+ if (_rte_ring_enqueue_burst(s->tx.q, (void **)pkt + k, 1) < 1)
+ RTE_VERIFY(0);
+
+ k = i;
+ }
+
+ return k;
+}
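/*
 * Note on the coalescing loop above: a merged super-packet must stay
 * within both limits checked there, at most TCP_MAX_PKT_SEG chained
 * segments and an IP length budget of 65535 - RESERVE_HEADER_LEN bytes,
 * because the NIC re-segments it into tso_segsz (= mss) chunks on
 * transmit; only packets larger than one mss get PKT_TX_TCP_SEG set.
 */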
+
+uint16_t
+tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
+{
+ uint16_t k, mss, state;
+ uint64_t ol_flags;
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+
+ if (s->tcb.err != 0) {
+ rte_errno = s->tcb.err;
+ return 0;
+ }
+
+ /* mark stream as not closable. */
+ if (tcp_stream_acquire(s) < 0) {
+ rte_errno = EAGAIN;
+ return 0;
+ }
+
+ state = s->tcb.state;
+ switch (state) {
+ case TCP_ST_ESTABLISHED:
+ case TCP_ST_CLOSE_WAIT:
+ break;
+ case TCP_ST_FIN_WAIT_1:
+ case TCP_ST_FIN_WAIT_2:
+ case TCP_ST_CLOSING:
+ case TCP_ST_LAST_ACK:
+ rte_errno = EPIPE;
+ tcp_stream_release(s);
+ return 0;
+ default:
+ rte_errno = ENOTCONN;
+ tcp_stream_release(s);
+ return 0;
+ }
+
+ mss = s->tcb.snd.mss;
+
+ ol_flags = s->tx.dst.ol_flags;
+
+ /* Some reference numbers for the case:
+ * "<netperf with uss> - tap - <kernel stack> - <netserver>"
+ * ~2Gbps with tso disabled;
+ * ~16Gbps with tso enabled.
+ */
+ if (rte_ring_free_count(s->tx.q) == 0) {
+ /* A blocking send may retry without waiting for the tx event (raised
+ * by acked data), so still enqueue this stream for further processing.
+ */
+ txs_enqueue(s->s.ctx, s);
+ rte_errno = EAGAIN;
+ k = 0;
+ } else if (s->tx.dst.dev->prm.tx_offload & DEV_TX_OFFLOAD_TCP_TSO)
+ k = stream_send_tso(s, pkt, num, mss, ol_flags);
+ else
+ k = stream_send(s, pkt, num, mss, ol_flags);
+
/* notify BE about more data to send */
if (k != 0)
txs_enqueue(s->s.ctx, s);
+
/* if possible, re-arm stream write event. */
- if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
+ if (rte_ring_free_count(s->tx.q) && s->tx.ev != NULL && k == num)
tle_event_raise(s->tx.ev);
tcp_stream_release(s);
@@ -2382,9 +2756,15 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
struct tle_tcp_stream *s;
struct iovec iv;
struct rte_mbuf *mb[2 * MAX_PKT_BURST];
+ uint16_t mss;
s = TCP_STREAM(ts);
+ if (s->tcb.err != 0) {
+ rte_errno = s->tcb.err;
+ return -1;
+ }
+
/* mark stream as not closable. */
if (tcp_stream_acquire(s) < 0) {
rte_errno = EAGAIN;
@@ -2392,7 +2772,18 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
}
state = s->tcb.state;
- if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
+ switch (state) {
+ case TCP_ST_ESTABLISHED:
+ case TCP_ST_CLOSE_WAIT:
+ break;
+ case TCP_ST_FIN_WAIT_1:
+ case TCP_ST_FIN_WAIT_2:
+ case TCP_ST_CLOSING:
+ case TCP_ST_LAST_ACK:
+ rte_errno = EPIPE;
+ tcp_stream_release(s);
+ return -1;
+ default:
rte_errno = ENOTCONN;
tcp_stream_release(s);
return -1;
@@ -2403,11 +2794,24 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
for (i = 0; i != iovcnt; i++)
tsz += iov[i].iov_len;
+ if (tsz == 0) {
+ tcp_stream_release(s);
+ return 0;
+ }
+
slen = rte_pktmbuf_data_room_size(mp);
- slen = RTE_MIN(slen, s->tcb.snd.mss);
+ mss = s->tcb.snd.mss;
+
+ slen = RTE_MIN(slen, mss);
num = (tsz + slen - 1) / slen;
n = rte_ring_free_count(s->tx.q);
+
+ if (n == 0) {
+ tcp_stream_release(s);
+ return 0;
+ }
+
num = RTE_MIN(num, n);
n = RTE_MIN(num, RTE_DIM(mb));
@@ -2451,7 +2855,6 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
k = 0;
if (k != j) {
-
/* free pkts that were not enqueued */
free_mbufs(mb + k, j - k);
@@ -2466,14 +2869,16 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
}
}
- if (k != 0) {
-
+ if (k != 0) {
/* notify BE about more data to send */
txs_enqueue(s->s.ctx, s);
/* if possible, re-arm stream write event. */
if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
tle_event_raise(s->tx.ev);
+ } else {
+ rte_errno = EAGAIN;
+ sz = -1;
}
tcp_stream_release(s);
@@ -2485,7 +2890,7 @@ static inline void
tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
{
/* try to send some data */
- tx_nxt_data(s, tms);
+ uint32_t tn = tx_nxt_data(s, tms);
/* we also have to send a FIN */
if (state != TCP_ST_ESTABLISHED &&
@@ -2495,6 +2900,13 @@ tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
s->tcb.snd.fss = ++s->tcb.snd.nxt;
send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
}
+
+ if (s->tcb.snd.update_rcv) {
+ if (tn == 0)
+ send_ack(s, tms, TCP_FLAG_ACK); /* update recv window */
+
+ s->tcb.snd.update_rcv = false;
+ }
}
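/*
 * Note on the update_rcv branch above: if tx_nxt_data() pushed any data
 * (tn != 0), the fresh segments already advertise the new receive
 * window, so the bare window-update ACK is sent only when nothing else
 * went out in this round.
 */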
static inline void
@@ -2507,7 +2919,7 @@ tx_stream(struct tle_tcp_stream *s, uint32_t tms)
if (state == TCP_ST_SYN_SENT) {
/* send the SYN, start the rto timer */
send_ack(s, tms, TCP_FLAG_SYN);
- timer_start(s);
+ timer_start(s, TIMER_RTO, s->tcb.snd.rto);
} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
@@ -2515,7 +2927,7 @@ tx_stream(struct tle_tcp_stream *s, uint32_t tms)
/* start RTO timer. */
if (s->tcb.snd.nxt != s->tcb.snd.una)
- timer_start(s);
+ timer_start(s, TIMER_RTO, s->tcb.snd.rto);
}
}
@@ -2544,7 +2956,6 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms)
if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
-
/* update SND.CWD and SND.SSTHRESH */
rto_cwnd_update(&s->tcb);
@@ -2570,50 +2981,131 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms)
* than one SYN or SYN/ACK retransmissions or true loss
* detection has been made.
*/
- if (s->tcb.snd.nb_retx != 0)
+ if (s->tcb.snd.nb_retx != 0) {
s->tcb.snd.cwnd = s->tcb.snd.mss;
+ CWND_INFO("synsent", s->tcb.snd.cwnd);
+ }
send_ack(s, tms, TCP_FLAG_SYN);
-
- } else if (state == TCP_ST_TIME_WAIT) {
- stream_term(s);
+ TCP_INC_STATS(TCP_MIB_RETRANSSEGS);
}
/* RFC6298:5.5 back off the timer */
s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
s->tcb.snd.nb_retx++;
- timer_restart(s);
+ timer_restart(s, TIMER_RTO, s->tcb.snd.rto);
} else {
- send_rst(s, s->tcb.snd.nxt);
+ if (state == TCP_ST_SYN_SENT) {
+ if (stream_fill_dest(s) != 0 ||
+ is_broadcast_ether_addr((struct ether_addr *)s->tx.dst.hdr))
+ s->tcb.err = EHOSTUNREACH;
+ else
+ /* TODO: should we send an rst in this case? */
+ s->tcb.err = ENOTCONN;
+ } else
+ send_rst(s, s->tcb.snd.una);
stream_term(s);
}
}
+static inline void
+set_keepalive_timer(struct tle_tcp_stream *s)
+{
+ if (s->s.option.keepalive) {
+ if (s->tcb.state == TCP_ST_ESTABLISHED) {
+ if (s->tcb.snd.nb_keepalive == 0)
+ timer_reset(s, TIMER_KEEPALIVE,
+ s->s.option.keepidle * MS_PER_S);
+ else
+ timer_reset(s, TIMER_KEEPALIVE,
+ s->s.option.keepintvl * MS_PER_S);
+ }
+ } else {
+ timer_stop(s, TIMER_KEEPALIVE);
+ s->tcb.snd.nb_keepalive = 0;
+ }
+}
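/*
 * Illustrative configuration (field names as used above, helper name
 * and values are assumptions): arming keepalive on a stream so the
 * timer logic above takes over.
 */
static void
enable_keepalive(struct tle_stream *ts)
{
	struct tle_tcp_stream *s = TCP_STREAM(ts);

	ts->option.keepalive = 1;  /* enable the TIMER_KEEPALIVE path */
	ts->option.keepidle = 60;  /* seconds of idle before first probe */
	ts->option.keepintvl = 10; /* seconds between probes */
	ts->option.keepcnt = 9;    /* probes before ETIMEDOUT + RST */

	/* let the BE pick the change up and (re)arm the timer */
	s->tcb.uop |= TCP_OP_KEEPALIVE;
	txs_enqueue(s->s.ctx, s);
}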
+
int
tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
{
- uint32_t i, k, tms;
+ uint8_t type;
+ uint32_t i, k;
+ uint64_t tms;
struct sdr *dr;
struct tle_timer_wheel *tw;
struct tle_stream *p;
struct tle_tcp_stream *s, *rs[num];
- /* process streams with RTO exipred */
+ tms = tcp_get_tms(ctx->cycles_ms_shift);
+ /* process streams with RTO expired */
tw = CTX_TCP_TMWHL(ctx);
- tms = tcp_get_tms(ctx->cycles_ms_shift);
tle_timer_expire(tw, tms);
k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
for (i = 0; i != k; i++) {
-
- s = rs[i];
- s->timer.handle = NULL;
- if (tcp_stream_try_acquire(s) > 0)
- rto_stream(s, tms);
- tcp_stream_release(s);
+ s = timer_stream(rs[i]);
+ type = timer_type(rs[i]);
+ s->timer.handle[type] = NULL;
+
+ switch (type) {
+ case TIMER_RTO:
+ /* the FE cannot move the stream into the states below;
+ * that is why we do not take the lock here
+ */
+ if (s->tcb.state == TCP_ST_TIME_WAIT ||
+ s->tcb.state == TCP_ST_FIN_WAIT_2) {
+ tcp_stream_down(s);
+ stream_term(s);
+ tcp_stream_up(s);
+ } else if (tcp_stream_acquire(s) > 0) {
+ /*
+ * the stream may be closed concurrently by the frontend.
+ * if the stream has already been closed, there is no
+ * need to retransmit anymore.
+ */
+ if (s->tcb.state != TCP_ST_CLOSED)
+ rto_stream(s, tms);
+ tcp_stream_release(s);
+ }
+ /* Failed to acquire the lock? The FE is shutting down or
+ * closing this stream; either a FIN or an RST needs to be
+ * sent, which means it is in the tsq and will be processed later.
+ */
+ break;
+ case TIMER_DACK:
+ if (rte_atomic32_read(&s->tx.arm) == 0 &&
+ s->tcb.rcv.nxt != s->tcb.snd.ack &&
+ tcp_stream_acquire(s) > 0) {
+ s->s.option.tcpquickack = 8;
+ send_ack(s, tms, TCP_FLAG_ACK);
+ tcp_stream_release(s);
+ }
+ break;
+ case TIMER_KEEPALIVE:
+ if (s->tcb.snd.nb_keepalive < s->s.option.keepcnt) {
+ if (tcp_stream_try_acquire(s) > 0 &&
+ s->tcb.state == TCP_ST_ESTABLISHED) {
+ send_keepalive(s);
+ s->tcb.snd.nb_keepalive++;
+ timer_start(s, TIMER_KEEPALIVE,
+ s->s.option.keepintvl * MS_PER_S);
+ }
+ tcp_stream_release(s);
+ } else {
+ tcp_stream_down(s);
+ send_rst(s, s->tcb.snd.nxt);
+ s->tcb.err = ETIMEDOUT;
+ stream_term(s);
+ tcp_stream_up(s);
+ }
+ break;
+ default:
+ rte_panic("Invalid timer type: %d\n", type);
+ }
}
/* process streams from to-send queue */
@@ -2621,20 +3113,63 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
for (i = 0; i != k; i++) {
-
s = rs[i];
- rte_atomic32_set(&s->tx.arm, 0);
- if (tcp_stream_try_acquire(s) > 0)
+ if (s->tcb.uop & TCP_OP_RESET) {
+ /* already put into death row in close() */
+ send_rst(s, s->tcb.snd.nxt);
+ continue;
+ }
+
+ if (tcp_stream_acquire(s) > 0) {
+ if (s->tcb.uop & TCP_OP_KEEPALIVE) {
+ s->tcb.uop &= ~TCP_OP_KEEPALIVE;
+ set_keepalive_timer(s);
+ }
+
+ if (s->tcb.state == TCP_ST_FIN_WAIT_2 &&
+ s->tcb.uop & TCP_OP_CLOSE) {
+ /* This could happen after:
+ * 1) shutdown;
+ * 2) FIN sent;
+ * 3) ack received;
+ * 4) close;
+ */
+ timer_start(s, TIMER_RTO, s->tcb.snd.rto_fw);
+ tcp_stream_release(s);
+ continue;
+ }
+
+ if (s->tcb.state == TCP_ST_ESTABLISHED &&
+ s->s.option.tcpcork) {
+ if (s->tcb.snd.cork_ts == 0)
+ s->tcb.snd.cork_ts = (uint32_t)tms;
+
+ if (s->tcb.snd.waitlen < s->tcb.snd.mss &&
+ (uint32_t)tms - s->tcb.snd.cork_ts < 200) {
+ txs_enqueue(s->s.ctx, s);
+ tcp_stream_release(s);
+ continue;
+ }
+
+ s->tcb.snd.cork_ts = 0;
+ }
+
tx_stream(s, tms);
- else
+ tcp_stream_release(s);
+ continue;
+ }
+
+ if (s->tcb.state != TCP_ST_CLOSED)
txs_enqueue(s->s.ctx, s);
- tcp_stream_release(s);
+
+ /* TCP_ST_CLOSED? See how close() handles the TCP_ST_CLOSED state */
}
/* collect streams to close from the death row */
dr = CTX_TCP_SDR(ctx);
+ rte_spinlock_lock(&dr->lock);
for (k = 0, p = STAILQ_FIRST(&dr->be);
k != num && p != NULL;
k++, p = STAILQ_NEXT(p, link))
@@ -2645,9 +3180,21 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
else
STAILQ_FIRST(&dr->be) = p;
+ /* if the stream is still in the tsq, wait one more round */
+ for (i = 0; i != k; i++) {
+ if (rte_atomic32_read(&rs[i]->tx.arm) > 0) {
+ STAILQ_INSERT_TAIL(&dr->be, &rs[i]->s, link);
+ rs[i] = NULL;
+ }
+ }
+
+ rte_spinlock_unlock(&dr->lock);
+
/* cleanup closed streams */
for (i = 0; i != k; i++) {
s = rs[i];
+ if (s == NULL)
+ continue;
tcp_stream_down(s);
tcp_stream_reset(ctx, s);
}
diff --git a/lib/libtle_l4p/tcp_rxtx.h b/lib/libtle_l4p/tcp_rxtx.h
new file mode 100644
index 0000000..e7f8e3e
--- /dev/null
+++ b/lib/libtle_l4p/tcp_rxtx.h
@@ -0,0 +1,252 @@
+/*
+ * Copyright (c) 2016-2017 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _TCP_RXTX_H_
+#define _TCP_RXTX_H_
+
+#include "tcp_stream.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+static inline uint32_t
+calc_seg_cnt(uint32_t plen, uint32_t mss)
+{
+ if (plen > mss)
+ return (plen + mss - 1) / mss;
+ else
+ return 1;
+}
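/*
 * E.g. plen = 4000, mss = 1460 -> (4000 + 1459) / 1460 = 3 segments;
 * anything that fits within a single mss counts as one segment.
 */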
+
+static inline uint32_t
+get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
+{
+ uint32_t pid;
+ rte_atomic32_t *pa;
+
+ pa = &dev->tx.packet_id[type];
+
+ if (st == 0) {
+ pid = rte_atomic32_add_return(pa, num);
+ return pid - num;
+ } else {
+ pid = rte_atomic32_read(pa);
+ rte_atomic32_set(pa, pid + num);
+ return pid;
+ }
+}
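/*
 * Illustrative use (helper name hypothetical): stamping IPv4 packet ids
 * for an n-segment burst. In multi-thread mode (st == 0) the atomic add
 * reserves the range [pid, pid + n) for this caller; with
 * TLE_CTX_FLAG_ST a plain read/set is enough.
 */
static inline void
stamp_ipv4_ids(struct tle_dev *dev, struct ipv4_hdr *h[], uint32_t n)
{
	uint32_t i, pid;

	pid = get_ip_pid(dev, n, TLE_V4, 0);
	for (i = 0; i != n; i++)
		h[i]->packet_id = rte_cpu_to_be_16(pid + i);
}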
+
+static inline void
+fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
+ uint32_t seq, uint8_t hlen, uint8_t flags)
+{
+ uint16_t wnd;
+
+ l4h->src_port = port.dst;
+ l4h->dst_port = port.src;
+
+ wnd = (flags & TCP_FLAG_SYN) ?
+ RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
+ tcb->rcv.wnd >> tcb->rcv.wscale;
+
+ /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
+ l4h->sent_seq = rte_cpu_to_be_32(seq);
+ l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
+ l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
+ l4h->tcp_flags = flags;
+ l4h->rx_win = rte_cpu_to_be_16(wnd);
+ l4h->cksum = 0;
+ l4h->tcp_urp = 0;
+
+ if (flags & TCP_FLAG_SYN)
+ fill_syn_opts(l4h + 1, &tcb->so);
+ else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
+ fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
+}
+
+static inline int
+tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
+ const struct tle_dest *dst, uint64_t ol_flags,
+ union l4_ports port, uint32_t seq, uint32_t flags,
+ uint32_t pid, uint32_t swcsm)
+{
+ uint32_t l4, len, plen;
+ struct tcp_hdr *l4h;
+ char *l2h, *l3;
+
+ len = dst->l2_len + dst->l3_len;
+ plen = m->pkt_len;
+
+ if (flags & TCP_FLAG_SYN) {
+ /* basic length */
+ l4 = sizeof(*l4h) + TCP_OPT_LEN_MSS;
+
+ /* add wscale space and nop */
+ if (s->tcb.so.wscale) {
+ l4 += TCP_OPT_LEN_WSC + TCP_OPT_LEN_NOP;
+ }
+
+ /* add timestamp space and nop */
+ if (s->tcb.so.ts.raw) {
+ l4 += TCP_TX_OPT_LEN_TMS;
+ }
+ } else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0) {
+ l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
+ } else {
+ l4 = sizeof(*l4h);
+ }
+
+ /* adjust mbuf to put L2/L3/L4 headers into it. */
+ l2h = rte_pktmbuf_prepend(m, len + l4);
+ if (l2h == NULL)
+ return -EINVAL;
+
+ /* copy L2/L3 header */
+ rte_memcpy(l2h, dst->hdr, len);
+
+ /* setup TCP header & options */
+ l4h = (struct tcp_hdr *)(l2h + len);
+ fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
+
+ /* setup mbuf TX offload related fields. */
+ m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
+ m->ol_flags |= ol_flags;
+
+ /* update proto specific fields. */
+
+ l3 = l2h + dst->l2_len;
+ if ((((struct ipv4_hdr *)l3)->version_ihl >> 4) == 4) {
+ struct ipv4_hdr *l3h;
+ l3h = (struct ipv4_hdr *)l3;
+ l3h->packet_id = rte_cpu_to_be_16(pid);
+ l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
+
+ if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
+ l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
+ ol_flags);
+ else if (swcsm != 0)
+ l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
+
+ if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
+ l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
+ } else {
+ struct ipv6_hdr *l3h;
+ l3h = (struct ipv6_hdr *)l3;
+ l3h->payload_len = rte_cpu_to_be_16(plen + l4);
+ if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
+ l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
+ else if (swcsm != 0)
+ l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
+ }
+
+ return 0;
+}
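/*
 * Note: the IPv4/IPv6 branch above keys off the version nibble of the
 * just-copied header rather than s->s.type, presumably so the same
 * code path can serve streams whose address family is not yet pinned
 * (e.g. a wildcard listener answering either family).
 */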
+
+static inline int
+stream_drb_empty(struct tle_tcp_stream *s)
+{
+ return rte_ring_empty(s->tx.drb.r);
+}
+
+static inline void
+stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
+ uint32_t nb_drb)
+{
+ _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
+static inline uint32_t
+stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
+ uint32_t nb_drb)
+{
+ return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
+/*
+ * queue a standalone packet to the particular output device.
+ * It assumes that:
+ * - L2/L3/L4 headers are already set;
+ * - the packet fits into one segment.
+ */
+static inline int
+send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
+{
+ uint32_t n, nb;
+ struct tle_drb *drb;
+
+ if (stream_drb_alloc(s, &drb, 1) == 0)
+ return -ENOBUFS;
+
+ /* enqueue pkt for TX. */
+ nb = 1;
+ n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
+ &drb, &nb);
+
+ /* free unused drbs. */
+ if (nb != 0)
+ stream_drb_free(s, &drb, 1);
+
+ return (n == 1) ? 0 : -ENOBUFS;
+}
+
+#define TCP_OLFLAGS_CKSUM(flags) (flags & (PKT_TX_IP_CKSUM | PKT_TX_TCP_CKSUM))
+
+static inline int
+send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
+ uint32_t flags)
+{
+ const struct tle_dest *dst;
+ uint32_t pid, type;
+ int32_t rc;
+
+ dst = &s->tx.dst;
+ type = s->s.type;
+ pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
+
+ rc = tcp_fill_mbuf(m, s, dst, TCP_OLFLAGS_CKSUM(dst->ol_flags),
+ s->s.port, seq, flags, pid, 1);
+ if (rc == 0)
+ rc = send_pkt(s, dst->dev, m);
+
+ return rc;
+}
+
+static inline int
+send_rst(struct tle_tcp_stream *s, uint32_t seq)
+{
+ struct rte_mbuf *m;
+ int32_t rc;
+
+ m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
+ if (m == NULL)
+ return -ENOMEM;
+
+ rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST | TCP_FLAG_ACK);
+ if (rc != 0)
+ rte_pktmbuf_free(m);
+ else
+ TCP_INC_STATS(TCP_MIB_OUTRSTS);
+
+ return rc;
+}
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _TCP_RXTX_H_ */
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c
index 676521b..4a65053 100644
--- a/lib/libtle_l4p/tcp_stream.c
+++ b/lib/libtle_l4p/tcp_stream.c
@@ -20,6 +20,8 @@
#include <rte_ip.h>
#include <rte_tcp.h>
+#include <netinet/tcp.h>
+
#include "tcp_stream.h"
#include "tcp_timer.h"
#include "stream_table.h"
@@ -27,6 +29,7 @@
#include "tcp_ctl.h"
#include "tcp_ofo.h"
#include "tcp_txq.h"
+#include "tcp_rxtx.h"
static void
unuse_stream(struct tle_tcp_stream *s)
@@ -38,25 +41,27 @@ unuse_stream(struct tle_tcp_stream *s)
static void
fini_stream(struct tle_tcp_stream *s)
{
- if (s != NULL) {
- rte_free(s->rx.q);
- tcp_ofo_free(s->rx.ofo);
- rte_free(s->tx.q);
- rte_free(s->tx.drb.r);
- }
+ rte_free(s);
}
static void
tcp_fini_streams(struct tle_ctx *ctx)
{
- uint32_t i;
struct tcp_streams *ts;
+ struct tle_stream *s;
ts = CTX_TCP_STREAMS(ctx);
if (ts != NULL) {
stbl_fini(&ts->st);
- for (i = 0; i != ctx->prm.max_streams; i++)
- fini_stream(&ts->s[i]);
+
+	/* TODO: free those still in use? Probably not necessary, as we
+	 * assume all streams have been closed and returned to the free list.
+ */
+ while (ctx->streams.nb_free--) {
+ s = STAILQ_FIRST(&ctx->streams.free);
+ STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link);
+ fini_stream(TCP_STREAM(s));
+ }
/* free the timer wheel */
tle_timer_free(ts->tmr);
@@ -94,61 +99,100 @@ alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
return r;
}
+/* stream memory layout:
+ * [tle_tcp_stream] [rx.q] [rx.ofo] [tx.q] [tx.drb.r]
+ */
static int
-init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
+add_stream(struct tle_ctx *ctx)
{
- size_t bsz, rsz, sz;
- uint32_t f, i, k, n, nb;
+ size_t sz_s, sz_rxq, sz_ofo, sz_txq, sz_drb_r, sz;
+ /* for rx.q */
+ uint32_t n_rxq;
+ /* for rx.ofo */
+ struct ofo *ofo;
+ struct rte_mbuf **obj;
+ uint32_t ndb, nobj;
+ size_t dsz, osz;
+ /* for tx.q */
+ uint32_t n_txq;
+ /* for tx.drb.r */
+ size_t bsz, rsz;
struct tle_drb *drb;
- char name[RTE_RING_NAMESIZE];
-
- f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
- (RING_F_SP_ENQ | RING_F_SC_DEQ);
-
- /* init RX part. */
-
- n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
- s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id);
- if (s->rx.q == NULL)
- return -ENOMEM;
-
- s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id);
- if (s->rx.ofo == NULL)
- return -ENOMEM;
-
- /* init TX part. */
+ uint32_t k, nb, n_drb;
- n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
- s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id);
- if (s->tx.q == NULL)
- return -ENOMEM;
+ uint32_t f, i;
+ char name[RTE_RING_NAMESIZE];
+ struct tle_tcp_stream *s;
+	/* stream */
+ sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE);
+
+	/* rx.q */
+ n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
+ n_rxq = rte_align32pow2(n_rxq);
+ sz_rxq = rte_ring_get_memsize(n_rxq);
+ sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE);
+
+	/* rx.ofo */
+ calc_ofo_elems(n_rxq, &nobj, &ndb);
+ osz = sizeof(*ofo) + sizeof(ofo->db[0]) * ndb;
+ dsz = sizeof(ofo->db[0].obj[0]) * nobj * ndb;
+ sz_ofo = osz + dsz;
+ sz_ofo = RTE_ALIGN_CEIL(sz_ofo, RTE_CACHE_LINE_SIZE);
+
+	/* tx.q */
+ n_txq = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
+ n_txq = rte_align32pow2(n_txq);
+ sz_txq = rte_ring_get_memsize(n_txq);
+ sz_txq = RTE_ALIGN_CEIL(sz_txq, RTE_CACHE_LINE_SIZE);
+
+	/* tx.drb.r */
nb = drb_nb_elem(ctx);
k = calc_stream_drb_num(ctx, nb);
- n = rte_align32pow2(k);
-
- /* size of the drbs ring */
- rsz = rte_ring_get_memsize(n);
+ n_drb = rte_align32pow2(k);
+ rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */
rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+ bsz = tle_drb_calc_size(nb); /* size of the drb. */
+ sz_drb_r = rsz + bsz * k; /* total stream drbs size. */
+ sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE);
- /* size of the drb. */
- bsz = tle_drb_calc_size(nb);
-
- /* total stream drbs size. */
- sz = rsz + bsz * k;
-
- s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
- ctx->prm.socket_id);
- if (s->tx.drb.r == NULL) {
- TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+ sz = sz_s + sz_rxq + sz_ofo + sz_txq + sz_drb_r;
+ s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ctx->prm.socket_id);
+ if (s == NULL) {
+ TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
"failed with error code: %d\n",
- __func__, s, sz, ctx->prm.socket_id, rte_errno);
+ __func__, sz, ctx->prm.socket_id, rte_errno);
return -ENOMEM;
}
- snprintf(name, sizeof(name), "%p@%zu", s, sz);
- rte_ring_init(s->tx.drb.r, name, n, f);
+ s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s);
+ s->rx.ofo = (struct ofo *)((uintptr_t)s->rx.q + sz_rxq);
+ ofo = s->rx.ofo;
+ s->tx.q = (struct rte_ring *)((uintptr_t)s->rx.ofo + sz_ofo);
+ s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->tx.q + sz_txq);
+	/* ring flags */
+ f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+ (RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+ /* init RX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq);
+ rte_ring_init(s->rx.q, name, n_rxq, f);
+
+ obj = (struct rte_mbuf **)&ofo->db[ndb];
+ for (i = 0; i != ndb; i++) {
+ ofo->db[i].nb_max = nobj;
+ ofo->db[i].obj = obj + i * nobj;
+ }
+ ofo->nb_max = ndb;
+
+ /* init TX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->tx.q, sz_txq);
+ rte_ring_init(s->tx.q, name, n_txq, f);
+
+ snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r);
+ rte_ring_init(s->tx.drb.r, name, n_drb, f);
for (i = 0; i != k; i++) {
drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
rsz + bsz * i);
@@ -200,7 +244,7 @@ tcp_init_streams(struct tle_ctx *ctx)
f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
(RING_F_SP_ENQ | RING_F_SC_DEQ);
- sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams;
+ sz = sizeof(*ts);
ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
ctx->prm.socket_id);
if (ts == NULL) {
@@ -210,6 +254,7 @@ tcp_init_streams(struct tle_ctx *ctx)
return -ENOMEM;
}
+ rte_spinlock_init(&ts->dr.lock);
STAILQ_INIT(&ts->dr.fe);
STAILQ_INIT(&ts->dr.be);
@@ -228,12 +273,11 @@ tcp_init_streams(struct tle_ctx *ctx)
if (ts->tsq == NULL)
rc = -ENOMEM;
else
- rc = stbl_init(&ts->st, ctx->prm.max_streams,
- ctx->prm.socket_id);
+ rc = stbl_init(&ts->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
}
- for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++)
- rc = init_stream(ctx, &ts->s[i]);
+ for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++)
+ rc = add_stream(ctx);
if (rc != 0) {
		TCP_LOG(ERR, "initialisation of %u-th stream failed", i);
@@ -243,11 +287,30 @@ tcp_init_streams(struct tle_ctx *ctx)
return rc;
}
-static void __attribute__((constructor))
+/*
+ * Note: this function is not thread-safe. No locking is done here, as we
+ * assume the ctx is dedicated to a single thread.
+ */
+static uint32_t
+tcp_more_streams(struct tle_ctx *ctx)
+{
+ uint32_t i, nb;
+ uint32_t nb_max = ctx->prm.max_streams - 1;
+ uint32_t nb_cur = ctx->streams.nb_cur;
+
+ nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur);
+ for (i = 0; i < nb; i++)
+ if (add_stream(ctx) != 0)
+ break;
+ return i;
+}
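As a worked example: with min_streams = 128, delta_streams = 64 and max_streams = 1024, the context starts with 128 preallocated streams, and each invocation grows the pool by at most 64, never letting nb_cur exceed max_streams - 1 (one slot stays reserved). A hypothetical caller-side sketch of the grow-on-demand path (the actual call site lives in the common stream code):

	/* hypothetical: refill the free list when it runs dry */
	if (ctx->streams.nb_free == 0)
		tle_stream_ops[ctx->prm.proto].more_streams(ctx);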
+
+static void __attribute__((constructor(101)))
tcp_stream_setup(void)
{
static const struct stream_ops tcp_ops = {
.init_streams = tcp_init_streams,
+ .more_streams = tcp_more_streams,
.fini_streams = tcp_fini_streams,
.free_drbs = tcp_free_drbs,
};
@@ -305,16 +368,12 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
s = (struct tle_tcp_stream *)get_stream(ctx);
if (s == NULL) {
- rte_errno = ENFILE;
- return NULL;
-
- /* some TX still pending for that stream. */
- } else if (TCP_STREAM_TX_PENDING(s)) {
- put_stream(ctx, &s->s, 0);
rte_errno = EAGAIN;
return NULL;
}
+ s->s.option.raw = prm->option;
+
/* setup L4 ports and L3 addresses fields. */
rc = stream_fill_ctx(ctx, &s->s,
(const struct sockaddr *)&prm->addr.local,
@@ -336,12 +395,14 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
/* store other params */
s->flags = ctx->prm.flags;
+ s->tcb.err = 0;
s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
TLE_TCP_DEFAULT_RETRIES;
s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
ctx->prm.icw;
s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
TCP_RTO_2MSL : ctx->prm.timewait;
+ s->tcb.snd.rto_fw = TLE_TCP_FINWAIT_TIMEOUT;
tcp_stream_up(s);
return &s->s;
@@ -354,9 +415,16 @@ static inline int
stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
uint16_t uop;
- uint32_t state;
static const struct tle_stream_cb zcb;
+	/* Perform the uop update under this wlock, or the stream may be
+	 * put into the death ring twice, for example:
+	 * 1) FE sets OP_CLOSE;
+	 * 2) BE stream_term sets the state to TCP_ST_CLOSED and queues it;
+	 * 3) FE takes the stream down and calls stream_term again.
+ */
+ tcp_stream_down(s);
+
/* check was close() already invoked */
uop = s->tcb.uop;
if ((uop & TCP_OP_CLOSE) != 0)
@@ -366,47 +434,66 @@ stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
return -EDEADLK;
- /* mark stream as unavaialbe for RX/TX. */
- tcp_stream_down(s);
-
/* reset events/callbacks */
- s->rx.ev = NULL;
s->tx.ev = NULL;
+ s->rx.ev = NULL;
s->err.ev = NULL;
s->rx.cb = zcb;
s->tx.cb = zcb;
s->err.cb = zcb;
- state = s->tcb.state;
-
- /* CLOSED, LISTEN, SYN_SENT - we can close the stream straighway */
- if (state <= TCP_ST_SYN_SENT) {
+ switch (s->tcb.state) {
+ case TCP_ST_LISTEN:
+		/* close the stream immediately */
tcp_stream_reset(ctx, s);
return 0;
- }
-
- /* generate FIN and proceed with normal connection termination */
- if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {
-
- /* change state */
- s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
- TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;
-
- /* mark stream as writable/readable again */
+ case TCP_ST_CLOSED:
+		/* the stream can be put into this state when a RST packet
+		 * is received, yet it may still sit in the tsq trying to
+		 * send something.
+ */
+ /* fallthrough */
+ case TCP_ST_SYN_SENT:
+ /* timer on and could be in tsq (SYN retrans) */
+ stream_term(s);
+ /* fallthrough */
+ case TCP_ST_FIN_WAIT_1:
+ /* fallthrough */
+ case TCP_ST_CLOSING:
+ /* fallthrough */
+ case TCP_ST_TIME_WAIT:
+ /* fallthrough */
+ case TCP_ST_LAST_ACK:
tcp_stream_up(s);
-
- /* queue stream into to-send queue */
- txs_enqueue(ctx, s);
return 0;
+ case TCP_ST_ESTABLISHED:
+ /* fallthrough */
+ case TCP_ST_CLOSE_WAIT:
+ if (s->tcb.state == TCP_ST_ESTABLISHED) {
+ s->tcb.state = TCP_ST_FIN_WAIT_1;
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
+ } else
+ s->tcb.state = TCP_ST_LAST_ACK;
+
+ if (!rte_ring_empty(s->rx.q)) {
+ TCP_INC_STATS(TCP_MIB_ESTABRESETS);
+ s->tcb.uop |= TCP_OP_RESET;
+ stream_term(s);
+ }
+ break;
+ case TCP_ST_FIN_WAIT_2:
+		/* This state can be reached if shutdown was called; the
+		 * timer still has to be set after this close.
+ */
+ break;
+ default:
+ rte_panic("Invalid state when close: %d\n", s->tcb.state);
}
- /*
- * accroding to the state, close() was already invoked,
- * should never that point.
- */
- RTE_ASSERT(0);
- return -EINVAL;
+ tcp_stream_up(s);
+ txs_enqueue(ctx, s);
+ return 0;
}
uint32_t
@@ -453,6 +540,64 @@ tle_tcp_stream_close(struct tle_stream *ts)
}
int
+tle_tcp_stream_shutdown(struct tle_stream *ts, int how)
+{
+ int ret;
+ bool wakeup;
+ uint32_t state;
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+ if (ts == NULL || s->s.type >= TLE_VNUM)
+ return -EINVAL;
+
+ /* Refer to linux/net/ipv4/tcp.c:tcp_shutdown() */
+ if (how == SHUT_RD)
+ return 0;
+
+ tcp_stream_down(s);
+
+ state = s->tcb.state;
+
+ switch (state) {
+ case TCP_ST_LISTEN:
+ /* fallthrough */
+ case TCP_ST_SYN_SENT:
+ s->tcb.state = TCP_ST_CLOSED;
+ wakeup = true;
+ ret = 0;
+ break;
+ case TCP_ST_ESTABLISHED:
+ /* fallthrough */
+ case TCP_ST_CLOSE_WAIT:
+ if (state == TCP_ST_ESTABLISHED) {
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
+ s->tcb.state = TCP_ST_FIN_WAIT_1;
+ } else
+ s->tcb.state = TCP_ST_LAST_ACK;
+ txs_enqueue(ts->ctx, s);
+ wakeup = true;
+ ret = 0;
+ break;
+ default:
+ wakeup = false;
+ rte_errno = ENOTCONN;
+ ret = -1;
+ }
+
+ if (wakeup) {
+ /* Notify other threads which may wait on the event */
+ if (s->tx.ev)
+ tle_event_raise(s->tx.ev);
+ if (how == SHUT_RDWR && s->err.ev)
+ tle_event_raise(s->err.ev);
+ }
+
+ tcp_stream_up(s);
+ return ret;
+}
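A minimal caller-side sketch of the API above: half-close the write side once all data is queued, keep reading until the peer finishes. handle_error() is a hypothetical placeholder.

	/* send our FIN but keep receiving; SHUT_RD alone is a no-op,
	 * mirroring linux tcp_shutdown() */
	if (tle_tcp_stream_shutdown(ts, SHUT_WR) != 0)
		handle_error(rte_errno);	/* e.g. ENOTCONN */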
+
+int
tle_tcp_stream_get_addr(const struct tle_stream *ts,
struct tle_tcp_stream_addr *addr)
{
@@ -617,3 +762,73 @@ tle_tcp_stream_get_mss(const struct tle_stream * ts)
s = TCP_STREAM(ts);
return s->tcb.snd.mss;
}
+
+int
+tle_tcp_stream_get_info(const struct tle_stream *ts, void *info, socklen_t *optlen)
+{
+ struct tle_tcp_stream *s;
+ struct tcp_info i;
+
+ if (ts == NULL)
+ return -EINVAL;
+
+ s = TCP_STREAM(ts);
+
+ memset(&i, 0, sizeof(struct tcp_info));
+
+ /* transform from tldk state into linux kernel state */
+ switch (s->tcb.state) {
+ case TCP_ST_CLOSED:
+ i.tcpi_state = TCP_CLOSE;
+ break;
+ case TCP_ST_LISTEN:
+ i.tcpi_state = TCP_LISTEN;
+ break;
+ case TCP_ST_SYN_SENT:
+ i.tcpi_state = TCP_SYN_SENT;
+ break;
+ case TCP_ST_SYN_RCVD:
+ i.tcpi_state = TCP_SYN_RECV;
+ break;
+ case TCP_ST_ESTABLISHED:
+ i.tcpi_state = TCP_ESTABLISHED;
+ break;
+ case TCP_ST_FIN_WAIT_1:
+ i.tcpi_state = TCP_FIN_WAIT1;
+ break;
+ case TCP_ST_FIN_WAIT_2:
+ i.tcpi_state = TCP_FIN_WAIT2;
+ break;
+ case TCP_ST_CLOSE_WAIT:
+ i.tcpi_state = TCP_CLOSE_WAIT;
+ break;
+ case TCP_ST_CLOSING:
+ i.tcpi_state = TCP_CLOSING;
+ break;
+ case TCP_ST_LAST_ACK:
+ i.tcpi_state = TCP_LAST_ACK;
+ break;
+ case TCP_ST_TIME_WAIT:
+ i.tcpi_state = TCP_TIME_WAIT;
+ break;
+ }
+
+	/* FIXME: should this report the total retransmission count? */
+ i.tcpi_total_retrans = s->tcb.snd.nb_retx;
+
+ if (*optlen > sizeof(struct tcp_info))
+ *optlen = sizeof(struct tcp_info);
+ rte_memcpy(info, &i, *optlen);
+ return 0;
+}
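A caller-side sketch of the getsockopt(TCP_INFO)-style contract above; note that *optlen is clamped to sizeof(struct tcp_info), so a shorter buffer simply receives a truncated copy:

	struct tcp_info ti;
	socklen_t len = sizeof(ti);

	if (tle_tcp_stream_get_info(ts, &ti, &len) == 0)
		printf("state=%u retrans=%u\n",
			ti.tcpi_state, ti.tcpi_total_retrans);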
+
+void
+tle_tcp_stream_set_keepalive(struct tle_stream *ts)
+{
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+
+ s->tcb.uop |= TCP_OP_KEEPALIVE;
+ txs_enqueue(ts->ctx, s);
+}
diff --git a/lib/libtle_l4p/tcp_stream.h b/lib/libtle_l4p/tcp_stream.h
index 4629fe6..1202574 100644
--- a/lib/libtle_l4p/tcp_stream.h
+++ b/lib/libtle_l4p/tcp_stream.h
@@ -17,6 +17,8 @@
#define _TCP_STREAM_H_
#include <rte_vect.h>
+#include <rte_mbuf.h>
+
#include <tle_dring.h>
#include <tle_tcp.h>
#include <tle_event.h>
@@ -45,23 +47,28 @@ enum {
};
enum {
- TCP_OP_LISTEN = 0x1,
- TCP_OP_ACCEPT = 0x2,
- TCP_OP_CONNECT = 0x4,
- TCP_OP_CLOSE = 0x8,
+ TCP_OP_LISTEN = 0x1,
+ TCP_OP_ACCEPT = 0x2,
+ TCP_OP_CONNECT = 0x4,
+ TCP_OP_CLOSE = 0x8,
+ TCP_OP_RESET = 0x10,
+ TCP_OP_KEEPALIVE = 0x20
};
struct tcb {
+ int err;
volatile uint16_t state;
volatile uint16_t uop; /* operations by user performed */
struct {
uint32_t nxt;
+ uint32_t cpy; /* head of yet unread data */
uint32_t irs; /* initial received sequence */
uint32_t wnd;
uint32_t ts;
struct {
uint32_t seq;
- uint32_t on;
+ uint32_t on; /* on == 1: received an out-of-order fin
+					 * on == 2: received an in-order fin */
} frs;
uint32_t srtt; /* smoothed round trip time (scaled by >> 3) */
uint32_t rttvar; /* rtt variance */
@@ -83,15 +90,32 @@ struct tcb {
uint32_t ssthresh; /* slow start threshold */
uint32_t rto; /* retransmission timeout */
uint32_t rto_tw; /* TIME_WAIT retransmission timeout */
+ uint32_t rto_fw; /* FIN_WAIT_2 waiting timeout */
uint32_t iss; /* initial send sequence */
+		uint32_t waitlen; /* total length of unacknowledged data */
+ uint32_t cork_ts;
uint16_t mss;
uint8_t wscale;
uint8_t nb_retx; /* number of retransmission */
uint8_t nb_retm; /**< max number of retx attempts. */
+		uint8_t nb_keepalive; /* number of keepalive probes sent */
+ bool update_rcv; /* Flag for updating recv window */
+		uint16_t nxt_offset; /* partial tx: offset into the next segment to tx */
+		uint32_t una_offset; /* partial ack: offset of acked data within a mbuf */
+		struct rte_mbuf *nxt_pkt; /* partial tx: next segment to send */
} snd;
struct syn_opts so; /* initial syn options. */
};
+enum {
+ TIMER_RTO,
+ TIMER_DACK,
+ TIMER_KEEPALIVE,
+ TIMER_NUM,
+ TIMER_MAX_NUM = 8,
+ TIMER_MASK = TIMER_MAX_NUM - 1
+};
+
struct tle_tcp_stream {
struct tle_stream s;
@@ -103,7 +127,7 @@ struct tle_tcp_stream {
struct tcb tcb;
struct {
- void *handle;
+ void *handle[TIMER_NUM];
} timer;
struct {
@@ -155,7 +179,6 @@ struct tcp_streams {
struct tle_timer_wheel *tmr; /* timer wheel */
struct rte_ring *tsq; /* to-send streams queue */
struct sdr dr; /* death row for zombie streams */
- struct tle_tcp_stream s[]; /* array of allocated streams. */
};
#define CTX_TCP_STREAMS(ctx) ((struct tcp_streams *)(ctx)->streams.buf)
diff --git a/lib/libtle_l4p/tcp_timer.h b/lib/libtle_l4p/tcp_timer.h
index 8faefb3..d242556 100644
--- a/lib/libtle_l4p/tcp_timer.h
+++ b/lib/libtle_l4p/tcp_timer.h
@@ -27,43 +27,53 @@ extern "C" {
* all RTO values are in ms.
*/
#define TCP_RTO_MAX 60000U /* RFC 6298 (2.5) */
-#define TCP_RTO_MIN 1000U /* RFC 6298 (2.4) */
+#define TCP_RTO_MIN 200U /* Linux/include/net/tcp.h: TCP_RTO_MIN */
#define TCP_RTO_2MSL (2 * TCP_RTO_MAX)
-#define TCP_RTO_DEFAULT TCP_RTO_MIN /* RFC 6298 (2.1)*/
+#define TCP_RTO_DEFAULT 1000U /* RFC 6298 (2.1)*/
#define TCP_RTO_GRANULARITY 100U
+static inline struct tle_tcp_stream *
+timer_stream(struct tle_tcp_stream *s)
+{
+ return (struct tle_tcp_stream *)((unsigned long)s & (~(unsigned long)TIMER_MASK));
+}
+
+static inline uint8_t
+timer_type(struct tle_tcp_stream *s)
+{
+ return (uint8_t)((unsigned long)s & (unsigned long)TIMER_MASK);
+}
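These two helpers undo the tagging done in timer_start() below: tle_tcp_stream objects are cache-line aligned, so the low bits covered by TIMER_MASK are always zero in a genuine pointer and can carry the timer type instead. A round-trip sketch:

	/* tag a stream pointer with its timer type ... */
	void *h = (void *)((unsigned long)s | TIMER_RTO);
	/* ... and recover both pieces on expiry */
	struct tle_tcp_stream *owner = timer_stream((struct tle_tcp_stream *)h);
	uint8_t type = timer_type((struct tle_tcp_stream *)h);	/* TIMER_RTO */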
static inline void
-timer_stop(struct tle_tcp_stream *s)
+timer_stop(struct tle_tcp_stream *s, uint8_t type)
{
struct tle_timer_wheel *tw;
- if (s->timer.handle != NULL) {
+ if (s->timer.handle[type] != NULL) {
tw = CTX_TCP_TMWHL(s->s.ctx);
- tle_timer_stop(tw, s->timer.handle);
- s->timer.handle = NULL;
+ tle_timer_stop(tw, s->timer.handle[type]);
+ s->timer.handle[type] = NULL;
}
}
static inline void
-timer_start(struct tle_tcp_stream *s)
+timer_start(struct tle_tcp_stream *s, uint8_t type, uint32_t timeout)
{
struct tle_timer_wheel *tw;
- if (s->timer.handle == NULL) {
+ if (s->timer.handle[type] == NULL) {
tw = CTX_TCP_TMWHL(s->s.ctx);
- s->timer.handle = tle_timer_start(tw, s, s->tcb.snd.rto);
- s->tcb.snd.nb_retx = 0;
+ s->timer.handle[type] = tle_timer_start(tw, (void*)((unsigned long)s | type), timeout);
}
}
static inline void
-timer_restart(struct tle_tcp_stream *s)
+timer_restart(struct tle_tcp_stream *s, uint8_t type, uint32_t timeout)
{
struct tle_timer_wheel *tw;
tw = CTX_TCP_TMWHL(s->s.ctx);
- s->timer.handle = tle_timer_start(tw, s, s->tcb.snd.rto);
+ s->timer.handle[type] = tle_timer_start(tw, (void*)((unsigned long)s | type), timeout);
}
@@ -71,10 +81,10 @@ timer_restart(struct tle_tcp_stream *s)
* reset number of retransmissions and restart RTO timer.
*/
static inline void
-timer_reset(struct tle_tcp_stream *s)
+timer_reset(struct tle_tcp_stream *s, uint8_t type, uint32_t timeout)
{
- timer_stop(s);
- timer_start(s);
+ timer_stop(s, type);
+ timer_start(s, type, timeout);
}
static inline uint32_t
diff --git a/lib/libtle_l4p/tcp_tx_seg.h b/lib/libtle_l4p/tcp_tx_seg.h
index ac2b13b..b64aa77 100644
--- a/lib/libtle_l4p/tcp_tx_seg.h
+++ b/lib/libtle_l4p/tcp_tx_seg.h
@@ -27,7 +27,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
struct rte_mbuf *in_seg = NULL;
uint32_t nbseg, in_seg_data_pos;
uint32_t more_in_segs;
- uint16_t bytes_left;
+ uint16_t out_bytes_remain;
in_seg = mbin;
in_seg_data_pos = 0;
@@ -35,7 +35,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
/* Check that pkts_out is big enough to hold all fragments */
if (mss * num < (uint16_t)mbin->pkt_len)
- return -ENOSPC;
+ return -EAGAIN;
more_in_segs = 1;
while (more_in_segs) {
@@ -49,7 +49,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
return -ENOMEM;
}
- bytes_left = mss;
+ out_bytes_remain = mss;
out_seg_prev = out_pkt;
more_out_segs = 1;
while (more_out_segs && more_in_segs) {
@@ -68,7 +68,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
/* Prepare indirect buffer */
rte_pktmbuf_attach(out_seg, in_seg);
- len = bytes_left;
+ len = out_bytes_remain;
if (len > (in_seg->data_len - in_seg_data_pos))
len = in_seg->data_len - in_seg_data_pos;
@@ -77,10 +77,10 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
out_pkt->pkt_len = (uint16_t)(len + out_pkt->pkt_len);
out_pkt->nb_segs += 1;
in_seg_data_pos += len;
- bytes_left -= len;
+ out_bytes_remain -= len;
/* Current output packet (i.e. fragment) done ? */
- if (bytes_left == 0)
+ if (out_bytes_remain == 0)
more_out_segs = 0;
/* Current input segment done ? */
diff --git a/lib/libtle_l4p/tcp_txq.h b/lib/libtle_l4p/tcp_txq.h
index 78f1d56..303b8fd 100644
--- a/lib/libtle_l4p/tcp_txq.h
+++ b/lib/libtle_l4p/tcp_txq.h
@@ -68,9 +68,28 @@ tcp_txq_set_nxt_head(struct tle_tcp_stream *s, uint32_t num)
static inline void
tcp_txq_rst_nxt_head(struct tle_tcp_stream *s)
{
- struct rte_ring *r;
+ struct rte_ring *r = s->tx.q;
+ struct rte_mbuf *m;
+ uint32_t offset, data_len;
+
+ if (s->tcb.snd.nxt_pkt != NULL) {
+ s->tcb.snd.nxt_offset = 0;
+ s->tcb.snd.nxt_pkt = NULL;
+ }
+
+ offset = s->tcb.snd.una_offset;
+ if (offset) {
+ m = (struct rte_mbuf *)(_rte_ring_get_data(r)[r->cons.tail & r->mask]);
+ data_len = m->data_len - PKT_L234_HLEN(m);
+ while (offset >= data_len) {
+ offset -= data_len;
+ m = m->next;
+ data_len = m->data_len;
+ }
+ s->tcb.snd.nxt_pkt = m;
+ s->tcb.snd.nxt_offset = offset;
+ }
- r = s->tx.q;
r->cons.head = r->cons.tail;
}
@@ -110,9 +129,13 @@ static inline uint32_t
txs_dequeue_bulk(struct tle_ctx *ctx, struct tle_tcp_stream *s[], uint32_t num)
{
struct rte_ring *r;
+ uint32_t n, i;
r = CTX_TCP_TSQ(ctx);
- return _rte_ring_dequeue_burst(r, (void **)s, num);
+ n = _rte_ring_dequeue_burst(r, (void **)s, num);
+ for (i = 0; i < n; i++)
+ rte_atomic32_clear(&s[i]->tx.arm);
+ return n;
}
#ifdef __cplusplus
diff --git a/lib/libtle_l4p/tle_ctx.h b/lib/libtle_l4p/tle_ctx.h
index de78a6b..f0efd51 100644
--- a/lib/libtle_l4p/tle_ctx.h
+++ b/lib/libtle_l4p/tle_ctx.h
@@ -54,6 +54,43 @@ extern "C" {
struct tle_ctx;
struct tle_dev;
+typedef union tle_stream_options {
+ struct {
+ uint32_t reuseaddr: 1;
+ uint32_t reuseport: 1;
+ uint32_t keepalive: 1;
+ uint32_t ipv6only: 1;
+ uint32_t oobinline: 1;
+ uint32_t tcpcork: 1;
+ uint32_t tcpnodelay: 1;
+ uint32_t mulloop: 1;
+ uint32_t timestamp: 1;
+ uint32_t reserve: 3;
+ uint32_t tcpquickack: 4;
+ uint32_t multtl: 8;
+ uint32_t keepcnt: 8;
+ uint16_t keepidle;
+ uint16_t keepintvl;
+ };
+ uint64_t raw;
+} tle_stream_options_t;
+
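The union lets socket-style options travel through the plain uint64_t option field added to the stream parameters elsewhere in this patch (s->s.option.raw = prm->option). A sketch of packing a few options into a tle_tcp_stream_param instance prm; the keepidle/keepintvl units are presumably seconds:

	tle_stream_options_t opt = { .raw = 0 };

	opt.tcpnodelay = 1;	/* disable Nagle */
	opt.keepalive = 1;
	opt.keepcnt = 5;	/* probes before the connection is dropped */
	opt.keepidle = 60;	/* idle time before the first probe */

	prm.option = opt.raw;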
+static inline void
+tle_set_timestamp(struct msghdr *msg, struct rte_mbuf *m)
+{
+ struct timeval *tv;
+ struct cmsghdr *cmsg;
+
+ cmsg = CMSG_FIRSTHDR(msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SO_TIMESTAMP;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct timeval));
+ msg->msg_controllen = cmsg->cmsg_len;
+ tv = (struct timeval*)CMSG_DATA(cmsg);
+ tv->tv_sec = m->timestamp >> 20;
+ tv->tv_usec = m->timestamp & 0xFFFFFUL;
+}
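tle_set_timestamp() expects the caller to have attached a control buffer already, since CMSG_FIRSTHDR() consults msg_controllen. A sketch of the expected setup; the sec << 20 | usec packing matches the decoding above:

	char cbuf[CMSG_SPACE(sizeof(struct timeval))];
	struct msghdr msg = {
		.msg_control = cbuf,
		.msg_controllen = sizeof(cbuf),
	};

	tle_set_timestamp(&msg, m);	/* m->timestamp holds sec << 20 | usec */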
+
/**
* Blocked L4 ports info.
*/
@@ -112,6 +149,8 @@ struct tle_ctx_param {
int32_t socket_id; /**< socket ID to allocate memory for. */
uint32_t proto; /**< L4 proto to handle. */
uint32_t max_streams; /**< max number of streams in context. */
+ uint32_t min_streams; /**< min number of streams at init. */
+ uint32_t delta_streams; /**< delta of streams of each allocation. */
uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */
uint32_t max_stream_sbufs; /**< max send mbufs per stream. */
uint32_t send_bulk_size; /**< expected # of packets per send call. */
@@ -145,6 +184,8 @@ struct tle_ctx_param {
*/
#define TLE_TCP_TIMEWAIT_DEFAULT UINT32_MAX
+#define TLE_TCP_FINWAIT_TIMEOUT 60000
+
/**
* create L4 processing context.
* @param ctx_prm
diff --git a/lib/libtle_l4p/tle_event.h b/lib/libtle_l4p/tle_event.h
index d730345..dd7a997 100644
--- a/lib/libtle_l4p/tle_event.h
+++ b/lib/libtle_l4p/tle_event.h
@@ -43,7 +43,7 @@ struct tle_event {
struct tle_evq *head;
const void *data;
enum tle_ev_state state;
-} __rte_cache_aligned;
+};
struct tle_evq {
rte_spinlock_t lock;
diff --git a/lib/libtle_l4p/tle_stats.h b/lib/libtle_l4p/tle_stats.h
new file mode 100644
index 0000000..3588c6d
--- /dev/null
+++ b/lib/libtle_l4p/tle_stats.h
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2018 Ant Financial Services Group.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TLE_STATS_H
+#define TLE_STATS_H
+
+#include <rte_per_lcore.h>
+#include <rte_memory.h>
+
+/* tcp mib definitions */
+/*
+ * RFC 1213: MIB-II TCP group
+ * RFC 2012 (updates 1213): SNMPv2-MIB-TCP
+ */
+enum
+{
+ TCP_MIB_RTOALGORITHM, /* RtoAlgorithm */
+ TCP_MIB_RTOMIN, /* RtoMin */
+ TCP_MIB_RTOMAX, /* RtoMax */
+ TCP_MIB_MAXCONN, /* MaxConn */
+ TCP_MIB_ACTIVEOPENS, /* ActiveOpens */
+ TCP_MIB_PASSIVEOPENS, /* PassiveOpens */
+ TCP_MIB_ATTEMPTFAILS, /* AttemptFails */
+ TCP_MIB_ESTABRESETS, /* EstabResets */
+ TCP_MIB_CURRESTAB, /* CurrEstab */
+ TCP_MIB_INSEGS, /* InSegs */
+ TCP_MIB_OUTSEGS, /* OutSegs */
+ TCP_MIB_RETRANSSEGS, /* RetransSegs */
+ TCP_MIB_INERRS, /* InErrs */
+ TCP_MIB_OUTRSTS, /* OutRsts */
+ TCP_MIB_CSUMERRORS, /* InCsumErrors */
+ TCP_MIB_MAX
+};
+
+/* udp mib definitions */
+/*
+ * RFC 1213: MIB-II UDP group
+ * RFC 2013 (updates 1213): SNMPv2-MIB-UDP
+ */
+enum
+{
+ UDP_MIB_INDATAGRAMS, /* InDatagrams */
+ UDP_MIB_NOPORTS, /* NoPorts */
+ UDP_MIB_INERRORS, /* InErrors */
+ UDP_MIB_OUTDATAGRAMS, /* OutDatagrams */
+ UDP_MIB_RCVBUFERRORS, /* RcvbufErrors */
+ UDP_MIB_SNDBUFERRORS, /* SndbufErrors */
+ UDP_MIB_CSUMERRORS, /* InCsumErrors */
+ UDP_MIB_IGNOREDMULTI, /* IgnoredMulti */
+ UDP_MIB_MAX
+};
+
+struct tcp_mib {
+ unsigned long mibs[TCP_MIB_MAX];
+};
+
+struct udp_mib {
+ unsigned long mibs[UDP_MIB_MAX];
+};
+
+struct tle_mib {
+ struct tcp_mib tcp;
+ struct udp_mib udp;
+} __rte_cache_aligned;
+
+extern struct tle_mib default_mib;
+
+RTE_DECLARE_PER_LCORE(struct tle_mib *, mib);
+
+#define PERCPU_MIB RTE_PER_LCORE(mib)
+
+#define SNMP_INC_STATS(mib, field) (mib).mibs[field]++
+#define SNMP_DEC_STATS(mib, field) (mib).mibs[field]--
+#define SNMP_ADD_STATS(mib, field, n) (mib).mibs[field] += n
+#define SNMP_ADD_STATS_ATOMIC(mib, field, n) \
+ rte_atomic64_add((rte_atomic64_t *)(&(mib).mibs[field]), n)
+
+#define TCP_INC_STATS(field) SNMP_INC_STATS(PERCPU_MIB->tcp, field)
+#define TCP_DEC_STATS(field) SNMP_DEC_STATS(PERCPU_MIB->tcp, field)
+#define TCP_ADD_STATS(field, n) SNMP_ADD_STATS(PERCPU_MIB->tcp, field, n)
+#define TCP_INC_STATS_ATOMIC(field) SNMP_ADD_STATS_ATOMIC(PERCPU_MIB->tcp, field, 1)
+#define TCP_DEC_STATS_ATOMIC(field) SNMP_ADD_STATS_ATOMIC(PERCPU_MIB->tcp, field, (-1))
+
+#define UDP_INC_STATS(field) SNMP_INC_STATS(PERCPU_MIB->udp, field)
+#define UDP_ADD_STATS(field, n) SNMP_ADD_STATS(PERCPU_MIB->udp, field, n)
+#define UDP_ADD_STATS_ATOMIC(field, n) \
+ SNMP_ADD_STATS_ATOMIC(PERCPU_MIB->udp, field, n)
+
+#endif /* TLE_STATS_H */
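Counters land wherever the per-lcore mib pointer points; by default every lcore shares default_mib. A sketch of giving each lcore private, contention-free counters (the mibs[] array is a hypothetical application-side object):

	static struct tle_mib mibs[RTE_MAX_LCORE];

	/* run once on each lcore before entering the fastpath */
	RTE_PER_LCORE(mib) = &mibs[rte_lcore_id()];

	/* later, e.g. when exporting stats */
	printf("OutRsts: %lu\n", PERCPU_MIB->tcp.mibs[TCP_MIB_OUTRSTS]);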
diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h
index b0cbda6..93e853e 100644
--- a/lib/libtle_l4p/tle_tcp.h
+++ b/lib/libtle_l4p/tle_tcp.h
@@ -49,6 +49,7 @@ struct tle_tcp_stream_cfg {
struct tle_tcp_stream_param {
struct tle_tcp_stream_addr addr;
struct tle_tcp_stream_cfg cfg;
+ uint64_t option;
};
/**
@@ -86,6 +87,25 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
int tle_tcp_stream_close(struct tle_stream *s);
/**
+ * shut down an open stream, SHUT_WR style.
+ * similar to tle_tcp_stream_close(), except:
+ * - rx still works
+ * - error events are still delivered
+ * @param s
+ *   Pointer to the stream to shut down.
+ * @param how
+ *   SHUT_RD, SHUT_WR or SHUT_RDWR.
+ * @return
+ * zero on successful completion.
+ * - -EINVAL - invalid parameter passed to function
+ * - -EDEADLK - close was already invoked on that stream
+ */
+int tle_tcp_stream_shutdown(struct tle_stream *s, int how);
+
+/**
+ * Send a RST on this stream.
+ */
+void tle_tcp_stream_kill(struct tle_stream *s);
+
+/**
* close a group of open streams.
* if the stream is in connected state, then:
* - connection termination would be performed.
@@ -268,6 +288,15 @@ uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[],
uint16_t num);
/**
+ * Get the number of bytes currently held in the receive window.
+ * @param ts
+ * TCP stream to receive data from.
+ * @return
+ *   number of bytes of data inside the receive window.
+ */
+uint16_t tle_tcp_stream_inq(struct tle_stream *s);
+
+/**
 * Reads iovcnt buffers from the given TCP stream.
* Note that the stream has to be in connected state.
* Data ordering is preserved.
@@ -290,6 +319,19 @@ ssize_t tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
int iovcnt);
/**
+ * Like tle_tcp_stream_readv, but with more information returned in msghdr.
+ * Note that the stream has to be in connected state.
+ * @param ts
+ * TCP stream to receive data from.
+ * @param msg
+ * If not NULL, generate control message into msg_control field of msg.
+ * @return
+ * On success, number of bytes read in the stream receive buffer.
+ * In case of error, returns -1 and error code will be set in rte_errno.
+ */
+ssize_t tle_tcp_stream_recvmsg(struct tle_stream *ts, struct msghdr *msg);
+
+/**
* Consume and queue up to *num* packets, that will be sent eventually
* by tle_tcp_tx_bulk().
* Note that the stream has to be in connected state.
@@ -420,6 +462,24 @@ uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
*/
int tle_tcp_process(struct tle_ctx *ctx, uint32_t num);
+/**
+ * Get tcp info of a tcp stream.
+ * This function is not multi-thread safe.
+ * @param ts
+ * TCP stream to get info from.
+ * @param info
+ * Pointer to store info.
+ * @param optlen
+ * Pointer to length of info.
+ * @return
+ * zero on successful completion.
+ *   - ENOTCONN - the stream is not connected yet.
+ */
+int
+tle_tcp_stream_get_info(const struct tle_stream *ts, void *info, socklen_t *optlen);
+
+void tle_tcp_stream_set_keepalive(struct tle_stream *ts);
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/libtle_l4p/tle_udp.h b/lib/libtle_l4p/tle_udp.h
index d3a8fe9..640ed64 100644
--- a/lib/libtle_l4p/tle_udp.h
+++ b/lib/libtle_l4p/tle_udp.h
@@ -35,6 +35,7 @@ struct tle_udp_stream_param {
struct tle_event *send_ev; /**< send event to use. */
struct tle_stream_cb send_cb; /**< send callback to use. */
+ uint64_t option;
};
/**
@@ -55,6 +56,36 @@ tle_udp_stream_open(struct tle_ctx *ctx,
const struct tle_udp_stream_param *prm);
/**
+ * reset an existing stream within the given UDP context with new parameters.
+ * @param ts
+ * stream to set with new param
+ * @param ctx
+ * UDP context to set the stream within.
+ * @param prm
+ * Parameters used to set the stream.
+ * @return
+ * Pointer to UDP stream structure that can be used in future UDP API calls,
+ * or NULL on error, with error code set in rte_errno.
+ * Possible rte_errno errors include:
+ * - EINVAL - invalid parameter passed to function
+ *   - ENFILE - max limit of open streams reached for that context
+ */
+struct tle_stream *
+tle_udp_stream_set(struct tle_stream *ts, struct tle_ctx *ctx,
+ const struct tle_udp_stream_param *prm);
+
+/**
+ * shut down an open stream.
+ *
+ * @param s
+ *   Pointer to the stream to shut down.
+ * @param how
+ *   SHUT_RD, SHUT_WR or SHUT_RDWR.
+ * @return
+ * zero on successful completion.
+ * - -EINVAL - invalid parameter passed to function
+ */
+int tle_udp_stream_shutdown(struct tle_stream *s, int how);
+
+/**
* close an open stream.
* All packets still remaining in stream receive buffer will be freed.
* All packets still remaining in stream transmit buffer will be kept
@@ -180,6 +211,24 @@ uint16_t tle_udp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[],
uint16_t tle_udp_stream_send(struct tle_stream *s, struct rte_mbuf *pkt[],
uint16_t num, const struct sockaddr *dst_addr);
+/**
+ * updates configuration (associated events, callbacks, stream parameters)
+ * for the given streams.
+ * @param ts
+ * An array of pointers to the streams to update.
+ * @param prm
+ * An array of parameters to update for the given streams.
+ * @param num
+ * Number of elements in the *ts* and *prm* arrays.
+ * @return
+ * number of streams successfully updated.
+ * In case of error, error code set in rte_errno.
+ * Possible rte_errno errors include:
+ * - EINVAL - invalid parameter passed to function
+ */
+uint32_t tle_udp_stream_update_cfg(struct tle_stream *ts[],
+ struct tle_udp_stream_param prm[], uint32_t num);
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/libtle_l4p/udp_rxtx.c b/lib/libtle_l4p/udp_rxtx.c
index 84a13ea..e9539b9 100644
--- a/lib/libtle_l4p/udp_rxtx.c
+++ b/lib/libtle_l4p/udp_rxtx.c
@@ -13,7 +13,6 @@
* limitations under the License.
*/
-#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
@@ -24,14 +23,11 @@
#include "misc.h"
static inline struct tle_udp_stream *
-rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
+rx_stream_obtain_by_tuples(struct stbl *st, const union pkt_info *pi)
{
struct tle_udp_stream *s;
- if (type >= TLE_VNUM || dev->dp[type] == NULL)
- return NULL;
-
- s = (struct tle_udp_stream *)dev->dp[type]->streams[port];
+ s = UDP_STREAM(stbl_find_stream(st, pi));
if (s == NULL)
return NULL;
@@ -41,6 +37,24 @@ rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
return s;
}
+static inline struct tle_udp_stream *
+rx_stream_obtain(struct tle_dev *dev, uint32_t type, const union pkt_info *pi)
+{
+ struct tle_udp_stream *s;
+
+ if (type == TLE_V4)
+ s = bhash_lookup4(dev->ctx->bhash[type],
+ pi->addr4.dst, pi->port.dst, 1);
+ else
+ s = bhash_lookup6(dev->ctx->bhash[type],
+ pi->addr6->dst, pi->port.dst, 1);
+
+ if (s == NULL || rwl_acquire(&s->rx.use) < 0)
+ return NULL;
+
+ return s;
+}
+
static inline uint16_t
get_pkt_type(const struct rte_mbuf *m)
{
@@ -57,8 +71,9 @@ get_pkt_type(const struct rte_mbuf *m)
}
static inline union l4_ports
-pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
- union ipv6_addrs **addr6)
+pkt_info_udp(struct rte_mbuf *m, union l4_ports *ports,
+ union ipv4_addrs *addr4, union ipv6_addrs **addr6,
+ union pkt_info *pi)
{
uint32_t len;
union l4_ports ret, *up;
@@ -71,15 +86,20 @@ pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
len + offsetof(struct ipv4_hdr, src_addr));
addr4->raw = pa4->raw;
+ pi->addr4.raw = pa4->raw;
+ pi->tf.type = TLE_V4;
} else if (ret.src == TLE_V6) {
*addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
len + offsetof(struct ipv6_hdr, src_addr));
+ pi->addr6 = *addr6;
+ pi->tf.type = TLE_V6;
}
len += m->l3_len;
up = rte_pktmbuf_mtod_offset(m, union l4_ports *,
len + offsetof(struct udp_hdr, src_port));
ports->raw = up->raw;
+ pi->port.raw = up->raw;
ret.dst = ports->dst;
return ret;
}
@@ -96,6 +116,11 @@ rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
r = _rte_ring_enqueue_burst(s->rx.q, mb, num);
+ if (unlikely(r != num)) {
+ UDP_ADD_STATS(UDP_MIB_RCVBUFERRORS, num - r);
+ UDP_ADD_STATS(UDP_MIB_INERRORS, num - r);
+ }
+
/* if RX queue was empty invoke user RX notification callback. */
if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
s->rx.cb.func(s->rx.cb.data, &s->s);
@@ -164,28 +189,64 @@ rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
return rx_stream(s, mb, rp + k, rc + k, n);
}
+/*
+ * Consider 2 UDP pkt_info *equal* if their:
+ * - types (IPv4/IPv6)
+ * - UDP src and dst ports
+ * - IP src and dst addresses
+ * are equal.
+ */
+static inline int
+udp_pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
+{
+ uint32_t i;
+
+ i = 1;
+
+ if (pi[0].tf.type == TLE_V4) {
+ while (i != num && pi[i].tf.type == TLE_V4 &&
+ pi[0].port.raw == pi[i].port.raw &&
+ pi[0].addr4.raw == pi[i].addr4.raw)
+ i++;
+ } else if (pi[0].tf.type == TLE_V6) {
+ while (i != num && pi[i].tf.type == TLE_V6 &&
+ pi[0].port.raw == pi[i].port.raw &&
+ ymm_cmp(&pi[0].addr6->raw,
+ &pi[i].addr6->raw) == 0)
+ i++;
+ }
+
+ return i;
+}
+
uint16_t
tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
+ struct stbl *st;
struct tle_udp_stream *s;
- uint32_t i, j, k, n, p, t;
+ uint32_t i, j, k, n, t;
union l4_ports tp[num], port[num];
union ipv4_addrs a4[num];
union ipv6_addrs *pa6[num];
+ union pkt_info pi[num];
+
+ st = CTX_UDP_STLB(dev->ctx);
for (i = 0; i != num; i++)
- tp[i] = pkt_info(pkt[i], &port[i], &a4[i], &pa6[i]);
+ tp[i] = pkt_info_udp(pkt[i], &port[i], &a4[i],
+ &pa6[i], &pi[i]);
k = 0;
for (i = 0; i != num; i = j) {
- for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
- ;
+ j = i + udp_pkt_info_bulk_eq(pi + i, num - i);
t = tp[i].src;
- p = tp[i].dst;
- s = rx_stream_obtain(dev, t, p);
+
+ s = rx_stream_obtain_by_tuples(st, &pi[i]);
+ if (s == NULL)
+ s = rx_stream_obtain(dev, t, &pi[i]);
if (s != NULL) {
if (t == TLE_V4)
@@ -202,6 +263,7 @@ tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
rwl_release(&s->rx.use);
} else {
+ UDP_ADD_STATS(UDP_MIB_NOPORTS, j - i);
for (; i != j; i++) {
rc[k] = ENOENT;
rp[k] = pkt[i];
@@ -262,6 +324,8 @@ tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
stream_drb_release(s, drb + i, j - i);
}
+ UDP_ADD_STATS(UDP_MIB_OUTDATAGRAMS, n);
+
return n;
}
@@ -272,24 +336,18 @@ tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
static inline uint32_t
recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
{
- uint32_t i, k;
- uint64_t flg[num], ofl[num];
-
- for (i = 0; i != num; i++) {
- flg[i] = m[i]->ol_flags;
- ofl[i] = m[i]->tx_offload;
- }
+ uint32_t i, k, offset;
- k = 0;
- for (i = 0; i != num; i++) {
-
- /* drop packets with invalid cksum(s). */
- if (check_pkt_csum(m[i], flg[i], type, IPPROTO_UDP) != 0) {
+ for (i = 0, k = 0; i != num; i++) {
+ if (check_pkt_csum(m[i], type, IPPROTO_UDP) != 0) {
+ UDP_INC_STATS(UDP_MIB_CSUMERRORS);
rte_pktmbuf_free(m[i]);
m[i] = NULL;
k++;
- } else
- rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
+ } else {
+ offset = _tx_offload_l4_offset(m[i]->tx_offload);
+ rte_pktmbuf_adj(m[i], offset);
+ }
}
return k;
@@ -302,9 +360,25 @@ tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
struct tle_udp_stream *s;
s = UDP_STREAM(us);
+ n = 0;
+
+again:
n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
- if (n == 0)
+ if (n == 0) {
+ if (rwl_try_acquire(&s->rx.use) > 0)
+ rte_errno = EAGAIN;
+ else
+ rte_errno = ESHUTDOWN;
+ rwl_release(&s->rx.use);
return 0;
+ }
+
+ k = recv_pkt_process(pkt, n, s->s.type);
+ if (unlikely(k))
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_CSUMERRORS, k);
+ n = compress_pkt_list(pkt, n, k);
+ if (n == 0)
+ goto again;
/*
* if we still have packets to read,
@@ -316,8 +390,8 @@ tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
rwl_release(&s->rx.use);
}
- k = recv_pkt_process(pkt, n, s->s.type);
- return compress_pkt_list(pkt, n, k);
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_INDATAGRAMS, n);
+ return n;
}
static inline int
@@ -413,7 +487,7 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
/* Remove the Ethernet header from the input packet */
rte_pktmbuf_adj(pkt, dst->l2_len);
- mtu = dst->mtu - dst->l2_len;
+ mtu = dst->mtu;
/* fragment packet */
if (type == TLE_V4)
@@ -475,13 +549,22 @@ queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
nb += nbc;
/* no free drbs, can't send anything */
- if (nb == 0)
+ if (unlikely(nb == 0)) {
+ if (all_or_nothing)
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, 1);
+ else
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, nb_pkt);
return 0;
+ }
/* not enough free drbs, reduce number of packets to send. */
else if (nb != nbm) {
- if (all_or_nothing)
+ if (all_or_nothing) {
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, 1);
return 0;
+ }
+
+ UDP_ADD_STATS_ATOMIC(UDP_MIB_SNDBUFERRORS, nb_pkt - nb * bsz);
nb_pkt = nb * bsz;
}
@@ -509,12 +592,18 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
const struct sockaddr_in *d4;
const struct sockaddr_in6 *d6;
struct tle_udp_stream *s;
- const void *da;
+ const void *sa, *da;
union udph udph;
struct tle_dest dst;
struct tle_drb *drb[num];
+ uint8_t ufo;
s = UDP_STREAM(us);
+ if (rwl_acquire(&s->tx.use) < 0) {
+ rte_errno = EPIPE; /* tx is shutdown */
+ return 0;
+ }
+
type = s->s.type;
/* start filling UDP header. */
@@ -523,7 +612,10 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
/* figure out what destination addr/port to use. */
if (dst_addr != NULL) {
- if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
+ if (dst_addr->sa_family != s->prm.remote_addr.ss_family &&
+ (s->prm.remote_addr.ss_family == AF_INET ||
+ !IN6_IS_ADDR_UNSPECIFIED(&s->s.ipv6.addr.dst))) {
+ rwl_release(&s->tx.use);
rte_errno = EINVAL;
return 0;
}
@@ -531,21 +623,28 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
d4 = (const struct sockaddr_in *)dst_addr;
da = &d4->sin_addr;
udph.ports.dst = d4->sin_port;
+ sa = &s->s.ipv4.addr.dst;
} else {
d6 = (const struct sockaddr_in6 *)dst_addr;
da = &d6->sin6_addr;
udph.ports.dst = d6->sin6_port;
+ sa = &s->s.ipv6.addr.dst;
}
} else {
udph.ports.dst = s->s.port.src;
- if (type == TLE_V4)
+ if (type == TLE_V4) {
da = &s->s.ipv4.addr.src;
- else
+ sa = &s->s.ipv4.addr.dst;
+		} else {
da = &s->s.ipv6.addr.src;
+ sa = &s->s.ipv6.addr.dst;
+ }
}
- di = stream_get_dest(&s->s, da, &dst);
+ di = stream_get_dest(type, &s->s, sa, da, &dst);
if (di < 0) {
+ rwl_release(&s->tx.use);
rte_errno = -di;
return 0;
}
@@ -553,12 +652,7 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
mtu = dst.mtu - dst.l2_len - dst.l3_len;
- /* mark stream as not closable. */
- if (rwl_acquire(&s->tx.use) < 0) {
- rte_errno = EAGAIN;
- return 0;
- }
-
+ ufo = dst.dev->prm.tx_offload & DEV_TX_OFFLOAD_UDP_TSO;
nb = 0;
for (i = 0, k = 0; k != num; k = i) {
@@ -568,7 +662,7 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
ol_flags = dst.dev->tx.ol_flags[type];
while (i != num && frg == 0) {
- frg = pkt[i]->pkt_len > mtu;
+ frg = (!ufo) && pkt[i]->pkt_len > mtu;
if (frg != 0)
ol_flags &= ~PKT_TX_UDP_CKSUM;
rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
diff --git a/lib/libtle_l4p/udp_stream.c b/lib/libtle_l4p/udp_stream.c
index 29f5a40..0cd5c27 100644
--- a/lib/libtle_l4p/udp_stream.c
+++ b/lib/libtle_l4p/udp_stream.c
@@ -43,74 +43,87 @@ fini_stream(struct tle_udp_stream *s)
static void
udp_fini_streams(struct tle_ctx *ctx)
{
- uint32_t i;
- struct tle_udp_stream *s;
+ struct udp_streams *us;
+ struct tle_stream *s;
+
+ us = CTX_UDP_STREAMS(ctx);
+ if (us != NULL) {
+ stbl_fini(&us->st);
+
+ while (ctx->streams.nb_free--) {
+ s = STAILQ_FIRST(&ctx->streams.free);
+ STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link);
+ fini_stream(UDP_STREAM(s));
+ }
- s = ctx->streams.buf;
- if (s != NULL) {
- for (i = 0; i != ctx->prm.max_streams; i++)
- fini_stream(s + i);
}
- rte_free(s);
+ rte_free(us);
ctx->streams.buf = NULL;
STAILQ_INIT(&ctx->streams.free);
}
+/* stream memory layout:
+ * [tle_udp_stream] [rx.q] [tx.drb.r]
+ */
static int
-init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s)
+add_stream(struct tle_ctx *ctx)
{
- size_t bsz, rsz, sz;
- uint32_t i, k, n, nb;
+ size_t sz_s, sz_rxq, sz_drb_r, sz;
+ /* for rx.q */
+ uint32_t n_rxq;
+ /* for tx.drb.r */
+ size_t bsz, rsz;
struct tle_drb *drb;
- char name[RTE_RING_NAMESIZE];
+ uint32_t k, nb, n_drb;
- /* init RX part. */
-
- n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
- n = rte_align32pow2(n);
- sz = rte_ring_get_memsize(n);
-
- s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
- ctx->prm.socket_id);
- if (s->rx.q == NULL) {
- UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
- "failed with error code: %d\n",
- __func__, s, sz, ctx->prm.socket_id, rte_errno);
- return -ENOMEM;
- }
+ uint32_t i, f;
+ char name[RTE_RING_NAMESIZE];
+ struct tle_udp_stream *s;
- snprintf(name, sizeof(name), "%p@%zu", s, sz);
- rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
+	/* stream */
+ sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE);
- /* init TX part. */
+	/* rx.q */
+ n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
+ n_rxq = rte_align32pow2(n_rxq);
+ sz_rxq = rte_ring_get_memsize(n_rxq);
+ sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE);
+	/* tx.drb.r */
nb = drb_nb_elem(ctx);
k = calc_stream_drb_num(ctx, nb);
- n = rte_align32pow2(k);
-
- /* size of the drbs ring */
- rsz = rte_ring_get_memsize(n);
+ n_drb = rte_align32pow2(k);
+ rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */
rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+ bsz = tle_drb_calc_size(nb); /* size of the drb. */
+ sz_drb_r = rsz + bsz * k; /* total stream drbs size. */
+ sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE);
- /* size of the drb. */
- bsz = tle_drb_calc_size(nb);
-
- /* total stream drbs size. */
- sz = rsz + bsz * k;
-
- s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
- ctx->prm.socket_id);
- if (s->tx.drb.r == NULL) {
- UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+ sz = sz_s + sz_rxq + sz_drb_r;
+ s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ctx->prm.socket_id);
+ if (s == NULL) {
+ UDP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
"failed with error code: %d\n",
- __func__, s, sz, ctx->prm.socket_id, rte_errno);
+ __func__, sz, ctx->prm.socket_id, rte_errno);
return -ENOMEM;
}
- snprintf(name, sizeof(name), "%p@%zu", s, sz);
- rte_ring_init(s->tx.drb.r, name, n, 0);
+ s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s);
+ s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->rx.q + sz_rxq);
+
+	/* ring flags */
+ f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+ (RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+ /* init RX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq);
+ rte_ring_init(s->rx.q, name, n_rxq, f);
+ /* init TX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r);
+ rte_ring_init(s->tx.drb.r, name, n_drb, f);
for (i = 0; i != k; i++) {
drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
rsz + bsz * i);
@@ -146,38 +159,59 @@ udp_init_streams(struct tle_ctx *ctx)
size_t sz;
uint32_t i;
int32_t rc;
- struct tle_udp_stream *s;
+ struct udp_streams *us;
- sz = sizeof(*s) * ctx->prm.max_streams;
- s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ sz = sizeof(*us);
+ us = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
ctx->prm.socket_id);
- if (s == NULL) {
+ if (us == NULL) {
UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
"for %u udp_streams failed\n",
sz, ctx->prm.socket_id, ctx->prm.max_streams);
return -ENOMEM;
}
- ctx->streams.buf = s;
+ ctx->streams.buf = us;
STAILQ_INIT(&ctx->streams.free);
- for (i = 0; i != ctx->prm.max_streams; i++) {
- rc = init_stream(ctx, s + i);
- if (rc != 0) {
- UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
- udp_fini_streams(ctx);
- return rc;
- }
+ rc = stbl_init(&us->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
+ if (rc < 0) {
+		UDP_LOG(ERR, "failed to init UDP stbl: rc = %d\n", rc);
+ return rc;
}
- return 0;
+ for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++)
+ rc = add_stream(ctx);
+
+ if (rc != 0) {
+		UDP_LOG(ERR, "initialisation of %u-th stream failed", i);
+ udp_fini_streams(ctx);
+ }
+
+ return rc;
}
-static void __attribute__((constructor))
+static uint32_t
+udp_more_streams(struct tle_ctx *ctx)
+{
+ uint32_t i, nb;
+ uint32_t nb_max = ctx->prm.max_streams;
+ uint32_t nb_cur = ctx->streams.nb_cur;
+
+ nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur);
+ for (i = 0; i < nb; i++)
+ if (add_stream(ctx) != 0)
+ break;
+
+ return i;
+}
+
+static void __attribute__((constructor(101)))
udp_stream_setup(void)
{
static const struct stream_ops udp_ops = {
.init_streams = udp_init_streams,
+ .more_streams = udp_more_streams,
.fini_streams = udp_fini_streams,
.free_drbs = udp_free_drbs,
};
@@ -188,8 +222,8 @@ udp_stream_setup(void)
static inline void
stream_down(struct tle_udp_stream *s)
{
- rwl_down(&s->rx.use);
- rwl_down(&s->tx.use);
+ rwl_try_down(&s->rx.use);
+ rwl_try_down(&s->tx.use);
}
static inline void
@@ -224,6 +258,59 @@ check_stream_prm(const struct tle_ctx *ctx,
}
struct tle_stream *
+tle_udp_stream_set(struct tle_stream *ts, struct tle_ctx *ctx,
+ const struct tle_udp_stream_param *prm)
+{
+ struct tle_udp_stream *s;
+ int32_t rc;
+
+ if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
+ tle_udp_stream_close(ts);
+ rte_errno = EINVAL;
+ return NULL;
+ }
+
+ s = UDP_STREAM(ts);
+
+ /* free stream's destination port */
+ rc = stream_clear_ctx(ctx, &s->s);
+
+ if (s->ste) {
+ stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, ts);
+ s->ste = NULL;
+ }
+
+ /* copy input parameters. */
+ s->prm = *prm;
+ s->s.option.raw = prm->option;
+
+ /* setup L4 ports and L3 addresses fields. */
+ rc = stream_fill_ctx(ctx, &s->s,
+ (const struct sockaddr *)&prm->local_addr,
+ (const struct sockaddr *)&prm->remote_addr);
+
+ if (rc != 0)
+ goto error;
+
+ /* add stream to the table for non-listen type stream */
+ if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) {
+ s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s);
+ if (s->ste == NULL) {
+ rc = EEXIST;
+ goto error;
+ }
+ }
+
+ return &s->s;
+
+error:
+ tle_udp_stream_close(ts);
+ rte_errno = rc;
+ return NULL;
+}
+
+struct tle_stream *
tle_udp_stream_open(struct tle_ctx *ctx,
const struct tle_udp_stream_param *prm)
{
@@ -237,42 +324,80 @@ tle_udp_stream_open(struct tle_ctx *ctx,
s = (struct tle_udp_stream *)get_stream(ctx);
if (s == NULL) {
- rte_errno = ENFILE;
- return NULL;
-
- /* some TX still pending for that stream. */
- } else if (UDP_STREAM_TX_PENDING(s)) {
- put_stream(ctx, &s->s, 0);
rte_errno = EAGAIN;
return NULL;
}
/* copy input parameters. */
s->prm = *prm;
+ s->s.option.raw = prm->option;
/* setup L4 ports and L3 addresses fields. */
rc = stream_fill_ctx(ctx, &s->s,
(const struct sockaddr *)&prm->local_addr,
(const struct sockaddr *)&prm->remote_addr);
- if (rc != 0) {
- put_stream(ctx, &s->s, 1);
- s = NULL;
- rte_errno = rc;
- } else {
- /* setup stream notification menchanism */
- s->rx.ev = prm->recv_ev;
- s->rx.cb = prm->recv_cb;
- s->tx.ev = prm->send_ev;
- s->tx.cb = prm->send_cb;
-
- /* mark stream as avaialbe for RX/TX */
- if (s->tx.ev != NULL)
- tle_event_raise(s->tx.ev);
- stream_up(s);
+ if (rc != 0)
+ goto error;
+
+ /* add stream to the table for non-listen type stream */
+ if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) {
+ s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s);
+ if (s->ste == NULL) {
+ rc = EEXIST;
+ goto error;
+ }
}
+	/* setup stream notification mechanism */
+ s->rx.ev = prm->recv_ev;
+ s->rx.cb = prm->recv_cb;
+ s->tx.ev = prm->send_ev;
+ s->tx.cb = prm->send_cb;
+
+	/* mark stream as available for RX/TX */
+ if (s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ stream_up(s);
+
return &s->s;
+
+error:
+ put_stream(ctx, &s->s, 1);
+ rte_errno = rc;
+ return NULL;
+}
+
+int
+tle_udp_stream_shutdown(struct tle_stream *us, int how)
+{
+ bool shut_rd = false;
+ bool shut_wr = false;
+ struct tle_udp_stream *s = UDP_STREAM(us);
+
+ switch (how) {
+ case SHUT_RD:
+ shut_rd = true;
+ rwl_down(&s->rx.use);
+ break;
+ case SHUT_WR:
+ shut_wr = true;
+ rwl_down(&s->tx.use);
+ break;
+ case SHUT_RDWR:
+ shut_rd = true;
+ shut_wr = true;
+ stream_down(s);
+ break;
+ default:
+ return -EINVAL;
+ }
+
+ if (shut_rd && s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ if (shut_wr && s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ return 0;
}
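Unlike the TCP variant, the UDP shutdown only takes the rx/tx read-write locks down and raises the matching events; the datapath then reports the condition. A caller-side sketch (handle_rx_closed() is a hypothetical application hook):

	tle_udp_stream_shutdown(us, SHUT_RD);

	/* receives now fail fast: tle_udp_stream_recv() returns 0 with
	 * rte_errno == ESHUTDOWN; after SHUT_WR, sends report EPIPE */
	if (tle_udp_stream_recv(us, pkt, RTE_DIM(pkt)) == 0 &&
			rte_errno == ESHUTDOWN)
		handle_rx_closed(us);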
int
@@ -312,6 +437,11 @@ tle_udp_stream_close(struct tle_stream *us)
/* empty stream's RX queue */
empty_mbuf_ring(s->rx.q);
+ if (s->ste) {
+ stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, us);
+ s->ste = NULL;
+ }
+
/*
* mark the stream as free again.
* if there still are pkts queued for TX,
@@ -344,3 +474,56 @@ tle_udp_stream_get_param(const struct tle_stream *us,
return 0;
}
+
+/*
+ * helper function, updates stream config
+ */
+static inline int
+stream_update_cfg(struct tle_stream *us, struct tle_udp_stream_param *prm)
+{
+ struct tle_udp_stream *s;
+
+ s = UDP_STREAM(us);
+
+	/* setup stream notification mechanism */
+ s->rx.ev = prm->recv_ev;
+ s->rx.cb = prm->recv_cb;
+ s->tx.ev = prm->send_ev;
+ s->tx.cb = prm->send_cb;
+
+ rte_smp_wmb();
+
+ /* invoke async notifications, if any */
+ if (rte_ring_count(s->rx.q) != 0) {
+ if (s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ else if (s->rx.cb.func != NULL)
+ s->rx.cb.func(s->rx.cb.data, &s->s);
+ }
+
+ /* always ok to write */
+ if (s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ else if (s->tx.cb.func != NULL)
+ s->tx.cb.func(s->tx.cb.data, &s->s);
+
+ return 0;
+}
+
+uint32_t
+tle_udp_stream_update_cfg(struct tle_stream *us[],
+ struct tle_udp_stream_param prm[], uint32_t num)
+{
+ int32_t rc;
+ uint32_t i;
+
+ for (i = 0; i != num; i++) {
+ rc = stream_update_cfg(us[i], &prm[i]);
+ if (rc != 0) {
+ rte_errno = -rc;
+ break;
+ }
+ }
+
+ return i;
+}
diff --git a/lib/libtle_l4p/udp_stream.h b/lib/libtle_l4p/udp_stream.h
index a950e56..55a66f8 100644
--- a/lib/libtle_l4p/udp_stream.h
+++ b/lib/libtle_l4p/udp_stream.h
@@ -24,6 +24,7 @@
#include "osdep.h"
#include "ctx.h"
#include "stream.h"
+#include "stream_table.h"
#ifdef __cplusplus
extern "C" {
@@ -41,6 +42,7 @@ union udph {
struct tle_udp_stream {
struct tle_stream s;
+ struct stbl_entry *ste; /* entry in streams table. */
struct {
struct rte_ring *q;
@@ -63,6 +65,13 @@ struct tle_udp_stream {
struct tle_udp_stream_param prm;
} __rte_cache_aligned;
+struct udp_streams {
+ struct stbl st;
+};
+
+#define CTX_UDP_STREAMS(ctx) ((struct udp_streams *)(ctx)->streams.buf)
+#define CTX_UDP_STLB(ctx) (&CTX_UDP_STREAMS(ctx)->st)
+
#define UDP_STREAM(p) \
((struct tle_udp_stream *)((uintptr_t)(p) - offsetof(struct tle_udp_stream, s)))