-rw-r--r--  examples/l4fwd/main.c                 3
-rw-r--r--  examples/l4fwd/parse.c               42
-rw-r--r--  lib/libtle_dring/tle_dring.h         73
-rw-r--r--  lib/libtle_l4p/ctx.c                 13
-rw-r--r--  lib/libtle_l4p/ctx.h                  3
-rw-r--r--  lib/libtle_l4p/misc.h                91
-rw-r--r--  lib/libtle_l4p/stream.h               4
-rw-r--r--  lib/libtle_l4p/stream_table.h        56
-rw-r--r--  lib/libtle_l4p/syncookie.h           51
-rw-r--r--  lib/libtle_l4p/tcp_ctl.h             91
-rw-r--r--  lib/libtle_l4p/tcp_misc.h            22
-rw-r--r--  lib/libtle_l4p/tcp_rxq.h             42
-rw-r--r--  lib/libtle_l4p/tcp_rxtx.c           378
-rw-r--r--  lib/libtle_l4p/tcp_stream.c          71
-rw-r--r--  lib/libtle_l4p/tcp_stream.h           9
-rw-r--r--  lib/libtle_l4p/tcp_tx_seg.h          15
-rw-r--r--  lib/libtle_l4p/tle_ctx.h             22
-rw-r--r--  lib/libtle_l4p/tle_tcp.h             72
-rw-r--r--  lib/libtle_misc/tle_dpdk_wrapper.h  113
-rw-r--r--  test/dring/test_dring.c               4
-rw-r--r--  test/gtest/test_tle_dring.cpp         4
-rw-r--r--  test/gtest/test_tle_dring.h           4
22 files changed, 902 insertions(+), 281 deletions(-)
diff --git a/examples/l4fwd/main.c b/examples/l4fwd/main.c
index 7613a95..c43b8d7 100644
--- a/examples/l4fwd/main.c
+++ b/examples/l4fwd/main.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -209,6 +209,7 @@ main(int argc, char *argv[])
__func__, rc);
memset(&ctx_prm, 0, sizeof(ctx_prm));
+ ctx_prm.timewait = TLE_TCP_TIMEWAIT_DEFAULT;
signal(SIGINT, sig_handle);
diff --git a/examples/l4fwd/parse.c b/examples/l4fwd/parse.c
index 158b2cb..97cf20d 100644
--- a/examples/l4fwd/parse.c
+++ b/examples/l4fwd/parse.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -38,6 +38,9 @@ static const struct {
#define OPT_SHORT_SBULK 'B'
#define OPT_LONG_SBULK "sburst"
+#define OPT_SHORT_CTXFLAGS 'C'
+#define OPT_LONG_CTXFLAGS "ctxflags"
+
#define OPT_SHORT_PROMISC 'P'
#define OPT_LONG_PROMISC "promisc"
@@ -74,9 +77,16 @@ static const struct {
#define OPT_SHORT_VERBOSE 'v'
#define OPT_LONG_VERBOSE "verbose"
+#define OPT_SHORT_WINDOW 'w'
+#define OPT_LONG_WINDOW "initial-window"
+
+#define OPT_SHORT_TIMEWAIT 'W'
+#define OPT_LONG_TIMEWAIT "timewait"
+
static const struct option long_opt[] = {
{OPT_LONG_ARP, 1, 0, OPT_SHORT_ARP},
{OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
+ {OPT_LONG_CTXFLAGS, 1, 0, OPT_SHORT_CTXFLAGS},
{OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
{OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
{OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
@@ -89,6 +99,8 @@ static const struct option long_opt[] = {
{OPT_LONG_SEC_KEY, 1, 0, OPT_SHORT_SEC_KEY},
{OPT_LONG_LISTEN, 0, 0, OPT_SHORT_LISTEN},
{OPT_LONG_VERBOSE, 1, 0, OPT_SHORT_VERBOSE},
+ {OPT_LONG_WINDOW, 1, 0, OPT_SHORT_WINDOW},
+ {OPT_LONG_TIMEWAIT, 1, 0, OPT_SHORT_TIMEWAIT},
{NULL, 0, 0, 0}
};
@@ -760,7 +772,7 @@ parse_app_options(int argc, char **argv, struct netbe_cfg *cfg,
optind = 0;
optarg = NULL;
- while ((opt = getopt_long(argc, argv, "aB:LPR:S:TUb:f:s:v:H:K:",
+ while ((opt = getopt_long(argc, argv, "aB:C:LPR:S:TUb:f:s:v:H:K:W:w:",
long_opt, &opt_idx)) != EOF) {
if (opt == OPT_SHORT_ARP) {
cfg->arp = 1;
@@ -771,6 +783,13 @@ parse_app_options(int argc, char **argv, struct netbe_cfg *cfg,
"for option: \'%c\'\n",
__func__, optarg, opt);
ctx_prm->send_bulk_size = v;
+ } else if (opt == OPT_SHORT_CTXFLAGS) {
+ rc = parse_uint_val(NULL, optarg, &v);
+ if (rc < 0)
+ rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+ "for option: \'%c\'\n",
+ __func__, optarg, opt);
+ ctx_prm->flags = v;
} else if (opt == OPT_SHORT_PROMISC) {
cfg->promisc = 1;
} else if (opt == OPT_SHORT_RBUFS) {
@@ -835,9 +855,21 @@ parse_app_options(int argc, char **argv, struct netbe_cfg *cfg,
}
memcpy(&ctx_prm->secret_key, optarg,
sizeof(ctx_prm->secret_key));
- }
-
- else {
+ } else if (opt == OPT_SHORT_WINDOW) {
+ rc = parse_uint_val(NULL, optarg, &v);
+ if (rc < 0)
+ rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+ "for option: \'%c\'\n",
+ __func__, optarg, opt);
+ ctx_prm->icw = v;
+ } else if (opt == OPT_SHORT_TIMEWAIT) {
+ rc = parse_uint_val(NULL, optarg, &v);
+ if (rc < 0)
+ rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+ "for option: \'%c\'\n",
+ __func__, optarg, opt);
+ ctx_prm->timewait = v;
+ } else {
rte_exit(EXIT_FAILURE,
"%s: unknown option: \'%c\'\n",
__func__, opt);
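(For reference: with the three options added above, an l4fwd invocation could now include something like -C 1 -w 65535 -W 0, where -C sets the context flags (1 == TLE_CTX_FLAG_ST for single-threaded mode), -w sets the initial congestion window in bytes and -W sets the TIME_WAIT timeout in milliseconds; the values here are illustrative only, not taken from the patch.)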
diff --git a/lib/libtle_dring/tle_dring.h b/lib/libtle_dring/tle_dring.h
index f589ece..9d3788a 100644
--- a/lib/libtle_dring/tle_dring.h
+++ b/lib/libtle_dring/tle_dring.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -21,6 +21,7 @@
#include <rte_common.h>
#include <rte_atomic.h>
#include <rte_memory.h>
+#include <rte_ring.h>
#include <rte_debug.h>
#ifdef __cplusplus
@@ -68,11 +69,13 @@ struct tle_drb {
struct tle_dring {
uint32_t flags;
struct {
+ uint32_t single; /**< true if single producer */
volatile uint32_t head; /**< producer head */
volatile uint32_t tail; /**< producer tail */
struct tle_drb * volatile crb; /**< block to enqueue to */
} prod __rte_cache_aligned;
struct {
+ uint32_t single; /**< true if single consumer */
volatile uint32_t head; /**< consumer head */
volatile uint32_t tail; /**< consumer tail */
struct tle_drb * volatile crb; /**< block to dequeue from */
@@ -259,6 +262,36 @@ tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[],
return nb_obj;
}
+/**
+ * Enqueue several objects on the dring.
+ * Note that it is the caller's responsibility to provide enough drbs
+ * to enqueue all requested objects.
+ *
+ * @param dr
+ * A pointer to the ring structure.
+ * @param objs
+ * An array of pointers to objects to enqueue.
+ * @param nb_obj
+ * The number of objects to add to the dring from the objs[].
+ * @param drbs
+ * An array of pointers to the drbs that can be used by the dring
+ * to perform enqueue operation.
+ * @param nb_drb
+ * on input: number of elements in the drbs[] array.
+ * on output: number of drbs[] entries left unused by the dring.
+ * @return
+ * - number of enqueued objects.
+ */
+static inline uint32_t
+tle_dring_enqueue(struct tle_dring *dr, const void * const objs[],
+ uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+ if (dr->prod.single == 0)
+ return tle_dring_mp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
+ else
+ return tle_dring_sp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
+}
+
/*
* helper routine, to dequeue objects from the ring.
*/
@@ -429,6 +462,39 @@ tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
}
/**
+ * Dequeue several objects from the dring.
+ * Note that it is the caller's responsibility to provide a drbs[] array
+ * large enough to store pointers to all drbs that might become unused
+ * by that dequeue operation. It is also the caller's responsibility to
+ * manage unused drbs after the dequeue operation is completed
+ * (i.e. mark them as free/reusable again, etc.).
+ *
+ * @param dr
+ * A pointer to the ring structure.
+ * @param objs
+ * An array of pointers to objects that will be dequeued.
+ * @param nb_obj
+ * The number of objects to dequeue from the dring.
+ * @param drbs
+ * An array of pointers to the drbs that will become unused after that
+ * dequeue operation.
+ * @param nb_drb
+ * on input: number of elements in the drbs[] array.
+ * on output: number of filled entries in the drbs[] array.
+ * @return
+ * - number of dequeued objects.
+ */
+static inline uint32_t
+tle_dring_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
+ struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+ if (dr->cons.single == 0)
+ return tle_dring_mc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
+ else
+ return tle_dring_sc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
+}
+
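Together with tle_dring_reset() below, these two wrappers let a dring configured for single-threaded use skip the atomic multi-producer/multi-consumer paths transparently. A minimal usage sketch, assuming the caller has pre-allocated and filled the drb and object arrays (everything except the tle_dring_*/RTE_* names is hypothetical):

    struct tle_dring dr;
    struct tle_drb *drbs[8];   /* assumed pre-allocated by the caller */
    const void *objs[16];      /* assumed filled with object pointers */
    uint32_t nb_drb, n;

    tle_dring_reset(&dr, RING_F_SP_ENQ | RING_F_SC_DEQ); /* single-threaded */

    nb_drb = RTE_DIM(drbs);
    n = tle_dring_enqueue(&dr, objs, RTE_DIM(objs), drbs, &nb_drb);
    /* nb_drb now holds how many drbs[] entries the dring did not consume */

    nb_drb = RTE_DIM(drbs);
    n = tle_dring_dequeue(&dr, objs, n, drbs, &nb_drb);
    /* nb_drb now holds how many drbs became unused and can be recycled */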
+/**
* Reset given dring to the initial state.
* Note that information about all queued objects will be lost.
*
@@ -436,11 +502,14 @@ tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
* A pointer to the dring structure.
*/
static inline void
-tle_dring_reset(struct tle_dring *dr)
+tle_dring_reset(struct tle_dring *dr, uint32_t flags)
{
memset(dr, 0, sizeof(*dr));
dr->prod.crb = &dr->dummy;
dr->cons.crb = &dr->dummy;
+ dr->prod.single = ((flags & RING_F_SP_ENQ) != 0);
+ dr->cons.single = ((flags & RING_F_SC_DEQ) != 0);
+ dr->flags = flags;
}
/**
diff --git a/lib/libtle_l4p/ctx.c b/lib/libtle_l4p/ctx.c
index 6eb33eb..910fc88 100644
--- a/lib/libtle_l4p/ctx.c
+++ b/lib/libtle_l4p/ctx.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -16,6 +16,7 @@
#include <string.h>
#include <rte_malloc.h>
#include <rte_errno.h>
+#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_ip.h>
@@ -77,6 +78,7 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
{
struct tle_ctx *ctx;
size_t sz;
+ uint64_t ms;
uint32_t i;
int32_t rc;
@@ -95,6 +97,10 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
return NULL;
}
+ /* calculate closest shift to convert from cycles to ms (approximate) */
+ ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S;
+ ctx->cycles_ms_shift = sizeof(ms) * CHAR_BIT - __builtin_clzll(ms) - 1;
+
ctx->prm = *ctx_prm;
rc = tle_stream_ops[ctx_prm->proto].init_streams(ctx);
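(As a worked example of the shift computed above; the 2.4 GHz TSC frequency is assumed purely for illustration: ms = (2400000000 + 999) / 1000 = 2400000 cycles per millisecond, the highest set bit of 2400000 is bit 21, so cycles_ms_shift = 21 and tcp_get_tms() divides by 2^21 = 2097152 instead of 2400000. Since the shift rounds the divisor down, the resulting "millisecond" ticks can only run slightly fast, here by roughly 14%, which is an acceptable trade-off for replacing a 64-bit division with a shift on the RX and timer fast paths.)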
@@ -195,6 +201,7 @@ struct tle_dev *
tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm)
{
int32_t rc;
+ uint32_t df;
struct tle_dev *dev;
if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) {
@@ -247,7 +254,9 @@ tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm)
}
/* setup TX data. */
- tle_dring_reset(&dev->tx.dr);
+ df = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+ RING_F_SP_ENQ | RING_F_SC_DEQ;
+ tle_dring_reset(&dev->tx.dr, df);
if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0 &&
ctx->prm.proto == TLE_PROTO_UDP) {
diff --git a/lib/libtle_l4p/ctx.h b/lib/libtle_l4p/ctx.h
index cc32081..389d646 100644
--- a/lib/libtle_l4p/ctx.h
+++ b/lib/libtle_l4p/ctx.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -53,6 +53,7 @@ struct tle_dev {
struct tle_ctx {
struct tle_ctx_param prm;
+ uint32_t cycles_ms_shift; /* to convert from cycles to ms */
struct {
rte_spinlock_t lock;
uint32_t nb_free; /* number of free streams. */
diff --git a/lib/libtle_l4p/misc.h b/lib/libtle_l4p/misc.h
index 6450b67..9bff459 100644
--- a/lib/libtle_l4p/misc.h
+++ b/lib/libtle_l4p/misc.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -396,20 +396,103 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
return nb_pkt;
}
+static inline void
+free_mbufs(struct rte_mbuf *mb[], uint32_t num)
+{
+ uint32_t i;
+
+ for (i = 0; i != num; i++)
+ rte_pktmbuf_free(mb[i]);
+}
+
/* empty ring and free queued mbufs */
static inline void
empty_mbuf_ring(struct rte_ring *r)
{
- uint32_t i, n;
+ uint32_t n;
struct rte_mbuf *mb[MAX_PKT_BURST];
do {
n = _rte_ring_dequeue_burst(r, (void **)mb, RTE_DIM(mb));
- for (i = 0; i != n; i++)
- rte_pktmbuf_free(mb[i]);
+ free_mbufs(mb, n);
} while (n != 0);
}
+static inline uint32_t
+_mbus_to_iovec(struct iovec *iv, struct rte_mbuf *mb[], uint32_t num)
+{
+ uint32_t i, ns;
+ uint32_t len, slen, tlen;
+ struct rte_mbuf *m, *next;
+ const void *src;
+
+ for (i = 0; i != num; i++) {
+
+ m = mb[i];
+ tlen = 0;
+ ns = 0;
+
+ do {
+ slen = m->data_len;
+ src = rte_pktmbuf_mtod(m, const void *);
+ len = RTE_MIN(iv->iov_len - tlen, slen);
+ rte_memcpy((uint8_t *)iv->iov_base + tlen, src, len);
+ slen -= len;
+ tlen += len;
+ if (slen != 0)
+ break;
+ ns++;
+ next = m->next;
+ rte_pktmbuf_free_seg(m);
+ m = next;
+ } while (m != NULL);
+
+ iv->iov_base = (uint8_t *)iv->iov_base + tlen;
+ iv->iov_len -= tlen;
+
+ /* partly consumed mbuf */
+ if (m != NULL) {
+ m->pkt_len = mb[i]->pkt_len - tlen;
+ m->data_len = slen;
+ m->data_off += len;
+ m->nb_segs = mb[i]->nb_segs - ns;
+ mb[i] = m;
+ break;
+ }
+ }
+
+ return i;
+}
+
+static inline uint32_t
+_iovec_to_mbsegs(struct iovec *iv, uint32_t seglen, struct rte_mbuf *mb[],
+ uint32_t num)
+{
+ uint32_t i;
+ uint32_t len, slen, tlen;
+ struct rte_mbuf *m;
+ void *dst;
+
+ tlen = 0;
+ for (i = 0; i != num; i++) {
+
+ m = mb[i];
+ slen = rte_pktmbuf_tailroom(m);
+ slen = RTE_MIN(slen, seglen - m->data_len);
+ len = RTE_MIN(iv->iov_len - tlen, slen);
+ dst = rte_pktmbuf_append(m, len);
+ rte_memcpy(dst, (uint8_t *)iv->iov_base + tlen, len);
+ tlen += len;
+ if (len != slen)
+ break;
+ }
+
+ iv->iov_base = (uint8_t *)iv->iov_base + tlen;
+ iv->iov_len -= tlen;
+
+ return i;
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/libtle_l4p/stream.h b/lib/libtle_l4p/stream.h
index f3b5828..e76f126 100644
--- a/lib/libtle_l4p/stream.h
+++ b/lib/libtle_l4p/stream.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -147,6 +147,8 @@ stream_get_dest(struct tle_stream *s, const void *dst_addr,
return -ENOENT;
dev = dst->dev;
+ dst->ol_flags = dev->tx.ol_flags[s->type];
+
if (s->type == TLE_V4) {
struct ipv4_hdr *l3h;
l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h
index 29f1f63..033c306 100644
--- a/lib/libtle_l4p/stream_table.h
+++ b/lib/libtle_l4p/stream_table.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -23,9 +23,6 @@
extern "C" {
#endif
-/* current stbl entry contains packet. */
-#define STE_PKT 1
-
struct stbl_entry {
void *data;
};
@@ -138,18 +135,6 @@ stbl_find_entry(struct stbl *st, const union pkt_info *pi)
return ht->ent + rc;
}
-static inline int
-stbl_data_pkt(const void *p)
-{
- return ((uintptr_t)p & STE_PKT);
-}
-
-static inline void *
-stbl_get_pkt(const struct stbl_entry *se)
-{
- return (void *)((uintptr_t)se->data ^ STE_PKT);
-}
-
static inline void *
stbl_find_data(struct stbl *st, const union pkt_info *pi)
{
@@ -159,35 +144,6 @@ stbl_find_data(struct stbl *st, const union pkt_info *pi)
return (ent == NULL) ? NULL : ent->data;
}
-static inline void
-stbl_del_pkt(struct stbl *st, struct stbl_entry *se, const union pkt_info *pi)
-{
- uint32_t type;
- struct stbl_key k;
-
- se->data = NULL;
-
- type = pi->tf.type;
- stbl_pkt_fill_key(&k, pi, type);
- rte_hash_del_key(st->ht[type].t, &k);
-}
-
-static inline void
-stbl_del_pkt_lock(struct stbl *st, struct stbl_entry *se,
- const union pkt_info *pi)
-{
- uint32_t type;
- struct stbl_key k;
-
- se->data = NULL;
-
- type = pi->tf.type;
- stbl_pkt_fill_key(&k, pi, type);
- stbl_lock(st, type);
- rte_hash_del_key(st->ht[type].t, &k);
- stbl_unlock(st, type);
-}
-
#include "tcp_stream.h"
static inline void
@@ -235,8 +191,8 @@ stbl_add_stream_lock(struct stbl *st, const struct tle_tcp_stream *s)
}
static inline void
-stbl_del_stream_lock(struct stbl *st, struct stbl_entry *se,
- const struct tle_tcp_stream *s)
+stbl_del_stream(struct stbl *st, struct stbl_entry *se,
+ const struct tle_tcp_stream *s, uint32_t lock)
{
uint32_t type;
struct stbl_key k;
@@ -248,9 +204,11 @@ stbl_del_stream_lock(struct stbl *st, struct stbl_entry *se,
type = s->s.type;
stbl_stream_fill_key(&k, &s->s, type);
- stbl_lock(st, type);
+ if (lock != 0)
+ stbl_lock(st, type);
rte_hash_del_key(st->ht[type].t, &k);
- stbl_unlock(st, type);
+ if (lock != 0)
+ stbl_unlock(st, type);
}
#ifdef __cplusplus
diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h
index da2e166..61bfce4 100644
--- a/lib/libtle_l4p/syncookie.h
+++ b/lib/libtle_l4p/syncookie.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -178,37 +178,40 @@ sync_check_ack(const union pkt_info *pi, uint32_t seq, uint32_t ack,
}
static inline void
-sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len)
+sync_fill_tcb(struct tcb *tcb, const union seg_info *si, const union tsopt *to)
{
- so->ts = get_tms_opts(p, len);
- so->wscale = so->ts.ecr & SYNC_TMS_WSCALE_MASK;
-}
+ uint32_t ack, mss, seq, wscale;
-static inline void
-sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
- const struct syn_opts *so)
-{
- tcb->rcv.nxt = si->seq;
- tcb->rcv.irs = si->seq - 1;
+ seq = si->seq;
+
+ tcb->rcv.nxt = seq;
+ tcb->rcv.irs = seq - 1;
+ tcb->snd.wu.wl1 = seq;
+
+ ack = si->ack;
+
+ tcb->snd.nxt = ack;
+ tcb->snd.una = ack;
+ tcb->snd.iss = ack - 1;
+ tcb->snd.rcvr = ack - 1;
+ tcb->snd.wu.wl2 = ack;
- tcb->snd.nxt = si->ack;
- tcb->snd.una = si->ack;
- tcb->snd.iss = si->ack - 1;
- tcb->snd.rcvr = tcb->snd.iss;
+ mss = si->mss;
- tcb->snd.wu.wl1 = si->seq;
- tcb->snd.wu.wl2 = si->ack;
+ tcb->snd.mss = mss;
+ tcb->so.mss = mss;
- tcb->so = *so;
+ tcb->snd.ts = to->ecr;
+ tcb->rcv.ts = to->val;
+ tcb->so.ts.raw = to->raw;
- tcb->snd.wscale = tcb->so.wscale;
- tcb->snd.mss = tcb->so.mss;
- tcb->snd.wnd = si->wnd << tcb->snd.wscale;
+ wscale = to->ecr & SYNC_TMS_WSCALE_MASK;
- tcb->snd.ts = tcb->so.ts.ecr;
- tcb->rcv.ts = tcb->so.ts.val;
+ tcb->snd.wscale = wscale;
+ tcb->snd.wnd = si->wnd << wscale;
+ tcb->so.wscale = wscale;
- tcb->rcv.wscale = (tcb->so.wscale == TCP_WSCALE_NONE) ?
+ tcb->rcv.wscale = (wscale == TCP_WSCALE_NONE) ?
TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
}
diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h
index 32faaa2..bec1e76 100644
--- a/lib/libtle_l4p/tcp_ctl.h
+++ b/lib/libtle_l4p/tcp_ctl.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -30,15 +30,63 @@ extern "C" {
static inline void
tcp_stream_down(struct tle_tcp_stream *s)
{
- rwl_down(&s->rx.use);
- rwl_down(&s->tx.use);
+ if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+ rwl_down(&s->use);
+ else
+ rte_atomic32_set(&s->use, INT32_MIN);
}
static inline void
tcp_stream_up(struct tle_tcp_stream *s)
{
- rwl_up(&s->rx.use);
- rwl_up(&s->tx.use);
+ int32_t v;
+
+ if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+ rwl_up(&s->use);
+ else {
+ v = rte_atomic32_read(&s->use) - INT32_MIN;
+ rte_atomic32_set(&s->use, v);
+ }
+}
+
+static inline int
+tcp_stream_try_acquire(struct tle_tcp_stream *s)
+{
+ int32_t v;
+
+ if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+ return rwl_try_acquire(&s->use);
+
+ v = rte_atomic32_read(&s->use) + 1;
+ rte_atomic32_set(&s->use, v);
+ return v;
+}
+
+static inline void
+tcp_stream_release(struct tle_tcp_stream *s)
+{
+ int32_t v;
+
+ if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+ rwl_release(&s->use);
+ else {
+ v = rte_atomic32_read(&s->use) - 1;
+ rte_atomic32_set(&s->use, v);
+ }
+}
+
+static inline int
+tcp_stream_acquire(struct tle_tcp_stream *s)
+{
+ int32_t v;
+
+ if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+ return rwl_acquire(&s->use);
+
+ v = rte_atomic32_read(&s->use) + 1;
+ if (v > 0)
+ rte_atomic32_set(&s->use, v);
+ return v;
}
/* calculate RCV.WND value based on size of stream receive buffer */
@@ -67,28 +115,28 @@ empty_tq(struct tle_tcp_stream *s)
static inline void
empty_rq(struct tle_tcp_stream *s)
{
- empty_mbuf_ring(s->rx.q);
+ uint32_t n;
+ struct rte_mbuf *mb[MAX_PKT_BURST];
+
+ do {
+ n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)mb,
+ RTE_DIM(mb));
+ free_mbufs(mb, n);
+ } while (n != 0);
+
tcp_ofo_reset(s->rx.ofo);
}
/* empty stream's listen queue */
static inline void
-empty_lq(struct tle_tcp_stream *s, struct stbl *st)
+empty_lq(struct tle_tcp_stream *s)
{
- uint32_t i, n;
- struct rte_mbuf *mb;
- union pkt_info pi;
- union seg_info si;
- struct stbl_entry *se[MAX_PKT_BURST];
+ uint32_t n;
+ struct tle_stream *ts[MAX_PKT_BURST];
do {
- n = _rte_ring_dequeue_burst(s->rx.q, (void **)se, RTE_DIM(se));
- for (i = 0; i != n; i++) {
- mb = stbl_get_pkt(se[i]);
- get_pkt_info(mb, &pi, &si);
- stbl_del_pkt_lock(st, se[i], &pi);
- rte_pktmbuf_free(mb);
- }
+ n = _rte_ring_dequeue_burst(s->rx.q, (void **)ts, RTE_DIM(ts));
+ tle_tcp_stream_close_bulk(ts, n);
} while (n != 0);
}
@@ -114,12 +162,13 @@ tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s)
/* free stream's destination port */
stream_clear_ctx(ctx, &s->s);
if (uop == TCP_OP_LISTEN)
- empty_lq(s, st);
+ empty_lq(s);
}
if (s->ste != NULL) {
/* remove entry from RX streams table */
- stbl_del_stream_lock(st, s->ste, s);
+ stbl_del_stream(st, s->ste, s,
+ (s->flags & TLE_CTX_FLAG_ST) == 0);
s->ste = NULL;
empty_rq(s);
}
diff --git a/lib/libtle_l4p/tcp_misc.h b/lib/libtle_l4p/tcp_misc.h
index 9f19f69..0ca5429 100644
--- a/lib/libtle_l4p/tcp_misc.h
+++ b/lib/libtle_l4p/tcp_misc.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -54,6 +54,10 @@ extern "C" {
#define TCP6_OP_MSS (TCP6_NOP_MSS - TCP_TX_OPT_LEN_MAX)
+/* Initial Window configuration parameter; will probably become
+ * configurable at startup in the future */
+#define TCP_INITIAL_CWND_MAX 14600
+
/*
* TCP flags
*/
@@ -93,8 +97,8 @@ union seg_info {
struct {
uint32_t seq;
uint32_t ack;
- uint16_t hole1;
uint16_t wnd;
+ uint16_t mss; /* valid only at SYN time */
};
};
@@ -223,11 +227,10 @@ struct dack_info {
/* get current timestamp in ms */
static inline uint32_t
-tcp_get_tms(void)
+tcp_get_tms(uint32_t mshift)
{
- uint64_t ts, ms;
- ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S;
- ts = rte_get_tsc_cycles() / ms;
+ uint64_t ts;
+ ts = rte_get_tsc_cycles() >> mshift;
return ts;
}
@@ -248,8 +251,11 @@ static inline void
get_seg_info(const struct tcp_hdr *th, union seg_info *si)
{
__m128i v;
- const __m128i bswap_mask = _mm_set_epi8(15, 14, 13, 12, 10, 11, 9, 8,
- 4, 5, 6, 7, 0, 1, 2, 3);
+ const __m128i bswap_mask =
+ _mm_set_epi8(UINT8_MAX, UINT8_MAX, UINT8_MAX, UINT8_MAX,
+ UINT8_MAX, UINT8_MAX, 10, 11,
+ 4, 5, 6, 7,
+ 0, 1, 2, 3);
v = _mm_loadu_si128((const __m128i *)&th->sent_seq);
si->raw.x = _mm_shuffle_epi8(v, bswap_mask);
diff --git a/lib/libtle_l4p/tcp_rxq.h b/lib/libtle_l4p/tcp_rxq.h
index bddc28e..01f34fa 100644
--- a/lib/libtle_l4p/tcp_rxq.h
+++ b/lib/libtle_l4p/tcp_rxq.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -22,6 +22,11 @@
extern "C" {
#endif
+struct rxq_objs {
+ struct rte_mbuf **mb;
+ uint32_t num;
+};
+
static inline uint32_t
rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
struct rte_mbuf *mb[], uint32_t num)
@@ -142,6 +147,41 @@ rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
return t;
}
+static inline uint32_t
+tcp_rxq_get_objs(struct tle_tcp_stream *s, struct rxq_objs obj[2])
+{
+ struct rte_ring *r;
+ uint32_t n, head, sz;
+
+ r = s->rx.q;
+
+ n = _rte_ring_mcs_dequeue_start(r, UINT32_MAX);
+ if (n == 0)
+ return 0;
+
+ sz = _rte_ring_get_size(r);
+ head = (r->cons.head - n) & _rte_ring_get_mask(r);
+
+ obj[0].mb = (struct rte_mbuf **)(_rte_ring_get_data(r) + head);
+ obj[1].mb = (struct rte_mbuf **)_rte_ring_get_data(r);
+
+ if (head + n <= sz) {
+ obj[0].num = n;
+ obj[1].num = 0;
+ return 1;
+ } else {
+ obj[0].num = sz - head;
+ obj[1].num = n + head - sz;
+ return 2;
+ }
+}
+
+static inline void
+tcp_rxq_consume(struct tle_tcp_stream *s, uint32_t num)
+{
+ _rte_ring_mcs_dequeue_finish(s->rx.q, num);
+}
+
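A sketch of how the two halves above pair up; process() is a hypothetical placeholder, and tle_tcp_stream_readv() further down uses exactly this pattern with _mbus_to_iovec():

    struct rxq_objs obj[2];
    uint32_t i, j, n, total;

    n = tcp_rxq_get_objs(s, obj);      /* peek: 0, 1 or 2 contiguous chunks */
    total = 0;
    for (i = 0; i != n; i++) {
        for (j = 0; j != obj[i].num; j++)
            process(obj[i].mb[j]);     /* consume mbuf payload in place */
        total += obj[i].num;
    }
    if (total != 0)
        tcp_rxq_consume(s, total);     /* only now advance the ring head */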
#ifdef __cplusplus
}
#endif
diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c
index a1c7d09..30ed104 100644
--- a/lib/libtle_l4p/tcp_rxtx.c
+++ b/lib/libtle_l4p/tcp_rxtx.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -59,12 +59,12 @@ rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
struct tle_tcp_stream *s;
s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
- if (s == NULL || rwl_acquire(&s->rx.use) < 0)
+ if (s == NULL || tcp_stream_acquire(s) < 0)
return NULL;
/* check that we have a proper stream. */
if (s->tcb.state != TCP_ST_LISTEN) {
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
s = NULL;
}
@@ -84,11 +84,11 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
return NULL;
}
- if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
+ if (tcp_stream_acquire(s) < 0)
return NULL;
/* check that we have a proper stream. */
else if (s->tcb.state == TCP_ST_CLOSED) {
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
s = NULL;
}
@@ -164,6 +164,24 @@ stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}
+static inline uint32_t
+get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
+{
+ uint32_t pid;
+ rte_atomic32_t *pa;
+
+ pa = &dev->tx.packet_id[type];
+
+ if (st == 0) {
+ pid = rte_atomic32_add_return(pa, num);
+ return pid - num;
+ } else {
+ pid = rte_atomic32_read(pa);
+ rte_atomic32_set(pa, pid + num);
+ return pid;
+ }
+}
+
static inline void
fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
uint32_t seq, uint8_t hlen, uint8_t flags)
@@ -357,7 +375,7 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
type = s->s.type;
dev = s->tx.dst.dev;
- pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
+ pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
k = 0;
tn = 0;
@@ -506,22 +524,16 @@ calc_smss(uint16_t mss, const struct tle_dest *dst)
}
/*
- * RFC 5681 3.1
- * If SMSS > 2190 bytes:
- * IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
- * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
- * IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
- * if SMSS <= 1095 bytes:
- * IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
+ * RFC 6928 section 2:
+ * IW = min (10*MSS, max (2*MSS, 14600))
+ *
+ * or, using the user-provided initial congestion window (icw):
+ * IW = min (10*MSS, max (2*MSS, icw))
*/
static inline uint32_t
-initial_cwnd(uint16_t smss)
+initial_cwnd(uint32_t smss, uint32_t icw)
{
- if (smss > 2190)
- return 2 * smss;
- else if (smss > 1095)
- return 3 * smss;
- return 4 * smss;
+ return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
}
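(Worked instance of the formula, with illustrative values: with the default icw of TCP_INITIAL_CWND_MAX = 14600 and SMSS = 1460, IW = min(14600, max(2920, 14600)) = 14600 bytes, i.e. the RFC 6928 ten-segment initial window; with SMSS = 536, IW = min(5360, max(1072, 14600)) = 5360 bytes, so the 10*SMSS cap binds for small segment sizes.)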
/*
@@ -561,7 +573,7 @@ send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
dst = &s->tx.dst;
type = s->s.type;
- pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
+ pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
if (rc == 0)
@@ -657,7 +669,7 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
return -EINVAL;
dev = dst.dev;
- pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
+ pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
@@ -763,8 +775,8 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
}
static inline int
-restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
- const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb,
+restore_syn_opt(union seg_info *si, union tsopt *to,
+ const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
uint32_t hash_alg, rte_xmm_t *secret_key)
{
int32_t rc;
@@ -778,12 +790,12 @@ restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
if (rc < 0)
return rc;
- so->mss = rc;
+ si->mss = rc;
th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
mb->l2_len + mb->l3_len);
len = mb->l4_len - sizeof(*th);
- sync_get_opts(so, (uintptr_t)(th + 1), len);
+ to[0] = get_tms_opts((uintptr_t)(th + 1), len);
return 0;
}
@@ -814,9 +826,11 @@ static inline int
stream_fill_dest(struct tle_tcp_stream *s)
{
int32_t rc;
+ uint32_t type;
const void *da;
- if (s->s.type == TLE_V4)
+ type = s->s.type;
+ if (type == TLE_V4)
da = &s->s.ipv4.addr.src;
else
da = &s->s.ipv6.addr.src;
@@ -830,7 +844,7 @@ stream_fill_dest(struct tle_tcp_stream *s)
*/
static inline int
accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
- struct tle_tcp_stream *cs, const struct syn_opts *so,
+ struct tle_tcp_stream *cs, const union tsopt *to,
uint32_t tms, const union pkt_info *pi, const union seg_info *si)
{
int32_t rc;
@@ -857,7 +871,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
}
/* setup TCB */
- sync_fill_tcb(&cs->tcb, si, so);
+ sync_fill_tcb(&cs->tcb, si, to);
cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
/*
@@ -871,8 +885,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
} else
cs->tcb.snd.rto = TCP_RTO_DEFAULT;
- /* copy streams type. */
+ /* copy streams type & flags. */
cs->s.type = ps->s.type;
+ cs->flags = ps->flags;
/* retrieve and cache destination information. */
rc = stream_fill_dest(cs);
@@ -883,8 +898,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
/* setup congestion variables */
- cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
+ cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
+ cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
cs->tcb.state = TCP_ST_ESTABLISHED;
@@ -909,14 +925,14 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
*/
static inline int
rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
- const union pkt_info *pi, const union seg_info *si,
+ const union pkt_info *pi, union seg_info *si,
uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
{
int32_t rc;
struct tle_ctx *ctx;
struct tle_stream *ts;
struct tle_tcp_stream *cs;
- struct syn_opts so;
+ union tsopt to;
*csp = NULL;
@@ -924,7 +940,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
return -EINVAL;
ctx = s->s.ctx;
- rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg,
+ rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
&ctx->prm.secret_key);
if (rc < 0)
return rc;
@@ -936,7 +952,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
return ENFILE;
/* prepare stream to handle new connection */
- if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
+ if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
/* put new stream in the accept queue */
if (_rte_ring_enqueue_burst(s->rx.q,
@@ -947,7 +963,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
/* cleanup on failure */
tcp_stream_down(cs);
- stbl_del_pkt(st, cs->ste, pi);
+ stbl_del_stream(st, cs->ste, cs, 0);
cs->ste = NULL;
}
@@ -1008,6 +1024,17 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
}
static void
+stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
+{
+ if (rto != 0) {
+ s->tcb.state = TCP_ST_TIME_WAIT;
+ s->tcb.snd.rto = rto;
+ timer_reset(s);
+ } else
+ stream_term(s);
+}
+
+static void
rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
{
uint32_t state;
@@ -1027,17 +1054,13 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
s->err.cb.func(s->err.cb.data, &s->s);
} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
rsp->flags |= TCP_FLAG_ACK;
- if (ackfin != 0) {
- s->tcb.state = TCP_ST_TIME_WAIT;
- s->tcb.snd.rto = TCP_RTO_2MSL;
- timer_reset(s);
- } else
+ if (ackfin != 0)
+ stream_timewait(s, s->tcb.snd.rto_tw);
+ else
s->tcb.state = TCP_ST_CLOSING;
} else if (state == TCP_ST_FIN_WAIT_2) {
rsp->flags |= TCP_FLAG_ACK;
- s->tcb.state = TCP_ST_TIME_WAIT;
- s->tcb.snd.rto = TCP_RTO_2MSL;
- timer_reset(s);
+ stream_timewait(s, s->tcb.snd.rto_tw);
} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
stream_term(s);
}
@@ -1144,7 +1167,9 @@ rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
static inline void
dack_info_init(struct dack_info *tack, const struct tcb *tcb)
{
- memset(tack, 0, sizeof(*tack));
+ static const struct dack_info zero_dack;
+
+ tack[0] = zero_dack;
tack->ack = tcb->snd.una;
tack->segs.dup = tcb->rcv.dupack;
tack->wu.raw = tcb->snd.wu.raw;
@@ -1488,9 +1513,7 @@ rx_ackfin(struct tle_tcp_stream *s)
timer_stop(s);
s->tcb.state = TCP_ST_FIN_WAIT_2;
} else if (state == TCP_ST_CLOSING) {
- s->tcb.state = TCP_ST_TIME_WAIT;
- s->tcb.snd.rto = TCP_RTO_2MSL;
- timer_reset(s);
+ stream_timewait(s, s->tcb.snd.rto_tw);
}
}
@@ -1554,7 +1577,7 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
s->tcb.snd.wscale = so.wscale;
/* setup congestion variables */
- s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
+ s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
s->tcb.snd.ssthresh = s->tcb.snd.wnd;
s->tcb.rcv.ts = so.ts.val;
@@ -1720,9 +1743,9 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
{
uint32_t i;
- if (rwl_acquire(&s->rx.use) > 0) {
+ if (tcp_stream_acquire(s) > 0) {
i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
return i;
}
@@ -1735,7 +1758,7 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
static inline uint32_t
rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
- const union pkt_info pi[], const union seg_info si[],
+ const union pkt_info pi[], union seg_info si[],
struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
uint32_t num)
{
@@ -1809,7 +1832,7 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
k = num - i;
}
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
return num - k;
}
@@ -1850,7 +1873,7 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
}
}
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
return num - k;
}
@@ -1859,7 +1882,8 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
struct stbl *st;
- uint32_t i, j, k, n, t, ts;
+ struct tle_ctx *ctx;
+ uint32_t i, j, k, mt, n, t, ts;
uint64_t csf;
union pkt_info pi[num];
union seg_info si[num];
@@ -1868,8 +1892,10 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
uint32_t raw;
} stu;
- ts = tcp_get_tms();
- st = CTX_TCP_STLB(dev->ctx);
+ ctx = dev->ctx;
+ ts = tcp_get_tms(ctx->cycles_ms_shift);
+ st = CTX_TCP_STLB(ctx);
+ mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
stu.raw = 0;
@@ -1887,7 +1913,7 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
pi[i].tf.type, IPPROTO_TCP) != 0)
pi[i].csf = csf;
- stu.t[t] = 1;
+ stu.t[t] = mt;
}
if (stu.t[TLE_V4] != 0)
@@ -1936,7 +1962,7 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
struct tle_tcp_stream *s;
s = TCP_STREAM(ts);
- n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
+ n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
if (n == 0)
return 0;
@@ -1945,9 +1971,9 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
* then rearm stream RX event.
*/
if (n == num && rte_ring_count(s->rx.q) != 0) {
- if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+ if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
tle_event_raise(s->rx.ev);
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
}
return n;
@@ -2066,7 +2092,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
/* fill pkt info to generate seq.*/
stream_fill_pkt_info(s, &pi);
- tms = tcp_get_tms();
+ tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
s->tcb.so.ts.val = tms;
s->tcb.so.ts.ecr = 0;
s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
@@ -2116,7 +2142,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
if (type >= TLE_VNUM)
return -EINVAL;
- if (rwl_try_acquire(&s->tx.use) > 0) {
+ if (tcp_stream_try_acquire(s) > 0) {
rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
TCP_ST_SYN_SENT);
rc = (rc == 0) ? -EDEADLK : 0;
@@ -2124,14 +2150,14 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
rc = -EINVAL;
if (rc != 0) {
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
return rc;
}
/* fill stream, prepare and transmit syn pkt */
s->tcb.uop |= TCP_OP_CONNECT;
rc = tx_syn(s, addr);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
/* error happened, do a cleanup */
if (rc != 0)
@@ -2147,7 +2173,7 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
struct tle_tcp_stream *s;
s = TCP_STREAM(ts);
- n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
+ n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
if (n == 0)
return 0;
@@ -2156,14 +2182,76 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
* then rearm stream RX event.
*/
if (n == num && rte_ring_count(s->rx.q) != 0) {
- if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+ if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
tle_event_raise(s->rx.ev);
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
}
return n;
}
+ssize_t
+tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
+ int iovcnt)
+{
+ int32_t i;
+ uint32_t mn, n, tn;
+ size_t sz;
+ struct tle_tcp_stream *s;
+ struct iovec iv;
+ struct rxq_objs mo[2];
+
+ s = TCP_STREAM(ts);
+
+ /* get group of packets */
+ mn = tcp_rxq_get_objs(s, mo);
+ if (mn == 0)
+ return 0;
+
+ sz = 0;
+ n = 0;
+ for (i = 0; i != iovcnt; i++) {
+ iv = iov[i];
+ sz += iv.iov_len;
+ n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
+ if (iv.iov_len != 0) {
+ sz -= iv.iov_len;
+ break;
+ }
+ }
+
+ tn = n;
+
+ if (i != iovcnt && mn != 1) {
+ n = 0;
+ do {
+ sz += iv.iov_len;
+ n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
+ if (iv.iov_len != 0) {
+ sz -= iv.iov_len;
+ break;
+ }
+ if (i + 1 != iovcnt)
+ iv = iov[i + 1];
+ } while (++i != iovcnt);
+ tn += n;
+ }
+
+ tcp_rxq_consume(s, tn);
+
+ /*
+ * if we still have packets to read,
+ * then rearm stream RX event.
+ */
+ if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
+ if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ tcp_stream_release(s);
+ }
+
+ return sz;
+}
+
static inline int32_t
tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
struct rte_mbuf *segs[], uint32_t num)
@@ -2176,16 +2264,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
0, TCP_FLAG_ACK, 0, 0);
if (rc != 0) {
- free_segments(segs, num);
+ free_mbufs(segs, num);
break;
}
}
if (i == num) {
/* queue packets for further transmission. */
- rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
+ rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
if (rc != 0)
- free_segments(segs, num);
+ free_mbufs(segs, num);
}
return rc;
@@ -2194,17 +2282,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
uint16_t
tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
{
- uint32_t i, j, k, mss, n, state, type;
+ uint32_t i, j, k, mss, n, state;
int32_t rc;
uint64_t ol_flags;
struct tle_tcp_stream *s;
- struct tle_dev *dev;
struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
s = TCP_STREAM(ts);
/* mark stream as not closable. */
- if (rwl_acquire(&s->tx.use) < 0) {
+ if (tcp_stream_acquire(s) < 0) {
rte_errno = EAGAIN;
return 0;
}
@@ -2212,14 +2299,12 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
state = s->tcb.state;
if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
rte_errno = ENOTCONN;
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
return 0;
}
mss = s->tcb.snd.mss;
- dev = s->tx.dst.dev;
- type = s->s.type;
- ol_flags = dev->tx.ol_flags[type];
+ ol_flags = s->tx.dst.ol_flags;
k = 0;
rc = 0;
@@ -2237,7 +2322,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
if (i != k) {
/* queue packets for further transmission. */
- n = _rte_ring_mp_enqueue_burst(s->tx.q,
+ n = _rte_ring_enqueue_burst(s->tx.q,
(void **)pkt + k, (i - k));
k += n;
@@ -2246,7 +2331,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
* remove pkt l2/l3 headers, restore ol_flags
*/
if (i != k) {
- ol_flags = ~dev->tx.ol_flags[type];
+ ol_flags = ~s->tx.dst.ol_flags;
for (j = k; j != i; j++) {
rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
pkt[j]->l3_len +
@@ -2271,7 +2356,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
break;
}
- rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
+ rc = tx_segments(s, ol_flags, segs, rc);
if (rc == 0) {
/* free the large mbuf */
rte_pktmbuf_free(pkt[i]);
@@ -2290,11 +2375,120 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
tle_event_raise(s->tx.ev);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
return k;
}
+ssize_t
+tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
+ const struct iovec *iov, int iovcnt)
+{
+ int32_t i, rc;
+ uint32_t j, k, n, num, slen, state;
+ uint64_t ol_flags;
+ size_t sz, tsz;
+ struct tle_tcp_stream *s;
+ struct iovec iv;
+ struct rte_mbuf *mb[2 * MAX_PKT_BURST];
+
+ s = TCP_STREAM(ts);
+
+ /* mark stream as not closable. */
+ if (tcp_stream_acquire(s) < 0) {
+ rte_errno = EAGAIN;
+ return -1;
+ }
+
+ state = s->tcb.state;
+ if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
+ rte_errno = ENOTCONN;
+ tcp_stream_release(s);
+ return -1;
+ }
+
+ /* figure out how many mbufs do we need */
+ tsz = 0;
+ for (i = 0; i != iovcnt; i++)
+ tsz += iov[i].iov_len;
+
+ slen = rte_pktmbuf_data_room_size(mp);
+ slen = RTE_MIN(slen, s->tcb.snd.mss);
+
+ num = (tsz + slen - 1) / slen;
+ n = rte_ring_free_count(s->tx.q);
+ num = RTE_MIN(num, n);
+ n = RTE_MIN(num, RTE_DIM(mb));
+
+ /* allocate mbufs */
+ if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
+ rte_errno = ENOMEM;
+ tcp_stream_release(s);
+ return -1;
+ }
+
+ /* copy data into the mbufs */
+ k = 0;
+ sz = 0;
+ for (i = 0; i != iovcnt; i++) {
+ iv = iov[i];
+ sz += iv.iov_len;
+ k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
+ if (iv.iov_len != 0) {
+ sz -= iv.iov_len;
+ break;
+ }
+ }
+
+ /* partially filled segment */
+ k += (k != n && mb[k]->data_len != 0);
+
+ /* fill pkt headers */
+ ol_flags = s->tx.dst.ol_flags;
+
+ for (j = 0; j != k; j++) {
+ rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
+ s->s.port, 0, TCP_FLAG_ACK, 0, 0);
+ if (rc != 0)
+ break;
+ }
+
+ /* if no error encountered, then enqueue pkts for transmission */
+ if (k == j)
+ k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
+ else
+ k = 0;
+
+ if (k != j) {
+
+ /* free pkts that were not enqueued */
+ free_mbufs(mb + k, j - k);
+
+ /* our last segment can be partially filled */
+ sz += slen - sz % slen;
+ sz -= (j - k) * slen;
+
+ /* report an error */
+ if (rc != 0) {
+ rte_errno = -rc;
+ sz = -1;
+ }
+ }
+
+ if (k != 0) {
+
+ /* notify BE about more data to send */
+ txs_enqueue(s->s.ctx, s);
+
+ /* if possible, re-arm stream write event. */
+ if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ }
+
+ tcp_stream_release(s);
+ return sz;
+}
+
/* send data and FIN (if needed) */
static inline void
tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
@@ -2376,6 +2570,18 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms)
} else if (state == TCP_ST_SYN_SENT) {
/* resending SYN */
s->tcb.so.ts.val = tms;
+
+ /* According to RFC 6928 2:
+ * To reduce the chance for spurious SYN or SYN/ACK
+ * retransmission, it is RECOMMENDED that
+ * implementations refrain from resetting the initial
+ * window to 1 segment, unless there have been more
+ * than one SYN or SYN/ACK retransmissions or true loss
+ * detection has been made.
+ */
+ if (s->tcb.snd.nb_retx != 0)
+ s->tcb.snd.cwnd = s->tcb.snd.mss;
+
send_ack(s, tms, TCP_FLAG_SYN);
} else if (state == TCP_ST_TIME_WAIT) {
@@ -2405,7 +2611,7 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
/* process streams with RTO expired */
tw = CTX_TCP_TMWHL(ctx);
- tms = tcp_get_tms();
+ tms = tcp_get_tms(ctx->cycles_ms_shift);
tle_timer_expire(tw, tms);
k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
@@ -2414,9 +2620,9 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
s = rs[i];
s->timer.handle = NULL;
- if (rwl_try_acquire(&s->tx.use) > 0)
+ if (tcp_stream_try_acquire(s) > 0)
rto_stream(s, tms);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
}
/* process streams from to-send queue */
@@ -2428,11 +2634,11 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
s = rs[i];
rte_atomic32_set(&s->tx.arm, 0);
- if (rwl_try_acquire(&s->tx.use) > 0)
+ if (tcp_stream_try_acquire(s) > 0)
tx_stream(s, tms);
else
txs_enqueue(s->s.ctx, s);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
}
/* collect streams to close from the death row */
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c
index 99791d0..4e9ddb7 100644
--- a/lib/libtle_l4p/tcp_stream.c
+++ b/lib/libtle_l4p/tcp_stream.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -32,8 +32,7 @@ static void
unuse_stream(struct tle_tcp_stream *s)
{
s->s.type = TLE_VNUM;
- rte_atomic32_set(&s->rx.use, INT32_MIN);
- rte_atomic32_set(&s->tx.use, INT32_MIN);
+ rte_atomic32_set(&s->use, INT32_MIN);
}
static void
@@ -99,14 +98,17 @@ static int
init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
size_t bsz, rsz, sz;
- uint32_t i, k, n, nb;
+ uint32_t f, i, k, n, nb;
struct tle_drb *drb;
char name[RTE_RING_NAMESIZE];
+ f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+ (RING_F_SP_ENQ | RING_F_SC_DEQ);
+
/* init RX part. */
n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
- s->rx.q = alloc_ring(n, RING_F_SP_ENQ, ctx->prm.socket_id);
+ s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id);
if (s->rx.q == NULL)
return -ENOMEM;
@@ -117,7 +119,7 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
/* init TX part. */
n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
- s->tx.q = alloc_ring(n, RING_F_SC_DEQ, ctx->prm.socket_id);
+ s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id);
if (s->tx.q == NULL)
return -ENOMEM;
@@ -145,7 +147,7 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
}
snprintf(name, sizeof(name), "%p@%zu", s, sz);
- rte_ring_init(s->tx.drb.r, name, n, 0);
+ rte_ring_init(s->tx.drb.r, name, n, f);
for (i = 0; i != k; i++) {
drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
@@ -177,24 +179,27 @@ tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
}
static struct tle_timer_wheel *
-alloc_timers(uint32_t num, int32_t socket)
+alloc_timers(uint32_t num, uint32_t mshift, int32_t socket)
{
struct tle_timer_wheel_args twprm;
twprm.tick_size = TCP_RTO_GRANULARITY;
twprm.max_timer = num;
twprm.socket_id = socket;
- return tle_timer_create(&twprm, tcp_get_tms());
+ return tle_timer_create(&twprm, tcp_get_tms(mshift));
}
static int
tcp_init_streams(struct tle_ctx *ctx)
{
size_t sz;
- uint32_t i;
+ uint32_t f, i;
int32_t rc;
struct tcp_streams *ts;
+ f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+ (RING_F_SP_ENQ | RING_F_SC_DEQ);
+
sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams;
ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
ctx->prm.socket_id);
@@ -211,14 +216,15 @@ tcp_init_streams(struct tle_ctx *ctx)
ctx->streams.buf = ts;
STAILQ_INIT(&ctx->streams.free);
- ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->prm.socket_id);
+ ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->cycles_ms_shift,
+ ctx->prm.socket_id);
if (ts->tmr == NULL) {
TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
ctx, rte_errno);
rc = -ENOMEM;
} else {
ts->tsq = alloc_ring(ctx->prm.max_streams,
- RING_F_SC_DEQ, ctx->prm.socket_id);
+ f | RING_F_SC_DEQ, ctx->prm.socket_id);
if (ts->tsq == NULL)
rc = -ENOMEM;
else
@@ -329,8 +335,13 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
s->err.cb = prm->cfg.err_cb;
/* store other params */
+ s->flags = ctx->prm.flags;
s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
TLE_TCP_DEFAULT_RETRIES;
+ s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
+ ctx->prm.icw;
+ s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
+ TCP_RTO_2MSL : ctx->prm.timewait;
tcp_stream_up(s);
return &s->s;
@@ -438,15 +449,6 @@ tle_tcp_stream_close(struct tle_stream *ts)
return -EINVAL;
ctx = s->s.ctx;
-
- /* reset stream events if any. */
- if (s->rx.ev != NULL)
- tle_event_idle(s->rx.ev);
- if (s->tx.ev != NULL)
- tle_event_idle(s->tx.ev);
- if (s->err.ev != NULL)
- tle_event_idle(s->err.ev);
-
return stream_close(ctx, s);
}
@@ -505,7 +507,7 @@ tle_tcp_stream_listen(struct tle_stream *ts)
return -EINVAL;
/* mark stream as not closable. */
- if (rwl_try_acquire(&s->rx.use) > 0) {
+ if (tcp_stream_try_acquire(s) > 0) {
rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
TCP_ST_LISTEN);
if (rc != 0) {
@@ -517,7 +519,7 @@ tle_tcp_stream_listen(struct tle_stream *ts)
} else
rc = -EINVAL;
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
return rc;
}
@@ -527,17 +529,12 @@ tle_tcp_stream_listen(struct tle_stream *ts)
static inline int
stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
{
- int32_t rc1, rc2;
struct tle_tcp_stream *s;
s = TCP_STREAM(ts);
- rc1 = rwl_try_acquire(&s->rx.use);
- rc2 = rwl_try_acquire(&s->tx.use);
-
- if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
- rwl_release(&s->tx.use);
- rwl_release(&s->rx.use);
+ if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
+ tcp_stream_release(s);
return -EINVAL;
}
@@ -581,9 +578,7 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
s->err.cb.func(s->err.cb.data, &s->s);
}
- rwl_release(&s->tx.use);
- rwl_release(&s->rx.use);
-
+ tcp_stream_release(s);
return 0;
}
@@ -606,13 +601,13 @@ tle_tcp_stream_update_cfg(struct tle_stream *ts[],
}
int
-tle_tcp_stream_get_mss(const struct tle_stream * stream)
+tle_tcp_stream_get_mss(const struct tle_stream * ts)
{
- struct tle_tcp_stream *tcp;
+ struct tle_tcp_stream *s;
- if (stream == NULL)
+ if (ts == NULL)
return -EINVAL;
- tcp = TCP_STREAM(stream);
- return tcp->tcb.snd.mss;
+ s = TCP_STREAM(ts);
+ return s->tcb.snd.mss;
}
diff --git a/lib/libtle_l4p/tcp_stream.h b/lib/libtle_l4p/tcp_stream.h
index 04c2f88..4629fe6 100644
--- a/lib/libtle_l4p/tcp_stream.h
+++ b/lib/libtle_l4p/tcp_stream.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -82,6 +82,7 @@ struct tcb {
uint32_t cwnd; /* congestion window */
uint32_t ssthresh; /* slow start threshold */
uint32_t rto; /* retransmission timeout */
+ uint32_t rto_tw; /* TIME_WAIT retransmission timeout */
uint32_t iss; /* initial send sequence */
uint16_t mss;
uint8_t wscale;
@@ -91,11 +92,13 @@ struct tcb {
struct syn_opts so; /* initial syn options. */
};
-
struct tle_tcp_stream {
struct tle_stream s;
+ uint32_t flags;
+ rte_atomic32_t use;
+
struct stbl_entry *ste; /* entry in streams table. */
struct tcb tcb;
@@ -109,7 +112,6 @@ struct tle_tcp_stream {
} err;
struct {
- rte_atomic32_t use;
struct rte_ring *q; /* listen (syn) queue */
struct ofo *ofo;
struct tle_event *ev; /* user provided recv event. */
@@ -117,7 +119,6 @@ struct tle_tcp_stream {
} rx __rte_cache_aligned;
struct {
- rte_atomic32_t use;
rte_atomic32_t arm; /* when > 0 stream is in to-send queue */
struct {
uint32_t nb_elem; /* number of objects per drb. */
diff --git a/lib/libtle_l4p/tcp_tx_seg.h b/lib/libtle_l4p/tcp_tx_seg.h
index 3a80fdd..a8d2425 100644
--- a/lib/libtle_l4p/tcp_tx_seg.h
+++ b/lib/libtle_l4p/tcp_tx_seg.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -20,15 +20,6 @@
extern "C" {
#endif
-static inline void
-free_segments(struct rte_mbuf *mb[], uint32_t num)
-{
- uint32_t i;
-
- for (i = 0; i != num; i++)
- rte_pktmbuf_free(mb[i]);
-}
-
static inline int32_t
tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
const struct tle_dest *dst, uint16_t mss)
@@ -53,7 +44,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
/* Allocate direct buffer */
out_pkt = rte_pktmbuf_alloc(dst->head_mp);
if (out_pkt == NULL) {
- free_segments(mbout, nbseg);
+ free_mbufs(mbout, nbseg);
return -ENOMEM;
}
@@ -67,7 +58,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
out_seg = rte_pktmbuf_alloc(dst->head_mp);
if (out_seg == NULL) {
rte_pktmbuf_free(out_pkt);
- free_segments(mbout, nbseg);
+ free_mbufs(mbout, nbseg);
return -ENOMEM;
}
out_seg_prev->next = out_seg;
diff --git a/lib/libtle_l4p/tle_ctx.h b/lib/libtle_l4p/tle_ctx.h
index 144dbe7..0ea4668 100644
--- a/lib/libtle_l4p/tle_ctx.h
+++ b/lib/libtle_l4p/tle_ctx.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -80,8 +80,9 @@ struct tle_dev_param {
struct tle_dest {
struct rte_mempool *head_mp;
/**< MP for fragment headers and control packets. */
- struct tle_dev *dev; /**< device to send packets through. */
- uint16_t mtu; /**< MTU for given destination. */
+ struct tle_dev *dev; /**< device to send packets through. */
+ uint64_t ol_flags; /**< tx offload flags. */
+ uint16_t mtu; /**< MTU for given destination. */
uint8_t l2_len; /**< L2 header length. */
uint8_t l3_len; /**< L3 header length. */
uint8_t hdr[TLE_DST_MAX_HDR]; /**< L2/L3 headers. */
@@ -103,6 +104,10 @@ enum {
TLE_HASH_NUM
};
+enum {
+ TLE_CTX_FLAG_ST = 1, /**< ctx will be used by a single thread */
+};
+
struct tle_ctx_param {
int32_t socket_id; /**< socket ID to allocate memory for. */
uint32_t proto; /**< L4 proto to handle. */
@@ -110,6 +115,7 @@ struct tle_ctx_param {
uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */
uint32_t max_stream_sbufs; /**< max send mbufs per stream. */
uint32_t send_bulk_size; /**< expected # of packets per send call. */
+ uint32_t flags; /**< context flags (TLE_CTX_FLAG_*). */
int (*lookup4)(void *opaque, const struct in_addr *addr,
struct tle_dest *res);
@@ -127,9 +133,19 @@ struct tle_ctx_param {
/**< hash algorithm to be used to generate sequence number. */
rte_xmm_t secret_key;
/**< secret key to be used to calculate the hash. */
+
+ uint32_t icw;
+ /**< initial congestion window; if zero, the RFC 6928 based
+ * default (TCP_INITIAL_CWND_MAX) is used. */
+ uint32_t timewait;
+ /**< TCP TIME_WAIT state timeout duration in milliseconds;
+ * if UINT32_MAX, the default 2MSL timeout is used. */
};
/**
+ * use default TIMEWAIT timeout value.
+ */
+#define TLE_TCP_TIMEWAIT_DEFAULT UINT32_MAX
+
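A minimal sketch of a context setup exercising the new fields (lookup callbacks, stream limits and other mandatory parameters are omitted; all values are illustrative):

    struct tle_ctx_param prm;
    struct tle_ctx *ctx;

    memset(&prm, 0, sizeof(prm));
    prm.proto = TLE_PROTO_TCP;
    prm.flags = TLE_CTX_FLAG_ST;             /* single-threaded usage */
    prm.icw = 0;                             /* 0: keep the built-in default */
    prm.timewait = TLE_TCP_TIMEWAIT_DEFAULT; /* keep the classic 2MSL */
    /* ... lookup4/lookup6, max_streams, etc. also have to be set ... */
    ctx = tle_ctx_create(&prm);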
+/**
* create L4 processing context.
* @param ctx_prm
* Parameters used to create and initialise the L4 context.
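
The new tle_ctx_param fields above (flags, icw, timewait) are all supplied at context-creation time. A minimal sketch of how they fit together, with illustrative values; the remaining mandatory fields (stream limits, lookup callbacks, secret key) are assumed to be set as before:

    #include <string.h>
    #include <rte_debug.h>
    #include <tle_ctx.h>

    struct tle_ctx *ctx;
    struct tle_ctx_param ctx_prm;

    memset(&ctx_prm, 0, sizeof(ctx_prm));
    ctx_prm.socket_id = SOCKET_ID_ANY;
    ctx_prm.proto = TLE_PROTO_TCP;
    ctx_prm.flags = TLE_CTX_FLAG_ST;             /* ctx accessed by one thread only */
    ctx_prm.icw = 0;                             /* 0: default initial window (2*MSS) */
    ctx_prm.timewait = TLE_TCP_TIMEWAIT_DEFAULT; /* UINT32_MAX: default 2MSL timeout */
    /* max_stream_*, lookup4/lookup6, secret_key, etc. set here as before */

    ctx = tle_ctx_create(&ctx_prm);
    if (ctx == NULL)
    	rte_exit(EXIT_FAILURE, "tle_ctx_create failed\n");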
diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h
index 9086658..b0cbda6 100644
--- a/lib/libtle_l4p/tle_tcp.h
+++ b/lib/libtle_l4p/tle_tcp.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -120,6 +120,16 @@ tle_tcp_stream_get_addr(const struct tle_stream *s,
struct tle_tcp_stream_addr *addr);
/**
+ * Get the current TCP maximum segment size.
+ * @param ts
+ * Stream to retrieve MSS information from.
+ * @return
+ * Maximum segment size in bytes, if successful.
+ * Negative on failure.
+ */
+int tle_tcp_stream_get_mss(const struct tle_stream *ts);
+
+/**
* Client mode connect API.
*/
@@ -258,6 +268,28 @@ uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[],
uint16_t num);
/**
+ * Reads iovcnt buffers from the given TCP stream.
+ * Note that the stream has to be in connected state.
+ * Data ordering is preserved.
+ * Buffers are processed in array order.
+ * This means that the function will completely fill iov[0]
+ * before proceeding to iov[1], and so on.
+ * If there is insufficient data, then not all buffers pointed to by iov
+ * may be filled.
+ * @param ts
+ * TCP stream to receive data from.
+ * @param iov
+ * Points to an array of iovec structures.
+ * @param iovcnt
+ * Number of elements in the *iov* array.
+ * @return
+ * On success, the number of bytes read from the stream receive buffer.
+ * In case of error, returns -1 and the error code will be set in rte_errno.
+ */
+ssize_t tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
+ int iovcnt);
+
+/**
* Consume and queue up to *num* packets, that will be sent eventually
* by tle_tcp_tx_bulk().
* Note that the stream has to be in connected state.
@@ -281,7 +313,7 @@ uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[],
* number of packets successfully queued in the stream send buffer.
* In case of error, error code can be set in rte_errno.
* Possible rte_errno errors include:
- * - EAGAIN - operation can be perfomed right now
+ * - EAGAIN - operation can't be performed right now
* (most likely close() was performed on that stream already).
* - ENOTCONN - the stream is not connected.
*/
@@ -289,6 +321,32 @@ uint16_t tle_tcp_stream_send(struct tle_stream *s, struct rte_mbuf *pkt[],
uint16_t num);
/**
+ * Writes iovcnt buffers of data described by iov to the given TCP stream.
+ * Note that the stream has to be in connected state.
+ * Data ordering is preserved.
+ * Buffers are processed in array order.
+ * This means that the function will write out the entire contents of iov[0]
+ * before proceeding to iov[1], and so on.
+ * If there is insufficient space in stream send buffer,
+ * then not all buffers pointed to by iov may be written out.
+ * @param ts
+ * TCP stream to send data to.
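+ * @param mp
+ * Mempool used to allocate the internal mbufs that store the
+ * user provided data (see the ENOMEM error below).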
+ * @param iov
+ * Points to an array of iovec structures.
+ * @param iovcnt
+ * Number of elements in the *iov* array.
+ * @return
+ * On success, the number of bytes written to the stream send buffer.
+ * In case of error, returns -1 and the error code will be set in rte_errno.
+ * - EAGAIN - operation can't be performed right now
+ * (most likely close() was performed on that stream already).
+ * - ENOTCONN - the stream is not connected.
+ * - ENOMEM - not enough internal buffers (mbufs) to store the user provided data.
+ */
+ssize_t tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
+ const struct iovec *iov, int iovcnt);
+
+/**
* Back End (BE) API.
* BE API functions are not multi-thread safe.
* Supposed to be called by the L2/L3 processing layer.
@@ -362,16 +420,6 @@ uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
*/
int tle_tcp_process(struct tle_ctx *ctx, uint32_t num);
-/**
- * Get current TCP maximum segment size
- * @param stream
- * Stream to get MSS from.
- * @return
- * Maximum segment size in bytes, if successful.
- * Negative on failure.
- */
-int tle_tcp_stream_get_mss(const struct tle_stream * const stream);
-
#ifdef __cplusplus
}
#endif
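
Usage sketch for the two new byte-oriented calls, assuming an already-connected stream and a mempool from which writev allocates its mbufs; the echo_once wrapper and its error handling are illustrative only, not part of the API:

    #include <sys/uio.h>
    #include <rte_errno.h>
    #include <tle_tcp.h>

    static void
    echo_once(struct tle_stream *s, struct rte_mempool *mp)
    {
    	char buf[2048];
    	struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
    	ssize_t n;

    	/* copy whatever is currently available out of the receive buffer */
    	n = tle_tcp_stream_readv(s, &iov, 1);
    	if (n <= 0)
    		return;

    	/* queue the bytes just read for transmission; mbufs come from mp */
    	iov.iov_len = n;
    	n = tle_tcp_stream_writev(s, mp, &iov, 1);
    	if (n < 0 && rte_errno == ENOMEM)
    		; /* back-pressure: caller may retry later */
    }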
diff --git a/lib/libtle_misc/tle_dpdk_wrapper.h b/lib/libtle_misc/tle_dpdk_wrapper.h
index 6757663..3736964 100644
--- a/lib/libtle_misc/tle_dpdk_wrapper.h
+++ b/lib/libtle_misc/tle_dpdk_wrapper.h
@@ -57,6 +57,18 @@ _rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, uint32_t n)
}
static inline uint32_t
+_rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, uint32_t n)
+{
+ uint32_t rc;
+
+ rc = rte_ring_enqueue_bulk(r, (void * const *)obj_table, n, NULL);
+ if (rc == n)
+ return 0;
+ else
+ return -ENOSPC;
+}
+
+static inline uint32_t
_rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t n)
{
return rte_ring_dequeue_burst(r, (void **)obj_table, n, NULL);
@@ -80,6 +92,17 @@ _rte_ring_get_data(struct rte_ring *r)
return (void **)(&r[1]);
}
+static inline void
+_rte_ring_dequeue_ptrs(struct rte_ring *r, void **obj_table, uint32_t num)
+{
+ uint32_t tail;
+ void **data;
+
+ tail = r->cons.tail;
+ data = _rte_ring_get_data(r);
+ DEQUEUE_PTRS(r, data, tail, obj_table, num, void *);
+}
+
#else
static inline uint32_t
@@ -109,6 +132,13 @@ _rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, uint32_t n)
}
static inline uint32_t
+_rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ uint32_t n)
+{
+ return rte_ring_enqueue_bulk(r, (void * const *)obj_table, n);
+}
+
+static inline uint32_t
_rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t n)
{
return rte_ring_dequeue_burst(r, (void **)obj_table, n);
@@ -132,10 +162,91 @@ _rte_ring_get_data(struct rte_ring *r)
return (void **)r->ring;
}
-#endif
+static inline void
+_rte_ring_dequeue_ptrs(struct rte_ring *r, void **obj_table, uint32_t num)
+{
+ uint32_t i, n;
+ uint32_t mask, cons_head;
+
+ n = num;
+ cons_head = r->cons.tail;
+ mask = _rte_ring_get_mask(r);
+
+ DEQUEUE_PTRS();
+}
+
+#endif /* RTE_VERSION >= RTE_VERSION_NUM(17, 5, 0, 0) */
+
+/*
+ * Serialized variation of the DPDK rte_ring dequeue mechanism.
+ * At any given moment, only one consumer is allowed to dequeue
+ * objects from the ring.
+ */
+
+static inline __attribute__((always_inline)) uint32_t
+_rte_ring_mcs_dequeue_start(struct rte_ring *r, uint32_t num)
+{
+ uint32_t n, end, head, tail;
+ int32_t rc;
+
+ rc = 0;
+ do {
+ head = r->cons.head;
+ tail = r->cons.tail;
+ end = r->prod.tail;
+
+ if (head != tail) {
+ rte_pause();
+ continue;
+ }
+
+ n = end - head;
+ n = RTE_MIN(num, n);
+ if (n == 0)
+ return 0;
+
+ rc = rte_atomic32_cmpset(&r->cons.head, head, head + n);
+ } while (rc == 0);
+
+ return n;
+}
+
+static inline __attribute__((always_inline)) void
+_rte_ring_mcs_dequeue_finish(struct rte_ring *r, uint32_t num)
+{
+ uint32_t n, head, tail;
+
+ head = r->cons.head;
+ rte_smp_rmb();
+ tail = r->cons.tail;
+ n = head - tail;
+ RTE_ASSERT(n >= num);
+ RTE_SET_USED(n);
+ head = tail + num;
+ r->cons.head = head;
+ r->cons.tail = head;
+}
+
+static inline __attribute__((always_inline)) void
+_rte_ring_mcs_dequeue_abort(struct rte_ring *r)
+{
+ r->cons.head = r->cons.tail;
+}
+
+static inline uint32_t
+_rte_ring_mcs_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t num)
+{
+ uint32_t n;
+
+ n = _rte_ring_mcs_dequeue_start(r, num);
+ _rte_ring_dequeue_ptrs(r, obj_table, n);
+ _rte_ring_mcs_dequeue_finish(r, n);
+ return n;
+}
#ifdef __cplusplus
}
#endif
+
#endif /* TLE_DPDK_WRAPPER_H_ */
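
_rte_ring_mcs_dequeue_burst() above already shows the canonical start/copy/finish sequence. Open-coded, with the abort path included, the pattern looks roughly like this (the process() callback is hypothetical):

    void *objs[32];
    uint32_t n;

    /* reserve up to 32 objects; other consumers spin in _start() until
     * cons.head == cons.tail again, i.e. until we finish or abort */
    n = _rte_ring_mcs_dequeue_start(r, RTE_DIM(objs));
    if (n != 0) {
    	_rte_ring_dequeue_ptrs(r, objs, n);
    	if (process(objs, n) < 0)
    		/* nothing consumed: the objects stay in the ring for later */
    		_rte_ring_mcs_dequeue_abort(r);
    	else
    		/* advance cons.tail, making the slots reusable */
    		_rte_ring_mcs_dequeue_finish(r, n);
    }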
diff --git a/test/dring/test_dring.c b/test/dring/test_dring.c
index ce3e454..692fd44 100644
--- a/test/dring/test_dring.c
+++ b/test/dring/test_dring.c
@@ -349,7 +349,7 @@ test_dring_st(void)
printf("%s started;\n", __func__);
- tle_dring_reset(&dr);
+ tle_dring_reset(&dr, 0);
r = init_drb_ring(OBJ_NUM);
if (r == NULL)
return -ENOMEM;
@@ -397,7 +397,7 @@ test_dring_mt(int32_t master_enq_type, int32_t master_deq_type,
struct tle_dring dr;
struct dring_arg arg[RTE_MAX_LCORE];
- tle_dring_reset(&dr);
+ tle_dring_reset(&dr, 0);
r = init_drb_ring(OBJ_NUM);
if (r == NULL)
return -ENOMEM;
diff --git a/test/gtest/test_tle_dring.cpp b/test/gtest/test_tle_dring.cpp
index 6ab4905..5109ac2 100644
--- a/test/gtest/test_tle_dring.cpp
+++ b/test/gtest/test_tle_dring.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -19,7 +19,7 @@ TEST_F(dring, test_dring_st)
{
printf("%s started;\n", __func__);
- tle_dring_reset(&dr);
+ tle_dring_reset(&dr, 0);
r = init_drb_ring(OBJ_NUM);
ASSERT_NE(r, (void *) NULL) << "Out of memory";
diff --git a/test/gtest/test_tle_dring.h b/test/gtest/test_tle_dring.h
index 32a223e..fdb2c47 100644
--- a/test/gtest/test_tle_dring.h
+++ b/test/gtest/test_tle_dring.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -376,7 +376,7 @@ test_dring_mt(int32_t master_enq_type, int32_t master_deq_type,
struct tle_dring dr;
struct dring_arg arg[RTE_MAX_LCORE];
- tle_dring_reset(&dr);
+ tle_dring_reset(&dr, 0);
r = init_drb_ring(OBJ_NUM);
if (r == NULL)
return -ENOMEM;
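
All four test call sites above now pass a second argument to tle_dring_reset(). Passing 0, as the tests do, keeps the previous default behaviour; the new flag values themselves are defined in tle_dring.h, which is outside this section:

    struct tle_dring dr;

    /* 0 selects the default behaviour; other flag values are
     * defined in tle_dring.h (not shown in this diff) */
    tle_dring_reset(&dr, 0);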