From 8efc4c11464fd76db6c2a145664aa047615b749e Mon Sep 17 00:00:00 2001 From: Konstantin Ananyev Date: Thu, 7 Jul 2016 20:24:24 +0100 Subject: Change libtle_udp to use dring. Right now didn't see any noticeable performance boost with these changes. Though it allowed to get rid of using locks at UDP TX code-path and simplify the code quite a lot. Change-Id: If865abd3db9127f510df670d9a8edb168b915770 Signed-off-by: Konstantin Ananyev --- lib/libtle_udp/Makefile | 9 +- lib/libtle_udp/buf_cage.c | 81 --------------- lib/libtle_udp/buf_cage.h | 231 ----------------------------------------- lib/libtle_udp/misc.h | 15 +++ lib/libtle_udp/osdep.h | 11 ++ lib/libtle_udp/tle_udp_impl.h | 1 + lib/libtle_udp/udp_ctl.c | 127 +++++++++++++++-------- lib/libtle_udp/udp_impl.h | 23 +++-- lib/libtle_udp/udp_rxtx.c | 235 ++++++++++++++++++------------------------ 9 files changed, 228 insertions(+), 505 deletions(-) delete mode 100644 lib/libtle_udp/buf_cage.c delete mode 100644 lib/libtle_udp/buf_cage.h (limited to 'lib') diff --git a/lib/libtle_udp/Makefile b/lib/libtle_udp/Makefile index 100755c..a834873 100644 --- a/lib/libtle_udp/Makefile +++ b/lib/libtle_udp/Makefile @@ -31,7 +31,6 @@ EXPORT_MAP := tle_udp_version.map LIBABIVER := 1 #source files -SRCS-y += buf_cage.c SRCS-y += event.c SRCS-y += udp_ctl.c SRCS-y += udp_rxtx.c @@ -40,11 +39,7 @@ SRCS-y += udp_rxtx.c SYMLINK-y-include += tle_udp_impl.h SYMLINK-y-include += tle_event.h -# this library depends on -DEPDIRS-y += $(RTE_SDK)/lib/librte_eal -DEPDIRS-y += $(RTE_SDK)/lib/librte_ether -DEPDIRS-y += $(RTE_SDK)/lib/librte_mbuf -DEPDIRS-y += $(RTE_SDK)lib/librte_net -DEPDIRS-y += $(RTE_SDK)lib/librte_ip_frag +# this lib dependencies +DEPDIRS-y += lib/libtle_dring include $(RTE_SDK)/mk/rte.extlib.mk diff --git a/lib/libtle_udp/buf_cage.c b/lib/libtle_udp/buf_cage.c deleted file mode 100644 index 0ae21b0..0000000 --- a/lib/libtle_udp/buf_cage.c +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2016 Intel Corporation. 
- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include - -#include "buf_cage.h" -#include "osdep.h" - -struct bcg_store * -bcg_create(const struct bcg_store_prm *prm) -{ - struct buf_cage *bc; - struct bcg_store *st; - uintptr_t end, p; - size_t sz, tsz; - uint32_t n; - - if (prm == NULL || (prm->cage_align != 0 && - rte_is_power_of_2(prm->cage_align) == 0)) { - rte_errno = EINVAL; - return NULL; - } - - /* number of cages required. */ - n = (prm->max_bufs + prm->cage_bufs - 1) / prm->cage_bufs; - n = RTE_MAX(n, prm->min_cages); - - /* size of each cage. */ - sz = prm->cage_bufs * sizeof(bc->bufs[0]) + sizeof(*bc); - sz = RTE_ALIGN_CEIL(sz, prm->cage_align); - - /* total number of bytes required. 
*/ - tsz = n * sz + RTE_ALIGN_CEIL(sizeof(*st), prm->cage_align); - - st = rte_zmalloc_socket(NULL, tsz, RTE_CACHE_LINE_SIZE, prm->socket_id); - if (st == NULL) { - UDP_LOG(ERR, "%s: allocation of %zu bytes on " - "socket %d failed\n", - __func__, tsz, prm->socket_id); - return NULL; - } - - st->prm = prm[0]; - bcg_queue_reset(&st->free); - - p = (uintptr_t)RTE_PTR_ALIGN_CEIL((st + 1), prm->cage_align); - end = p + n * sz; - - for (; p != end; p += sz) { - bc = (struct buf_cage *)p; - bc->st = st; - bc->num = prm->cage_bufs; - STAILQ_INSERT_TAIL(&st->free.queue, bc, ql); - } - - st->free.num = n; - st->nb_cages = n; - st->cage_sz = sz; - st->total_sz = tsz; - return st; -} - -void -bcg_destroy(struct bcg_store *st) -{ - rte_free(st); -} diff --git a/lib/libtle_udp/buf_cage.h b/lib/libtle_udp/buf_cage.h deleted file mode 100644 index 3b3c429..0000000 --- a/lib/libtle_udp/buf_cage.h +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright (c) 2016 Intel Corporation. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef _BUF_CAGE_H_ -#define _BUF_CAGE_H_ - -#include -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -struct bcg_store; - -struct buf_cage { - struct bcg_store *st; - STAILQ_ENTRY(buf_cage) ql; - uint32_t num; - uint32_t rp; - uint32_t wp; - const void *bufs[0]; -}; - -struct bcg_queue { - rte_spinlock_t lock; - uint32_t num; - STAILQ_HEAD(, buf_cage) queue; -}; - -struct bcg_store_prm { - void *user_data; - int32_t socket_id; /* NUMA socket to allocate memory from. */ - uint32_t max_bufs; /* total number of bufs to cage. */ - uint32_t min_cages; /* min number of cages per store. */ - uint32_t cage_bufs; /* min number of bufs per cage. */ - uint32_t cage_align; /* each cage to be aligned (power of 2). */ -}; - -struct bcg_store { - struct bcg_queue free; - uint32_t nb_cages; - size_t cage_sz; - size_t total_sz; - struct bcg_store_prm prm; -} __rte_cache_aligned; - -struct bcg_store *bcg_create(const struct bcg_store_prm *prm); -void bcg_destroy(struct bcg_store *st); - -static inline int -bcg_store_full(const struct bcg_store *st) -{ - return st->nb_cages == st->free.num; -} - -static inline void -bcg_queue_reset(struct bcg_queue *bq) -{ - STAILQ_INIT(&bq->queue); - bq->num = 0; - rte_spinlock_init(&bq->lock); -} - -static inline void -bcg_reset(struct buf_cage *bc) -{ - bc->rp = 0; - bc->wp = 0; -} - -static inline void * -bcg_get_udata(struct buf_cage *bc) -{ - return bc->st->prm.user_data; -} - -static inline struct buf_cage * -__bcg_dequeue_head(struct bcg_queue *bq) -{ - struct buf_cage *bc; - - bc = STAILQ_FIRST(&bq->queue); - if (bc != NULL) { - STAILQ_REMOVE_HEAD(&bq->queue, ql); - bq->num--; - } - return bc; -} - -static inline struct buf_cage * -bcg_dequeue_head(struct bcg_queue *bq) -{ - struct buf_cage *bc; - - if (bq->num == 0) - return NULL; - - rte_compiler_barrier(); - - rte_spinlock_lock(&bq->lock); - bc = __bcg_dequeue_head(bq); - rte_spinlock_unlock(&bq->lock); - return bc; -} - -static inline uint32_t 
-__bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc) -{ - STAILQ_INSERT_HEAD(&bq->queue, bc, ql); - return ++bq->num; -} - -static inline uint32_t -bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc) -{ - uint32_t n; - - rte_spinlock_lock(&bq->lock); - n = __bcg_enqueue_head(bq, bc); - rte_spinlock_unlock(&bq->lock); - return n; -} - -static inline uint32_t -__bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc) -{ - STAILQ_INSERT_TAIL(&bq->queue, bc, ql); - return ++bq->num; -} - -static inline uint32_t -bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc) -{ - uint32_t n; - - rte_spinlock_lock(&bq->lock); - n = __bcg_enqueue_tail(bq, bc); - rte_spinlock_unlock(&bq->lock); - return n; -} - -static inline uint32_t -bcg_queue_append(struct bcg_queue *dst, struct bcg_queue *src) -{ - rte_spinlock_lock(&src->lock); - STAILQ_CONCAT(&dst->queue, &src->queue); - dst->num += src->num; - src->num = 0; - rte_spinlock_unlock(&src->lock); - return dst->num; -} - -static inline uint32_t -bcg_free_count(const struct buf_cage *bc) -{ - return bc->num - bc->wp; -} - - -static inline uint32_t -bcg_fill_count(const struct buf_cage *bc) -{ - return bc->wp - bc->rp; -} - -/* !!! if going to keep it - try to unroll copying stuff. !!! 
*/ -static inline uint32_t -bcg_get(struct buf_cage *bc, const void *bufs[], uint32_t num) -{ - uint32_t i, n, r; - - r = bc->rp; - n = RTE_MIN(num, bc->wp - r); - for (i = 0; i != n; i++) - bufs[i] = bc->bufs[r + i]; - - bc->rp = r + n; - return n; -} - -static inline uint32_t -bcg_put(struct buf_cage *bc, const void *bufs[], uint32_t num) -{ - uint32_t i, n, w; - - w = bc->wp; - n = RTE_MIN(num, bc->num - w); - for (i = 0; i != n; i++) - bc->bufs[w + i] = bufs[i]; - - bc->wp = w + n; - return n; -} - - -static inline struct buf_cage * -bcg_alloc(struct bcg_store *st) -{ - return bcg_dequeue_head(&st->free); -} - -static inline uint32_t -bcg_free(struct buf_cage *bc) -{ - struct bcg_store *st; - - st = bc->st; - bcg_reset(bc); - return bcg_enqueue_head(&st->free, bc); -} - -#ifdef __cplusplus -} -#endif - -#endif /* _BUF_CAGE_H_ */ diff --git a/lib/libtle_udp/misc.h b/lib/libtle_udp/misc.h index 3874647..359f400 100644 --- a/lib/libtle_udp/misc.h +++ b/lib/libtle_udp/misc.h @@ -44,6 +44,21 @@ _mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso, return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49; } +/* + * Given the value of mbuf's tx_offload, calculate L4 payload offset. + */ +static inline uint32_t +_tx_offload_l4_offset(uint64_t ofl) +{ + uint32_t l2, l3, l4; + + l2 = ofl & 0x7f; + l3 = ofl >> 7 & 0x1ff; + l4 = ofl >> 16 & UINT8_MAX; + + return l2 + l3 + l4; +} + /* * Routines to calculate L3/L4 checksums in SW. * Pretty similar to ones from DPDK librte_net/rte_ip.h, diff --git a/lib/libtle_udp/osdep.h b/lib/libtle_udp/osdep.h index 6161242..8e91964 100644 --- a/lib/libtle_udp/osdep.h +++ b/lib/libtle_udp/osdep.h @@ -23,6 +23,17 @@ extern "C" { #endif +/* + * internal defines. + */ +#define MAX_PKT_BURST 0x20 + +#define MAX_DRB_BURST 4 + +/* + * logging related macros. + */ + #define UDP_LOG(lvl, fmt, args...) 
RTE_LOG(lvl, USER1, fmt, ##args) /* diff --git a/lib/libtle_udp/tle_udp_impl.h b/lib/libtle_udp/tle_udp_impl.h index a5d17e1..8e61ea6 100644 --- a/lib/libtle_udp/tle_udp_impl.h +++ b/lib/libtle_udp/tle_udp_impl.h @@ -84,6 +84,7 @@ struct tle_udp_ctx_param { uint32_t max_streams; /**< max number of streams in context. */ uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */ uint32_t max_stream_sbufs; /**< max send mbufs per stream. */ + uint32_t send_bulk_size; /**< expected # of packets per send call. */ int (*lookup4)(void *opaque, const struct in_addr *addr, struct tle_udp_dest *res); diff --git a/lib/libtle_udp/udp_ctl.c b/lib/libtle_udp/udp_ctl.c index 36ec8a6..55c4afd 100644 --- a/lib/libtle_udp/udp_ctl.c +++ b/lib/libtle_udp/udp_ctl.c @@ -29,8 +29,6 @@ #define LPORT_START_BLK PORT_BLK(LPORT_START) #define LPORT_END_BLK PORT_BLK(LPORT_END) -#define MAX_BURST 0x20 - static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT; static const struct in6_addr tle_udp6_none = { { @@ -60,12 +58,31 @@ unuse_stream(struct tle_udp_stream *s) rte_atomic32_set(&s->tx.use, INT32_MIN); } +/* calculate number of drbs per stream. */ +static uint32_t +calc_stream_drb_num(const struct tle_udp_ctx *ctx, uint32_t obj_num) +{ + uint32_t num; + + num = (ctx->prm.max_stream_sbufs + obj_num - 1) / obj_num; + num = num + num / 2; + num = RTE_MAX(num, RTE_DIM(ctx->dev) + 1); + return num; +} + +static uint32_t +drb_nb_elem(const struct tle_udp_ctx *ctx) +{ + return (ctx->prm.send_bulk_size != 0) ? + ctx->prm.send_bulk_size : MAX_PKT_BURST; +} + static int init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s) { - size_t sz; - uint32_t n; - struct bcg_store_prm sp; + size_t bsz, rsz, sz; + uint32_t i, k, n, nb; + struct tle_drb *drb; char name[RTE_RING_NAMESIZE]; /* init RX part. */ @@ -88,21 +105,45 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s) /* init TX part. 
*/ - sp.socket_id = ctx->prm.socket_id; - sp.max_bufs = ctx->prm.max_stream_sbufs; - sp.min_cages = RTE_DIM(ctx->dev) + 1; - sp.cage_bufs = MAX_BURST; - sp.cage_align = RTE_CACHE_LINE_SIZE; - sp.user_data = s; - - s->tx.st = bcg_create(&sp); - if (s->tx.st == NULL) { - UDP_LOG(ERR, - "%s(%p): bcg_create() failed with error code: %d\n", - __func__, s, rte_errno); + nb = drb_nb_elem(ctx); + k = calc_stream_drb_num(ctx, nb); + n = rte_align32pow2(k); + + /* size of the drbs ring */ + rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]); + rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + + /* size of the drb. */ + bsz = tle_drb_calc_size(nb); + + /* total stream drbs size. */ + sz = rsz + bsz * k; + + s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->tx.drb.r == NULL) { + UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); return ENOMEM; } + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->tx.drb.r, name, n, 0); + + for (i = 0; i != k; i++) { + drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + + rsz + bsz * i); + drb->udata = s; + drb->size = nb; + rte_ring_enqueue(s->tx.drb.r, drb); + } + + s->tx.drb.nb_elem = nb; + s->tx.drb.nb_max = k; + + /* mark stream as avaialble to use. 
*/ + s->ctx = ctx; unuse_stream(s); STAILQ_INSERT_TAIL(&ctx->streams.free, s, link); @@ -113,8 +154,8 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s) static void fini_stream(struct tle_udp_stream *s) { - bcg_destroy(s->tx.st); rte_free(s->rx.q); + rte_free(s->tx.drb.r); } struct tle_udp_ctx * @@ -181,15 +222,15 @@ tle_udp_destroy(struct tle_udp_ctx *ctx) return; } + for (i = 0; i != RTE_DIM(ctx->dev); i++) + tle_udp_del_dev(ctx->dev + i); + if (ctx->streams.buf != 0) { for (i = 0; i != ctx->prm.max_streams; i++) fini_stream(&ctx->streams.buf[i]); rte_free(ctx->streams.buf); } - for (i = 0; i != RTE_DIM(ctx->dev); i++) - tle_udp_del_dev(ctx->dev + i); - rte_free(ctx); } @@ -279,8 +320,7 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx, } /* setup TX data. */ - bcg_queue_reset(&dev->tx.beq); - bcg_queue_reset(&dev->tx.feq); + tle_dring_reset(&dev->tx.dr); if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) { dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM; @@ -297,25 +337,33 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx, } static void -empty_cage(struct buf_cage *bc) +empty_dring(struct tle_dring *dr) { - uint32_t i, n; - struct rte_mbuf *pkt[MAX_BURST]; + uint32_t i, k, n; + struct tle_udp_stream *s; + struct rte_mbuf *pkt[MAX_PKT_BURST]; + struct tle_drb *drb[MAX_PKT_BURST]; do { - n = bcg_get(bc, (const void **)(uintptr_t)pkt, RTE_DIM(pkt)); + k = RTE_DIM(drb); + n = tle_dring_sc_dequeue(dr, (const void **)(uintptr_t)pkt, + RTE_DIM(pkt), drb, &k); + + /* free mbufs */ for (i = 0; i != n; i++) rte_pktmbuf_free(pkt[i]); + /* free drbs */ + for (i = 0; i != k; i++) { + s = drb[i]->udata; + rte_ring_enqueue(s->tx.drb.r, drb[i]); + } } while (n != 0); - - bcg_free(bc); } int tle_udp_del_dev(struct tle_udp_dev *dev) { uint32_t p; - struct buf_cage *bc; struct tle_udp_ctx *ctx; ctx = dev->ctx; @@ -331,13 +379,7 @@ tle_udp_del_dev(struct tle_udp_dev *dev) return -EINVAL; /* emtpy TX queues. 
*/ - if (dev->tx.bc != NULL) - empty_cage(dev->tx.bc); - - bcg_queue_append(&dev->tx.beq, &dev->tx.feq); - - while ((bc = __bcg_dequeue_head(&dev->tx.beq)) != NULL) - empty_cage(bc); + empty_dring(&dev->tx.dr); rte_free(dev->dp[TLE_UDP_V4]); rte_free(dev->dp[TLE_UDP_V6]); @@ -591,7 +633,7 @@ tle_udp_stream_open(struct tle_udp_ctx *ctx, return NULL; /* some TX still pending for that stream. */ - } else if (bcg_store_full(s->tx.st) == 0) { + } else if (UDP_STREAM_TX_PENDING(s)) { put_stream(ctx, s, 0); rte_errno = EAGAIN; return NULL; @@ -649,7 +691,7 @@ tle_udp_stream_close(struct tle_udp_stream *s) uint32_t i, n; int32_t rc; struct tle_udp_ctx *ctx; - struct rte_mbuf *m[MAX_BURST]; + struct rte_mbuf *m[MAX_PKT_BURST]; static const struct tle_udp_stream_cb zcb; @@ -661,11 +703,6 @@ tle_udp_stream_close(struct tle_udp_stream *s) /* mark stream as unavaialbe for RX/TX. */ stream_down(s); - /* reset TX cages. */ - rte_spinlock_lock(&s->tx.lock); - memset(s->tx.cg, 0, sizeof(s->tx.cg)); - rte_spinlock_unlock(&s->tx.lock); - /* reset stream events if any. */ if (s->rx.ev != NULL) { tle_event_idle(s->rx.ev); @@ -696,7 +733,7 @@ tle_udp_stream_close(struct tle_udp_stream *s) * if there still are pkts queued for TX, * then put this stream to the tail of free list. */ - put_stream(ctx, s, bcg_store_full(s->tx.st)); + put_stream(ctx, s, UDP_STREAM_TX_FINISHED(s)); return rc; } diff --git a/lib/libtle_udp/udp_impl.h b/lib/libtle_udp/udp_impl.h index fbdb743..af35197 100644 --- a/lib/libtle_udp/udp_impl.h +++ b/lib/libtle_udp/udp_impl.h @@ -18,10 +18,10 @@ #include #include +#include #include #include -#include "buf_cage.h" #include "port_bitmap.h" #include "osdep.h" @@ -104,16 +104,24 @@ struct tle_udp_stream { struct { rte_atomic32_t use; - rte_spinlock_t lock; + struct { + uint32_t nb_elem; /* number of obects per drb. */ + uint32_t nb_max; /* number of drbs per stream. 
*/ + struct rte_ring *r; + } drb; struct tle_event *ev; struct tle_udp_stream_cb cb; - struct bcg_store *st; - struct buf_cage *cg[RTE_MAX_ETHPORTS]; } tx __rte_cache_aligned; struct tle_udp_stream_param prm; } __rte_cache_aligned; +#define UDP_STREAM_TX_PENDING(s) \ + ((s)->tx.drb.nb_max != rte_ring_count((s)->tx.drb.r)) + +#define UDP_STREAM_TX_FINISHED(s) \ + ((s)->tx.drb.nb_max == rte_ring_count((s)->tx.drb.r)) + struct tle_udp_dport { struct udp_pbm use; /* ports in use. */ struct tle_udp_stream *streams[MAX_PORT_NUM]; /* port to stream. */ @@ -128,11 +136,9 @@ struct tle_udp_dev { /* used by FE. */ uint64_t ol_flags[TLE_UDP_VNUM]; rte_atomic32_t packet_id[TLE_UDP_VNUM]; - struct bcg_queue feq; - /* used by BE only. */ - struct bcg_queue beq __rte_cache_min_aligned; - struct buf_cage *bc; + /* used by FE & BE. */ + struct tle_dring dr; } tx; struct tle_udp_dev_param prm; /* copy of device paramaters. */ struct tle_udp_dport *dp[TLE_UDP_VNUM]; /* device udp ports */ @@ -140,7 +146,6 @@ struct tle_udp_dev { struct tle_udp_ctx { struct tle_udp_ctx_param prm; - struct { rte_spinlock_t lock; uint32_t nb_free; /* number of free streams. */ diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c index a8ab3fb..7136714 100644 --- a/lib/libtle_udp/udp_rxtx.c +++ b/lib/libtle_udp/udp_rxtx.c @@ -217,13 +217,13 @@ tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], } static inline void -tx_cage_release(struct buf_cage *bc) +stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[], + uint32_t nb_drb) { - struct tle_udp_stream *s; uint32_t n; - s = bcg_get_udata(bc); - n = bcg_free(bc); + n = rte_ring_count(s->tx.drb.r); + rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb); /* If stream is still open, then mark it as avaialble for writing. 
*/ if (rwl_try_acquire(&s->tx.use) > 0) { @@ -232,7 +232,7 @@ tx_cage_release(struct buf_cage *bc) tle_event_raise(s->tx.ev); /* if stream send buffer was full invoke TX callback */ - else if (s->tx.cb.func != NULL && n == 1) + else if (s->tx.cb.func != NULL && n == 0) s->tx.cb.func(s->tx.cb.data, s); } @@ -240,53 +240,32 @@ tx_cage_release(struct buf_cage *bc) rwl_release(&s->tx.use); } -static inline void -tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc) -{ - struct tle_udp_stream *s; - struct tle_udp_ctx *ctx; - uint32_t idx; - - ctx = dev->ctx; - s = bcg_get_udata(bc); - idx = dev - ctx->dev; - - /* mark cage as closed to the stream. */ - rte_spinlock_lock(&s->tx.lock); - if (bc == s->tx.cg[idx]) - s->tx.cg[idx] = NULL; - rte_spinlock_unlock(&s->tx.lock); -} - uint16_t tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num) { - struct buf_cage *bc; - uint32_t i, n; + uint32_t i, j, k, n; + struct tle_drb *drb[num]; + struct tle_udp_stream *s; - for (i = 0; i != num; i += n) { + /* extract packets from device TX queue. */ - bc = dev->tx.bc; - if (bc == NULL) { - if (dev->tx.beq.num == 0) - bcg_queue_append(&dev->tx.beq, &dev->tx.feq); - bc = __bcg_dequeue_head(&dev->tx.beq); - if (bc == NULL) - break; - tx_cage_update(dev, bc); - dev->tx.bc = bc; - } + k = num; + n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt, + num, drb, &k); - n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i); + if (n == 0) + return 0; - /* cage is empty, need to free it and notify related stream. */ - if (bcg_fill_count(bc) == 0) { - tx_cage_release(bc); - dev->tx.bc = NULL; - } + /* free empty drbs and notify related streams. 
*/ + + for (i = 0; i != k; i = j) { + s = drb[i]->udata; + for (j = i + 1; j != k && s == drb[j]->udata; j++) + ; /* j stops at the first drb owned by another stream */ + stream_drb_release(s, drb + i, j - i); } - return i; + return n; } static int @@ -359,30 +338,41 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero) * helper function, do the necessary pre-processing for the received packets * before handiing them to the strem_recv caller. */ -static inline struct rte_mbuf * -recv_pkt_process(struct rte_mbuf *m, uint32_t type) +static inline uint32_t +recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type) { - uint64_t f; - - f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); - if (f != 0) { - if (check_pkt_csum(m, type) == 0) - m->ol_flags ^= f; - else { - rte_pktmbuf_free(m); - return NULL; + uint32_t i, k; + uint64_t f, flg[num], ofl[num]; + + for (i = 0; i != num; i++) { + flg[i] = m[i]->ol_flags; + ofl[i] = m[i]->tx_offload; + } + + k = 0; + for (i = 0; i != num; i++) { + + f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); + + /* drop packets with invalid cksum(s). 
*/ + if (f != 0 && check_pkt_csum(m[i], type) != 0) { + rte_pktmbuf_free(m[i]); + m[i] = NULL; + k++; + continue; /* m[i] was freed: it must not be dereferenced below */ } + m[i]->ol_flags ^= f; + rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i])); } - rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len); - return m; + return k; } uint16_t tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[], uint16_t num) { - uint32_t i, k, n; + uint32_t k, n; n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); if (n == 0) @@ -398,28 +388,7 @@ tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[], rwl_release(&s->rx.use); } - k = 0; - for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) { - pkt[i] = recv_pkt_process(pkt[i], s->type); - pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type); - pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type); - pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type); - k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) + - (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL); - } - - switch (n % 4) { - case 3: - pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type); - k += (pkt[i + 2] == NULL); - case 2: - pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type); - k += (pkt[i + 1] == NULL); - case 1: - pkt[i] = recv_pkt_process(pkt[i], s->type); - k += (pkt[i] == NULL); - } - + k = recv_pkt_process(pkt, n, s->type); return compress_pkt_list(pkt, n, k); } @@ -586,65 +555,59 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num, return frag_num; } +static inline void +stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +static inline uint32_t +stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + /* enqueue up to num packets to the destination device. 
*/ static inline uint16_t -queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di, - const void *pkt[], uint16_t num) +queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev, + const void *pkt[], uint16_t nb_pkt, + struct tle_drb *drbs[], uint32_t *nb_drb) { - struct buf_cage *bc; - uint32_t i, n; + uint32_t bsz, i, n, nb, nbc, nbm; - rte_spinlock_lock(&s->tx.lock); - bc = s->tx.cg[di]; + bsz = s->tx.drb.nb_elem; - for (i = 0; i != num; i += n) { - if (bc == NULL) { - bc = bcg_alloc(s->tx.st); - if (bc == NULL) - break; - n = bcg_put(bc, pkt + i, num - i); - bcg_enqueue_tail(bq, bc); - } else - n = bcg_put(bc, pkt + i, num - i); + /* calculate how many drbs are needed on top of the *nb_drb already in drbs[]. */ + nbc = *nb_drb; + nbm = (nb_pkt + bsz - 1) / bsz; + nb = RTE_MAX(nbm, nbc) - nbc; - if (n != num - i) - bc = NULL; - } + /* allocate required drbs; fewer than requested may be returned. */ + if (nb != 0) + nb = stream_drb_alloc(s, drbs + nbc, nb); - s->tx.cg[di] = bc; - rte_spinlock_unlock(&s->tx.lock); - return i; -} + nb += nbc; -/* - * etiher enqueue all num packets or none. - * assumes that all number of input packets not exceed size of buf_cage. - */ -static inline uint16_t -queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di, - const void *pkt[], uint16_t num) -{ - struct buf_cage *bc, *bcp; - uint32_t n; + /* no free drbs, can't send anything */ + if (nb == 0) + return 0; - rte_spinlock_lock(&s->tx.lock); - bc = s->tx.cg[di]; + /* not enough free drbs, reduce number of packets to send. */ + else if (nb != nbm) + nb_pkt = nb * bsz; - n = 0; - if (bc == NULL || bcg_free_count(bc) < num) { - bcp = bc; - bc = bcg_alloc(s->tx.st); - if (bc != NULL) { - if (bcp != NULL) - n = bcg_put(bcp, pkt, num); - n += bcg_put(bc, pkt, num - n); - bcg_enqueue_tail(bq, bc); - } - } else - n = bcg_put(bc, pkt, num); + /* enqueue packets to the destination device. 
*/ + nbc = nb; + n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb); - s->tx.cg[di] = bc; - rte_spinlock_unlock(&s->tx.lock); + /* if not all available drbs were consumed, move them to the start. */ + nbc -= nb; + for (i = 0; i != nb; i++) + drbs[i] = drbs[nbc + i]; + + *nb_drb = nb; return n; } @@ -654,13 +617,14 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], { int32_t di, frg, rc; uint64_t ol_flags; - uint32_t i, k, n; + uint32_t i, k, n, nb; uint32_t mtu, pid, type; const struct sockaddr_in *d4; const struct sockaddr_in6 *d6; const void *da; union udph udph; struct tle_udp_dest dst; + struct tle_drb *drb[num]; type = s->type; @@ -704,6 +668,7 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], if (rwl_acquire(&s->tx.use) < 0) return 0; + nb = 0; for (i = 0, k = 0; k != num; k = i) { /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */ @@ -726,8 +691,9 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], /* enqueue non-fragment packets to the destination device. */ if (k != i) { - k += queue_pkt_out(s, &dst.dev->tx.feq, di, - (const void **)(uintptr_t)&pkt[k], i - k); + k += queue_pkt_out(s, dst.dev, + (const void **)(uintptr_t)&pkt[k], i - k, + drb, &nb); /* stream TX queue is full. */ if (k != i) @@ -746,8 +712,8 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], break; } - n = queue_frg_out(s, &dst.dev->tx.feq, di, - (const void **)(uintptr_t)frag, rc); + n = queue_pkt_out(s, dst.dev, + (const void **)(uintptr_t)frag, rc, drb, &nb); if (n == 0) { while (rc-- != 0) rte_pktmbuf_free(frag[rc]); @@ -765,6 +731,11 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], tle_event_raise(s->tx.ev); out: + /* free unused drbs. */ + if (nb != 0) + stream_drb_free(s, drb, nb); + + /* stream can be closed. */ rwl_release(&s->tx.use); /* -- cgit 1.2.3-korg