summaryrefslogtreecommitdiffstats
path: root/lib/libtle_udp/udp_ctl.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libtle_udp/udp_ctl.c')
-rw-r--r--lib/libtle_udp/udp_ctl.c127
1 files changed, 82 insertions, 45 deletions
diff --git a/lib/libtle_udp/udp_ctl.c b/lib/libtle_udp/udp_ctl.c
index 36ec8a6..55c4afd 100644
--- a/lib/libtle_udp/udp_ctl.c
+++ b/lib/libtle_udp/udp_ctl.c
@@ -29,8 +29,6 @@
#define LPORT_START_BLK PORT_BLK(LPORT_START)
#define LPORT_END_BLK PORT_BLK(LPORT_END)
-#define MAX_BURST 0x20
-
static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT;
static const struct in6_addr tle_udp6_none = {
{
@@ -60,12 +58,31 @@ unuse_stream(struct tle_udp_stream *s)
rte_atomic32_set(&s->tx.use, INT32_MIN);
}
+/* calculate number of drbs per stream. */
+static uint32_t
+calc_stream_drb_num(const struct tle_udp_ctx *ctx, uint32_t obj_num)
+{
+ uint32_t num;
+
+ num = (ctx->prm.max_stream_sbufs + obj_num - 1) / obj_num;
+ num = num + num / 2;
+ num = RTE_MAX(num, RTE_DIM(ctx->dev) + 1);
+ return num;
+}
+
+static uint32_t
+drb_nb_elem(const struct tle_udp_ctx *ctx)
+{
+ return (ctx->prm.send_bulk_size != 0) ?
+ ctx->prm.send_bulk_size : MAX_PKT_BURST;
+}
+
static int
init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
{
- size_t sz;
- uint32_t n;
- struct bcg_store_prm sp;
+ size_t bsz, rsz, sz;
+ uint32_t i, k, n, nb;
+ struct tle_drb *drb;
char name[RTE_RING_NAMESIZE];
/* init RX part. */
@@ -88,21 +105,45 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
/* init TX part. */
- sp.socket_id = ctx->prm.socket_id;
- sp.max_bufs = ctx->prm.max_stream_sbufs;
- sp.min_cages = RTE_DIM(ctx->dev) + 1;
- sp.cage_bufs = MAX_BURST;
- sp.cage_align = RTE_CACHE_LINE_SIZE;
- sp.user_data = s;
-
- s->tx.st = bcg_create(&sp);
- if (s->tx.st == NULL) {
- UDP_LOG(ERR,
- "%s(%p): bcg_create() failed with error code: %d\n",
- __func__, s, rte_errno);
+ nb = drb_nb_elem(ctx);
+ k = calc_stream_drb_num(ctx, nb);
+ n = rte_align32pow2(k);
+
+ /* size of the drbs ring */
+ rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]);
+ rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+
+ /* size of the drb. */
+ bsz = tle_drb_calc_size(nb);
+
+ /* total stream drbs size. */
+ sz = rsz + bsz * k;
+
+ s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ctx->prm.socket_id);
+ if (s->tx.drb.r == NULL) {
+ UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+ "failed with error code: %d\n",
+ __func__, s, sz, ctx->prm.socket_id, rte_errno);
return ENOMEM;
}
+ snprintf(name, sizeof(name), "%p@%zu", s, sz);
+ rte_ring_init(s->tx.drb.r, name, n, 0);
+
+ for (i = 0; i != k; i++) {
+ drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
+ rsz + bsz * i);
+ drb->udata = s;
+ drb->size = nb;
+ rte_ring_enqueue(s->tx.drb.r, drb);
+ }
+
+ s->tx.drb.nb_elem = nb;
+ s->tx.drb.nb_max = k;
+
+ /* mark stream as avaialble to use. */
+
s->ctx = ctx;
unuse_stream(s);
STAILQ_INSERT_TAIL(&ctx->streams.free, s, link);
@@ -113,8 +154,8 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
static void
fini_stream(struct tle_udp_stream *s)
{
- bcg_destroy(s->tx.st);
rte_free(s->rx.q);
+ rte_free(s->tx.drb.r);
}
struct tle_udp_ctx *
@@ -181,15 +222,15 @@ tle_udp_destroy(struct tle_udp_ctx *ctx)
return;
}
+ for (i = 0; i != RTE_DIM(ctx->dev); i++)
+ tle_udp_del_dev(ctx->dev + i);
+
if (ctx->streams.buf != 0) {
for (i = 0; i != ctx->prm.max_streams; i++)
fini_stream(&ctx->streams.buf[i]);
rte_free(ctx->streams.buf);
}
- for (i = 0; i != RTE_DIM(ctx->dev); i++)
- tle_udp_del_dev(ctx->dev + i);
-
rte_free(ctx);
}
@@ -279,8 +320,7 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
}
/* setup TX data. */
- bcg_queue_reset(&dev->tx.beq);
- bcg_queue_reset(&dev->tx.feq);
+ tle_dring_reset(&dev->tx.dr);
if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) {
dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM;
@@ -297,25 +337,33 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
}
static void
-empty_cage(struct buf_cage *bc)
+empty_dring(struct tle_dring *dr)
{
- uint32_t i, n;
- struct rte_mbuf *pkt[MAX_BURST];
+ uint32_t i, k, n;
+ struct tle_udp_stream *s;
+ struct rte_mbuf *pkt[MAX_PKT_BURST];
+ struct tle_drb *drb[MAX_PKT_BURST];
do {
- n = bcg_get(bc, (const void **)(uintptr_t)pkt, RTE_DIM(pkt));
+ k = RTE_DIM(drb);
+ n = tle_dring_sc_dequeue(dr, (const void **)(uintptr_t)pkt,
+ RTE_DIM(pkt), drb, &k);
+
+ /* free mbufs */
for (i = 0; i != n; i++)
rte_pktmbuf_free(pkt[i]);
+ /* free drbs */
+ for (i = 0; i != k; i++) {
+ s = drb[i]->udata;
+ rte_ring_enqueue(s->tx.drb.r, drb[i]);
+ }
} while (n != 0);
-
- bcg_free(bc);
}
int
tle_udp_del_dev(struct tle_udp_dev *dev)
{
uint32_t p;
- struct buf_cage *bc;
struct tle_udp_ctx *ctx;
ctx = dev->ctx;
@@ -331,13 +379,7 @@ tle_udp_del_dev(struct tle_udp_dev *dev)
return -EINVAL;
/* emtpy TX queues. */
- if (dev->tx.bc != NULL)
- empty_cage(dev->tx.bc);
-
- bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
-
- while ((bc = __bcg_dequeue_head(&dev->tx.beq)) != NULL)
- empty_cage(bc);
+ empty_dring(&dev->tx.dr);
rte_free(dev->dp[TLE_UDP_V4]);
rte_free(dev->dp[TLE_UDP_V6]);
@@ -591,7 +633,7 @@ tle_udp_stream_open(struct tle_udp_ctx *ctx,
return NULL;
/* some TX still pending for that stream. */
- } else if (bcg_store_full(s->tx.st) == 0) {
+ } else if (UDP_STREAM_TX_PENDING(s)) {
put_stream(ctx, s, 0);
rte_errno = EAGAIN;
return NULL;
@@ -649,7 +691,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
uint32_t i, n;
int32_t rc;
struct tle_udp_ctx *ctx;
- struct rte_mbuf *m[MAX_BURST];
+ struct rte_mbuf *m[MAX_PKT_BURST];
static const struct tle_udp_stream_cb zcb;
@@ -661,11 +703,6 @@ tle_udp_stream_close(struct tle_udp_stream *s)
/* mark stream as unavaialbe for RX/TX. */
stream_down(s);
- /* reset TX cages. */
- rte_spinlock_lock(&s->tx.lock);
- memset(s->tx.cg, 0, sizeof(s->tx.cg));
- rte_spinlock_unlock(&s->tx.lock);
-
/* reset stream events if any. */
if (s->rx.ev != NULL) {
tle_event_idle(s->rx.ev);
@@ -696,7 +733,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
* if there still are pkts queued for TX,
* then put this stream to the tail of free list.
*/
- put_stream(ctx, s, bcg_store_full(s->tx.st));
+ put_stream(ctx, s, UDP_STREAM_TX_FINISHED(s));
return rc;
}