author    Konstantin Ananyev <konstantin.ananyev@intel.com>  2016-07-07 20:24:24 +0100
committer Konstantin Ananyev <konstantin.ananyev@intel.com>  2016-07-07 23:41:30 +0100
commit    8efc4c11464fd76db6c2a145664aa047615b749e (patch)
tree      3061254b813f3c5592d7f92788f7abe2f155f791
parent    a633eec74619a96925285ac4dcf0154fbfafb855 (diff)
Change libtle_udp to use dring.
Right now no noticeable performance boost was observed with these changes,
but they made it possible to get rid of locks on the UDP TX code-path and
to simplify the code quite a lot.

Change-Id: If865abd3db9127f510df670d9a8edb168b915770
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
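For context (not part of the patch), a minimal sketch of the lock-free TX flow the
library moves to, based on the tle_dring_mp_enqueue()/tle_dring_sc_dequeue() call
patterns used in the diff below. The function names fe_send()/be_poll() and both
ring arguments are illustrative placeholders, not library API.

#include <stdint.h>

#include <rte_mbuf.h>
#include <rte_ring.h>
#include <tle_dring.h>

/*
 * Illustrative front-end (stream) send path: packets are pushed into the
 * per-device dring together with the drbs that track them, so no per-stream
 * TX spinlock is required (multi-producer enqueue).
 */
static uint32_t
fe_send(struct tle_dring *dev_dring, struct rte_ring *stream_drb_ring,
	const void *pkt[], uint32_t nb_pkt,
	struct tle_drb *drb[], uint32_t nb_drb)
{
	uint32_t n;

	/* multi-producer enqueue; the drb count is updated in place. */
	n = tle_dring_mp_enqueue(dev_dring, pkt, nb_pkt, drb, &nb_drb);

	/* drbs left over after the enqueue go back to the stream's ring,
	 * as the patch does in stream_drb_free(). */
	if (nb_drb != 0)
		rte_ring_enqueue_burst(stream_drb_ring, (void **)drb, nb_drb);
	return n;
}

/*
 * Illustrative back-end (device) poll path: a single consumer drains the
 * dring and recycles the emptied drbs to their owning streams.
 */
static uint32_t
be_poll(struct tle_dring *dev_dring, struct rte_mbuf *pkt[], uint32_t num)
{
	uint32_t k, n;
	struct tle_drb *drb[num];

	k = num;
	n = tle_dring_sc_dequeue(dev_dring, (const void **)(uintptr_t)pkt,
		num, drb, &k);

	/* drb[0 .. k-1] now hold the emptied blocks; the real code returns
	 * them to the per-stream rings (see stream_drb_release() below). */
	return n;
}

The key point is that the drbs travel through the dring together with the packets,
so ownership returns to the stream only after the device has actually drained them —
the role previously played by buf_cage, but without any spinlocks.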
-rw-r--r--  examples/udpfwd/main.c          17
-rw-r--r--  lib/libtle_udp/Makefile          9
-rw-r--r--  lib/libtle_udp/buf_cage.c       81
-rw-r--r--  lib/libtle_udp/buf_cage.h      231
-rw-r--r--  lib/libtle_udp/misc.h           15
-rw-r--r--  lib/libtle_udp/osdep.h          11
-rw-r--r--  lib/libtle_udp/tle_udp_impl.h    1
-rw-r--r--  lib/libtle_udp/udp_ctl.c       127
-rw-r--r--  lib/libtle_udp/udp_impl.h       23
-rw-r--r--  lib/libtle_udp/udp_rxtx.c      235
10 files changed, 243 insertions(+), 507 deletions(-)
diff --git a/examples/udpfwd/main.c b/examples/udpfwd/main.c
index a907355..20f123e 100644
--- a/examples/udpfwd/main.c
+++ b/examples/udpfwd/main.c
@@ -34,6 +34,9 @@
#define RX_CSUM_OFFLOAD (DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM)
#define TX_CSUM_OFFLOAD (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM)
+#define OPT_SHORT_SBULK 'B'
+#define OPT_LONG_SBULK "sburst"
+
#define OPT_SHORT_PROMISC 'P'
#define OPT_LONG_PROMISC "promisc"
@@ -62,6 +65,7 @@ static const struct option long_opt[] = {
{OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
{OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
{OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
+ {OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
{OPT_LONG_STREAMS, 1, 0, OPT_SHORT_STREAMS},
{NULL, 0, 0, 0}
};
@@ -1704,14 +1708,23 @@ main(int argc, char *argv[])
"%s: rte_eal_init failed with error code: %d\n",
__func__, rc);
+ memset(&ctx_prm, 0, sizeof(ctx_prm));
+
argc -= rc;
argv += rc;
optind = 0;
optarg = NULL;
- while ((opt = getopt_long(argc, argv, "PR:S:b:f:s:", long_opt,
+ while ((opt = getopt_long(argc, argv, "B:PR:S:b:f:s:", long_opt,
&opt_idx)) != EOF) {
- if (opt == OPT_SHORT_PROMISC) {
+ if (opt == OPT_SHORT_SBULK) {
+ rc = parse_uint_val(NULL, optarg, &v);
+ if (rc < 0)
+ rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+ "for option: \'%c\'\n",
+ __func__, optarg, opt);
+ ctx_prm.send_bulk_size = v;
+ } else if (opt == OPT_SHORT_PROMISC) {
becfg.promisc = 1;
} else if (opt == OPT_SHORT_RBUFS) {
rc = parse_uint_val(NULL, optarg, &v);
diff --git a/lib/libtle_udp/Makefile b/lib/libtle_udp/Makefile
index 100755c..a834873 100644
--- a/lib/libtle_udp/Makefile
+++ b/lib/libtle_udp/Makefile
@@ -31,7 +31,6 @@ EXPORT_MAP := tle_udp_version.map
LIBABIVER := 1
#source files
-SRCS-y += buf_cage.c
SRCS-y += event.c
SRCS-y += udp_ctl.c
SRCS-y += udp_rxtx.c
@@ -40,11 +39,7 @@ SRCS-y += udp_rxtx.c
SYMLINK-y-include += tle_udp_impl.h
SYMLINK-y-include += tle_event.h
-# this library depends on
-DEPDIRS-y += $(RTE_SDK)/lib/librte_eal
-DEPDIRS-y += $(RTE_SDK)/lib/librte_ether
-DEPDIRS-y += $(RTE_SDK)/lib/librte_mbuf
-DEPDIRS-y += $(RTE_SDK)lib/librte_net
-DEPDIRS-y += $(RTE_SDK)lib/librte_ip_frag
+# this library's dependencies
+DEPDIRS-y += lib/libtle_dring
include $(RTE_SDK)/mk/rte.extlib.mk
diff --git a/lib/libtle_udp/buf_cage.c b/lib/libtle_udp/buf_cage.c
deleted file mode 100644
index 0ae21b0..0000000
--- a/lib/libtle_udp/buf_cage.c
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2016 Intel Corporation.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <rte_errno.h>
-#include <rte_malloc.h>
-#include <rte_log.h>
-
-#include "buf_cage.h"
-#include "osdep.h"
-
-struct bcg_store *
-bcg_create(const struct bcg_store_prm *prm)
-{
- struct buf_cage *bc;
- struct bcg_store *st;
- uintptr_t end, p;
- size_t sz, tsz;
- uint32_t n;
-
- if (prm == NULL || (prm->cage_align != 0 &&
- rte_is_power_of_2(prm->cage_align) == 0)) {
- rte_errno = EINVAL;
- return NULL;
- }
-
- /* number of cages required. */
- n = (prm->max_bufs + prm->cage_bufs - 1) / prm->cage_bufs;
- n = RTE_MAX(n, prm->min_cages);
-
- /* size of each cage. */
- sz = prm->cage_bufs * sizeof(bc->bufs[0]) + sizeof(*bc);
- sz = RTE_ALIGN_CEIL(sz, prm->cage_align);
-
- /* total number of bytes required. */
- tsz = n * sz + RTE_ALIGN_CEIL(sizeof(*st), prm->cage_align);
-
- st = rte_zmalloc_socket(NULL, tsz, RTE_CACHE_LINE_SIZE, prm->socket_id);
- if (st == NULL) {
- UDP_LOG(ERR, "%s: allocation of %zu bytes on "
- "socket %d failed\n",
- __func__, tsz, prm->socket_id);
- return NULL;
- }
-
- st->prm = prm[0];
- bcg_queue_reset(&st->free);
-
- p = (uintptr_t)RTE_PTR_ALIGN_CEIL((st + 1), prm->cage_align);
- end = p + n * sz;
-
- for (; p != end; p += sz) {
- bc = (struct buf_cage *)p;
- bc->st = st;
- bc->num = prm->cage_bufs;
- STAILQ_INSERT_TAIL(&st->free.queue, bc, ql);
- }
-
- st->free.num = n;
- st->nb_cages = n;
- st->cage_sz = sz;
- st->total_sz = tsz;
- return st;
-}
-
-void
-bcg_destroy(struct bcg_store *st)
-{
- rte_free(st);
-}
diff --git a/lib/libtle_udp/buf_cage.h b/lib/libtle_udp/buf_cage.h
deleted file mode 100644
index 3b3c429..0000000
--- a/lib/libtle_udp/buf_cage.h
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright (c) 2016 Intel Corporation.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef _BUF_CAGE_H_
-#define _BUF_CAGE_H_
-
-#include <rte_common.h>
-#include <rte_atomic.h>
-#include <rte_spinlock.h>
-#include <sys/queue.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-struct bcg_store;
-
-struct buf_cage {
- struct bcg_store *st;
- STAILQ_ENTRY(buf_cage) ql;
- uint32_t num;
- uint32_t rp;
- uint32_t wp;
- const void *bufs[0];
-};
-
-struct bcg_queue {
- rte_spinlock_t lock;
- uint32_t num;
- STAILQ_HEAD(, buf_cage) queue;
-};
-
-struct bcg_store_prm {
- void *user_data;
- int32_t socket_id; /* NUMA socket to allocate memory from. */
- uint32_t max_bufs; /* total number of bufs to cage. */
- uint32_t min_cages; /* min number of cages per store. */
- uint32_t cage_bufs; /* min number of bufs per cage. */
- uint32_t cage_align; /* each cage to be aligned (power of 2). */
-};
-
-struct bcg_store {
- struct bcg_queue free;
- uint32_t nb_cages;
- size_t cage_sz;
- size_t total_sz;
- struct bcg_store_prm prm;
-} __rte_cache_aligned;
-
-struct bcg_store *bcg_create(const struct bcg_store_prm *prm);
-void bcg_destroy(struct bcg_store *st);
-
-static inline int
-bcg_store_full(const struct bcg_store *st)
-{
- return st->nb_cages == st->free.num;
-}
-
-static inline void
-bcg_queue_reset(struct bcg_queue *bq)
-{
- STAILQ_INIT(&bq->queue);
- bq->num = 0;
- rte_spinlock_init(&bq->lock);
-}
-
-static inline void
-bcg_reset(struct buf_cage *bc)
-{
- bc->rp = 0;
- bc->wp = 0;
-}
-
-static inline void *
-bcg_get_udata(struct buf_cage *bc)
-{
- return bc->st->prm.user_data;
-}
-
-static inline struct buf_cage *
-__bcg_dequeue_head(struct bcg_queue *bq)
-{
- struct buf_cage *bc;
-
- bc = STAILQ_FIRST(&bq->queue);
- if (bc != NULL) {
- STAILQ_REMOVE_HEAD(&bq->queue, ql);
- bq->num--;
- }
- return bc;
-}
-
-static inline struct buf_cage *
-bcg_dequeue_head(struct bcg_queue *bq)
-{
- struct buf_cage *bc;
-
- if (bq->num == 0)
- return NULL;
-
- rte_compiler_barrier();
-
- rte_spinlock_lock(&bq->lock);
- bc = __bcg_dequeue_head(bq);
- rte_spinlock_unlock(&bq->lock);
- return bc;
-}
-
-static inline uint32_t
-__bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc)
-{
- STAILQ_INSERT_HEAD(&bq->queue, bc, ql);
- return ++bq->num;
-}
-
-static inline uint32_t
-bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc)
-{
- uint32_t n;
-
- rte_spinlock_lock(&bq->lock);
- n = __bcg_enqueue_head(bq, bc);
- rte_spinlock_unlock(&bq->lock);
- return n;
-}
-
-static inline uint32_t
-__bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc)
-{
- STAILQ_INSERT_TAIL(&bq->queue, bc, ql);
- return ++bq->num;
-}
-
-static inline uint32_t
-bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc)
-{
- uint32_t n;
-
- rte_spinlock_lock(&bq->lock);
- n = __bcg_enqueue_tail(bq, bc);
- rte_spinlock_unlock(&bq->lock);
- return n;
-}
-
-static inline uint32_t
-bcg_queue_append(struct bcg_queue *dst, struct bcg_queue *src)
-{
- rte_spinlock_lock(&src->lock);
- STAILQ_CONCAT(&dst->queue, &src->queue);
- dst->num += src->num;
- src->num = 0;
- rte_spinlock_unlock(&src->lock);
- return dst->num;
-}
-
-static inline uint32_t
-bcg_free_count(const struct buf_cage *bc)
-{
- return bc->num - bc->wp;
-}
-
-
-static inline uint32_t
-bcg_fill_count(const struct buf_cage *bc)
-{
- return bc->wp - bc->rp;
-}
-
-/* !!! if going to keep it - try to unroll copying stuff. !!! */
-static inline uint32_t
-bcg_get(struct buf_cage *bc, const void *bufs[], uint32_t num)
-{
- uint32_t i, n, r;
-
- r = bc->rp;
- n = RTE_MIN(num, bc->wp - r);
- for (i = 0; i != n; i++)
- bufs[i] = bc->bufs[r + i];
-
- bc->rp = r + n;
- return n;
-}
-
-static inline uint32_t
-bcg_put(struct buf_cage *bc, const void *bufs[], uint32_t num)
-{
- uint32_t i, n, w;
-
- w = bc->wp;
- n = RTE_MIN(num, bc->num - w);
- for (i = 0; i != n; i++)
- bc->bufs[w + i] = bufs[i];
-
- bc->wp = w + n;
- return n;
-}
-
-
-static inline struct buf_cage *
-bcg_alloc(struct bcg_store *st)
-{
- return bcg_dequeue_head(&st->free);
-}
-
-static inline uint32_t
-bcg_free(struct buf_cage *bc)
-{
- struct bcg_store *st;
-
- st = bc->st;
- bcg_reset(bc);
- return bcg_enqueue_head(&st->free, bc);
-}
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* _BUF_CAGE_H_ */
diff --git a/lib/libtle_udp/misc.h b/lib/libtle_udp/misc.h
index 3874647..359f400 100644
--- a/lib/libtle_udp/misc.h
+++ b/lib/libtle_udp/misc.h
@@ -45,6 +45,21 @@ _mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso,
}
/*
+ * Given the value of mbuf's tx_offload, calculate L4 payload offset.
+ */
+static inline uint32_t
+_tx_offload_l4_offset(uint64_t ofl)
+{
+ uint32_t l2, l3, l4;
+
+ l2 = ofl & 0x7f;
+ l3 = ofl >> 7 & 0x1ff;
+ l4 = ofl >> 16 & UINT8_MAX;
+
+ return l2 + l3 + l4;
+}
+
+/*
* Routines to calculate L3/L4 checksums in SW.
* Pretty similar to ones from DPDK librte_net/rte_ip.h,
* but provide better performance (at least for tested configurations),
diff --git a/lib/libtle_udp/osdep.h b/lib/libtle_udp/osdep.h
index 6161242..8e91964 100644
--- a/lib/libtle_udp/osdep.h
+++ b/lib/libtle_udp/osdep.h
@@ -23,6 +23,17 @@
extern "C" {
#endif
+/*
+ * internal defines.
+ */
+#define MAX_PKT_BURST 0x20
+
+#define MAX_DRB_BURST 4
+
+/*
+ * logging related macros.
+ */
+
#define UDP_LOG(lvl, fmt, args...) RTE_LOG(lvl, USER1, fmt, ##args)
/*
diff --git a/lib/libtle_udp/tle_udp_impl.h b/lib/libtle_udp/tle_udp_impl.h
index a5d17e1..8e61ea6 100644
--- a/lib/libtle_udp/tle_udp_impl.h
+++ b/lib/libtle_udp/tle_udp_impl.h
@@ -84,6 +84,7 @@ struct tle_udp_ctx_param {
uint32_t max_streams; /**< max number of streams in context. */
uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */
uint32_t max_stream_sbufs; /**< max send mbufs per stream. */
+ uint32_t send_bulk_size; /**< expected # of packets per send call. */
int (*lookup4)(void *opaque, const struct in_addr *addr,
struct tle_udp_dest *res);
diff --git a/lib/libtle_udp/udp_ctl.c b/lib/libtle_udp/udp_ctl.c
index 36ec8a6..55c4afd 100644
--- a/lib/libtle_udp/udp_ctl.c
+++ b/lib/libtle_udp/udp_ctl.c
@@ -29,8 +29,6 @@
#define LPORT_START_BLK PORT_BLK(LPORT_START)
#define LPORT_END_BLK PORT_BLK(LPORT_END)
-#define MAX_BURST 0x20
-
static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT;
static const struct in6_addr tle_udp6_none = {
{
@@ -60,12 +58,31 @@ unuse_stream(struct tle_udp_stream *s)
rte_atomic32_set(&s->tx.use, INT32_MIN);
}
+/* calculate number of drbs per stream. */
+static uint32_t
+calc_stream_drb_num(const struct tle_udp_ctx *ctx, uint32_t obj_num)
+{
+ uint32_t num;
+
+ num = (ctx->prm.max_stream_sbufs + obj_num - 1) / obj_num;
+ num = num + num / 2;
+ num = RTE_MAX(num, RTE_DIM(ctx->dev) + 1);
+ return num;
+}
+
+static uint32_t
+drb_nb_elem(const struct tle_udp_ctx *ctx)
+{
+ return (ctx->prm.send_bulk_size != 0) ?
+ ctx->prm.send_bulk_size : MAX_PKT_BURST;
+}
+
static int
init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
{
- size_t sz;
- uint32_t n;
- struct bcg_store_prm sp;
+ size_t bsz, rsz, sz;
+ uint32_t i, k, n, nb;
+ struct tle_drb *drb;
char name[RTE_RING_NAMESIZE];
/* init RX part. */
@@ -88,21 +105,45 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
/* init TX part. */
- sp.socket_id = ctx->prm.socket_id;
- sp.max_bufs = ctx->prm.max_stream_sbufs;
- sp.min_cages = RTE_DIM(ctx->dev) + 1;
- sp.cage_bufs = MAX_BURST;
- sp.cage_align = RTE_CACHE_LINE_SIZE;
- sp.user_data = s;
-
- s->tx.st = bcg_create(&sp);
- if (s->tx.st == NULL) {
- UDP_LOG(ERR,
- "%s(%p): bcg_create() failed with error code: %d\n",
- __func__, s, rte_errno);
+ nb = drb_nb_elem(ctx);
+ k = calc_stream_drb_num(ctx, nb);
+ n = rte_align32pow2(k);
+
+ /* size of the drbs ring */
+ rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]);
+ rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+
+ /* size of the drb. */
+ bsz = tle_drb_calc_size(nb);
+
+ /* total stream drbs size. */
+ sz = rsz + bsz * k;
+
+ s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ctx->prm.socket_id);
+ if (s->tx.drb.r == NULL) {
+ UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+ "failed with error code: %d\n",
+ __func__, s, sz, ctx->prm.socket_id, rte_errno);
return ENOMEM;
}
+ snprintf(name, sizeof(name), "%p@%zu", s, sz);
+ rte_ring_init(s->tx.drb.r, name, n, 0);
+
+ for (i = 0; i != k; i++) {
+ drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
+ rsz + bsz * i);
+ drb->udata = s;
+ drb->size = nb;
+ rte_ring_enqueue(s->tx.drb.r, drb);
+ }
+
+ s->tx.drb.nb_elem = nb;
+ s->tx.drb.nb_max = k;
+
+ /* mark stream as available to use. */
+
s->ctx = ctx;
unuse_stream(s);
STAILQ_INSERT_TAIL(&ctx->streams.free, s, link);
@@ -113,8 +154,8 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
static void
fini_stream(struct tle_udp_stream *s)
{
- bcg_destroy(s->tx.st);
rte_free(s->rx.q);
+ rte_free(s->tx.drb.r);
}
struct tle_udp_ctx *
@@ -181,15 +222,15 @@ tle_udp_destroy(struct tle_udp_ctx *ctx)
return;
}
+ for (i = 0; i != RTE_DIM(ctx->dev); i++)
+ tle_udp_del_dev(ctx->dev + i);
+
if (ctx->streams.buf != 0) {
for (i = 0; i != ctx->prm.max_streams; i++)
fini_stream(&ctx->streams.buf[i]);
rte_free(ctx->streams.buf);
}
- for (i = 0; i != RTE_DIM(ctx->dev); i++)
- tle_udp_del_dev(ctx->dev + i);
-
rte_free(ctx);
}
@@ -279,8 +320,7 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
}
/* setup TX data. */
- bcg_queue_reset(&dev->tx.beq);
- bcg_queue_reset(&dev->tx.feq);
+ tle_dring_reset(&dev->tx.dr);
if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) {
dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM;
@@ -297,25 +337,33 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
}
static void
-empty_cage(struct buf_cage *bc)
+empty_dring(struct tle_dring *dr)
{
- uint32_t i, n;
- struct rte_mbuf *pkt[MAX_BURST];
+ uint32_t i, k, n;
+ struct tle_udp_stream *s;
+ struct rte_mbuf *pkt[MAX_PKT_BURST];
+ struct tle_drb *drb[MAX_PKT_BURST];
do {
- n = bcg_get(bc, (const void **)(uintptr_t)pkt, RTE_DIM(pkt));
+ k = RTE_DIM(drb);
+ n = tle_dring_sc_dequeue(dr, (const void **)(uintptr_t)pkt,
+ RTE_DIM(pkt), drb, &k);
+
+ /* free mbufs */
for (i = 0; i != n; i++)
rte_pktmbuf_free(pkt[i]);
+ /* free drbs */
+ for (i = 0; i != k; i++) {
+ s = drb[i]->udata;
+ rte_ring_enqueue(s->tx.drb.r, drb[i]);
+ }
} while (n != 0);
-
- bcg_free(bc);
}
int
tle_udp_del_dev(struct tle_udp_dev *dev)
{
uint32_t p;
- struct buf_cage *bc;
struct tle_udp_ctx *ctx;
ctx = dev->ctx;
@@ -331,13 +379,7 @@ tle_udp_del_dev(struct tle_udp_dev *dev)
return -EINVAL;
/* emtpy TX queues. */
- if (dev->tx.bc != NULL)
- empty_cage(dev->tx.bc);
-
- bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
-
- while ((bc = __bcg_dequeue_head(&dev->tx.beq)) != NULL)
- empty_cage(bc);
+ empty_dring(&dev->tx.dr);
rte_free(dev->dp[TLE_UDP_V4]);
rte_free(dev->dp[TLE_UDP_V6]);
@@ -591,7 +633,7 @@ tle_udp_stream_open(struct tle_udp_ctx *ctx,
return NULL;
/* some TX still pending for that stream. */
- } else if (bcg_store_full(s->tx.st) == 0) {
+ } else if (UDP_STREAM_TX_PENDING(s)) {
put_stream(ctx, s, 0);
rte_errno = EAGAIN;
return NULL;
@@ -649,7 +691,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
uint32_t i, n;
int32_t rc;
struct tle_udp_ctx *ctx;
- struct rte_mbuf *m[MAX_BURST];
+ struct rte_mbuf *m[MAX_PKT_BURST];
static const struct tle_udp_stream_cb zcb;
@@ -661,11 +703,6 @@ tle_udp_stream_close(struct tle_udp_stream *s)
/* mark stream as unavaialbe for RX/TX. */
stream_down(s);
- /* reset TX cages. */
- rte_spinlock_lock(&s->tx.lock);
- memset(s->tx.cg, 0, sizeof(s->tx.cg));
- rte_spinlock_unlock(&s->tx.lock);
-
/* reset stream events if any. */
if (s->rx.ev != NULL) {
tle_event_idle(s->rx.ev);
@@ -696,7 +733,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
* if there still are pkts queued for TX,
* then put this stream to the tail of free list.
*/
- put_stream(ctx, s, bcg_store_full(s->tx.st));
+ put_stream(ctx, s, UDP_STREAM_TX_FINISHED(s));
return rc;
}
diff --git a/lib/libtle_udp/udp_impl.h b/lib/libtle_udp/udp_impl.h
index fbdb743..af35197 100644
--- a/lib/libtle_udp/udp_impl.h
+++ b/lib/libtle_udp/udp_impl.h
@@ -18,10 +18,10 @@
#include <rte_spinlock.h>
#include <rte_vect.h>
+#include <tle_dring.h>
#include <tle_udp_impl.h>
#include <tle_event.h>
-#include "buf_cage.h"
#include "port_bitmap.h"
#include "osdep.h"
@@ -104,16 +104,24 @@ struct tle_udp_stream {
struct {
rte_atomic32_t use;
- rte_spinlock_t lock;
+ struct {
+ uint32_t nb_elem; /* number of objects per drb. */
+ uint32_t nb_max; /* number of drbs per stream. */
+ struct rte_ring *r;
+ } drb;
struct tle_event *ev;
struct tle_udp_stream_cb cb;
- struct bcg_store *st;
- struct buf_cage *cg[RTE_MAX_ETHPORTS];
} tx __rte_cache_aligned;
struct tle_udp_stream_param prm;
} __rte_cache_aligned;
+#define UDP_STREAM_TX_PENDING(s) \
+ ((s)->tx.drb.nb_max != rte_ring_count((s)->tx.drb.r))
+
+#define UDP_STREAM_TX_FINISHED(s) \
+ ((s)->tx.drb.nb_max == rte_ring_count((s)->tx.drb.r))
+
struct tle_udp_dport {
struct udp_pbm use; /* ports in use. */
struct tle_udp_stream *streams[MAX_PORT_NUM]; /* port to stream. */
@@ -128,11 +136,9 @@ struct tle_udp_dev {
/* used by FE. */
uint64_t ol_flags[TLE_UDP_VNUM];
rte_atomic32_t packet_id[TLE_UDP_VNUM];
- struct bcg_queue feq;
- /* used by BE only. */
- struct bcg_queue beq __rte_cache_min_aligned;
- struct buf_cage *bc;
+ /* used by FE & BE. */
+ struct tle_dring dr;
} tx;
struct tle_udp_dev_param prm; /* copy of device paramaters. */
struct tle_udp_dport *dp[TLE_UDP_VNUM]; /* device udp ports */
@@ -140,7 +146,6 @@ struct tle_udp_dev {
struct tle_udp_ctx {
struct tle_udp_ctx_param prm;
-
struct {
rte_spinlock_t lock;
uint32_t nb_free; /* number of free streams. */
diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c
index a8ab3fb..7136714 100644
--- a/lib/libtle_udp/udp_rxtx.c
+++ b/lib/libtle_udp/udp_rxtx.c
@@ -217,13 +217,13 @@ tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
}
static inline void
-tx_cage_release(struct buf_cage *bc)
+stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[],
+ uint32_t nb_drb)
{
- struct tle_udp_stream *s;
uint32_t n;
- s = bcg_get_udata(bc);
- n = bcg_free(bc);
+ n = rte_ring_count(s->tx.drb.r);
+ rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
/* If stream is still open, then mark it as avaialble for writing. */
if (rwl_try_acquire(&s->tx.use) > 0) {
@@ -232,7 +232,7 @@ tx_cage_release(struct buf_cage *bc)
tle_event_raise(s->tx.ev);
/* if stream send buffer was full invoke TX callback */
- else if (s->tx.cb.func != NULL && n == 1)
+ else if (s->tx.cb.func != NULL && n == 0)
s->tx.cb.func(s->tx.cb.data, s);
}
@@ -240,53 +240,32 @@ tx_cage_release(struct buf_cage *bc)
rwl_release(&s->tx.use);
}
-static inline void
-tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc)
-{
- struct tle_udp_stream *s;
- struct tle_udp_ctx *ctx;
- uint32_t idx;
-
- ctx = dev->ctx;
- s = bcg_get_udata(bc);
- idx = dev - ctx->dev;
-
- /* mark cage as closed to the stream. */
- rte_spinlock_lock(&s->tx.lock);
- if (bc == s->tx.cg[idx])
- s->tx.cg[idx] = NULL;
- rte_spinlock_unlock(&s->tx.lock);
-}
-
uint16_t
tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
- struct buf_cage *bc;
- uint32_t i, n;
+ uint32_t i, j, k, n;
+ struct tle_drb *drb[num];
+ struct tle_udp_stream *s;
- for (i = 0; i != num; i += n) {
+ /* extract packets from device TX queue. */
- bc = dev->tx.bc;
- if (bc == NULL) {
- if (dev->tx.beq.num == 0)
- bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
- bc = __bcg_dequeue_head(&dev->tx.beq);
- if (bc == NULL)
- break;
- tx_cage_update(dev, bc);
- dev->tx.bc = bc;
- }
+ k = num;
+ n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
+ num, drb, &k);
- n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i);
+ if (n == 0)
+ return 0;
- /* cage is empty, need to free it and notify related stream. */
- if (bcg_fill_count(bc) == 0) {
- tx_cage_release(bc);
- dev->tx.bc = NULL;
- }
+ /* free empty drbs and notify related streams. */
+
+ for (i = 0; i != k; i = j) {
+ s = drb[i]->udata;
+ for (j = i + 1; j != k && s == drb[i]->udata; j++)
+ ;
+ stream_drb_release(s, drb + i, j - i);
}
- return i;
+ return n;
}
static int
@@ -359,30 +338,41 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
* helper function, do the necessary pre-processing for the received packets
* before handiing them to the strem_recv caller.
*/
-static inline struct rte_mbuf *
-recv_pkt_process(struct rte_mbuf *m, uint32_t type)
+static inline uint32_t
+recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
{
- uint64_t f;
-
- f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
- if (f != 0) {
- if (check_pkt_csum(m, type) == 0)
- m->ol_flags ^= f;
- else {
- rte_pktmbuf_free(m);
- return NULL;
+ uint32_t i, k;
+ uint64_t f, flg[num], ofl[num];
+
+ for (i = 0; i != num; i++) {
+ flg[i] = m[i]->ol_flags;
+ ofl[i] = m[i]->tx_offload;
+ }
+
+ k = 0;
+ for (i = 0; i != num; i++) {
+
+ f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
+
+ /* drop packets with invalid cksum(s). */
+ if (f != 0 && check_pkt_csum(m[i], type) != 0) {
+ rte_pktmbuf_free(m[i]);
+ m[i] = NULL;
+ k++;
}
+
+ m[i]->ol_flags ^= f;
+ rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
}
- rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len);
- return m;
+ return k;
}
uint16_t
tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
uint16_t num)
{
- uint32_t i, k, n;
+ uint32_t k, n;
n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
if (n == 0)
@@ -398,28 +388,7 @@ tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
rwl_release(&s->rx.use);
}
- k = 0;
- for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) {
- pkt[i] = recv_pkt_process(pkt[i], s->type);
- pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
- pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
- pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type);
- k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) +
- (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL);
- }
-
- switch (n % 4) {
- case 3:
- pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
- k += (pkt[i + 2] == NULL);
- case 2:
- pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
- k += (pkt[i + 1] == NULL);
- case 1:
- pkt[i] = recv_pkt_process(pkt[i], s->type);
- k += (pkt[i] == NULL);
- }
-
+ k = recv_pkt_process(pkt, n, s->type);
return compress_pkt_list(pkt, n, k);
}
@@ -586,65 +555,59 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
return frag_num;
}
+static inline void
+stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
+ uint32_t nb_drb)
+{
+ rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
+static inline uint32_t
+stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
+ uint32_t nb_drb)
+{
+ return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
/* enqueue up to num packets to the destination device queue. */
static inline uint16_t
-queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
- const void *pkt[], uint16_t num)
+queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev,
+ const void *pkt[], uint16_t nb_pkt,
+ struct tle_drb *drbs[], uint32_t *nb_drb)
{
- struct buf_cage *bc;
- uint32_t i, n;
+ uint32_t bsz, i, n, nb, nbc, nbm;
- rte_spinlock_lock(&s->tx.lock);
- bc = s->tx.cg[di];
+ bsz = s->tx.drb.nb_elem;
- for (i = 0; i != num; i += n) {
- if (bc == NULL) {
- bc = bcg_alloc(s->tx.st);
- if (bc == NULL)
- break;
- n = bcg_put(bc, pkt + i, num - i);
- bcg_enqueue_tail(bq, bc);
- } else
- n = bcg_put(bc, pkt + i, num - i);
+ /* calculate how many drbs are needed. */
+ nbc = *nb_drb;
+ nbm = (nb_pkt + bsz - 1) / bsz;
+ nb = RTE_MAX(nbm, nbc) - nbc;
- if (n != num - i)
- bc = NULL;
- }
+ /* allocate required drbs */
+ if (nb != 0)
+ nb = stream_drb_alloc(s, drbs + nbc, nb);
- s->tx.cg[di] = bc;
- rte_spinlock_unlock(&s->tx.lock);
- return i;
-}
+ nb += nbc;
-/*
- * etiher enqueue all num packets or none.
- * assumes that all number of input packets not exceed size of buf_cage.
- */
-static inline uint16_t
-queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
- const void *pkt[], uint16_t num)
-{
- struct buf_cage *bc, *bcp;
- uint32_t n;
+ /* no free drbs, can't send anything */
+ if (nb == 0)
+ return 0;
- rte_spinlock_lock(&s->tx.lock);
- bc = s->tx.cg[di];
+ /* not enough free drbs, reduce number of packets to send. */
+ else if (nb != nbm)
+ nb_pkt = nb * bsz;
- n = 0;
- if (bc == NULL || bcg_free_count(bc) < num) {
- bcp = bc;
- bc = bcg_alloc(s->tx.st);
- if (bc != NULL) {
- if (bcp != NULL)
- n = bcg_put(bcp, pkt, num);
- n += bcg_put(bc, pkt, num - n);
- bcg_enqueue_tail(bq, bc);
- }
- } else
- n = bcg_put(bc, pkt, num);
+ /* enqueue packets to the destination device. */
+ nbc = nb;
+ n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);
- s->tx.cg[di] = bc;
- rte_spinlock_unlock(&s->tx.lock);
+ /* if not all available drbs were consumed, move them to the start. */
+ nbc -= nb;
+ for (i = 0; i != nb; i++)
+ drbs[i] = drbs[nbc + i];
+
+ *nb_drb = nb;
return n;
}
@@ -654,13 +617,14 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
{
int32_t di, frg, rc;
uint64_t ol_flags;
- uint32_t i, k, n;
+ uint32_t i, k, n, nb;
uint32_t mtu, pid, type;
const struct sockaddr_in *d4;
const struct sockaddr_in6 *d6;
const void *da;
union udph udph;
struct tle_udp_dest dst;
+ struct tle_drb *drb[num];
type = s->type;
@@ -704,6 +668,7 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
if (rwl_acquire(&s->tx.use) < 0)
return 0;
+ nb = 0;
for (i = 0, k = 0; k != num; k = i) {
/* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
@@ -726,8 +691,9 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
/* enqueue non-fragment packets to the destination device. */
if (k != i) {
- k += queue_pkt_out(s, &dst.dev->tx.feq, di,
- (const void **)(uintptr_t)&pkt[k], i - k);
+ k += queue_pkt_out(s, dst.dev,
+ (const void **)(uintptr_t)&pkt[k], i - k,
+ drb, &nb);
/* stream TX queue is full. */
if (k != i)
@@ -746,8 +712,8 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
break;
}
- n = queue_frg_out(s, &dst.dev->tx.feq, di,
- (const void **)(uintptr_t)frag, rc);
+ n = queue_pkt_out(s, dst.dev,
+ (const void **)(uintptr_t)frag, rc, drb, &nb);
if (n == 0) {
while (rc-- != 0)
rte_pktmbuf_free(frag[rc]);
@@ -765,6 +731,11 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
tle_event_raise(s->tx.ev);
out:
+ /* free unused drbs. */
+ if (nb != 0)
+ stream_drb_free(s, drb, nb);
+
+ /* stream can be closed. */
rwl_release(&s->tx.use);
/*