aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtle_l4p/udp_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libtle_l4p/udp_stream.c')
-rw-r--r--lib/libtle_l4p/udp_stream.c347
1 files changed, 265 insertions, 82 deletions
diff --git a/lib/libtle_l4p/udp_stream.c b/lib/libtle_l4p/udp_stream.c
index 29f5a40..0cd5c27 100644
--- a/lib/libtle_l4p/udp_stream.c
+++ b/lib/libtle_l4p/udp_stream.c
@@ -43,74 +43,87 @@ fini_stream(struct tle_udp_stream *s)
static void
udp_fini_streams(struct tle_ctx *ctx)
{
- uint32_t i;
- struct tle_udp_stream *s;
+ struct udp_streams *us;
+ struct tle_stream *s;
+
+ us = CTX_UDP_STREAMS(ctx);
+ if (us != NULL) {
+ stbl_fini(&us->st);
+
+ while (ctx->streams.nb_free--) {
+ s = STAILQ_FIRST(&ctx->streams.free);
+ STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link);
+ fini_stream(UDP_STREAM(s));
+ }
- s = ctx->streams.buf;
- if (s != NULL) {
- for (i = 0; i != ctx->prm.max_streams; i++)
- fini_stream(s + i);
}
- rte_free(s);
+ rte_free(us);
ctx->streams.buf = NULL;
STAILQ_INIT(&ctx->streams.free);
}
+/* stream memory layout:
+ * [tle_udp_stream] [rx.q] [tx.drb.r]
+ */
static int
-init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s)
+add_stream(struct tle_ctx *ctx)
{
- size_t bsz, rsz, sz;
- uint32_t i, k, n, nb;
+ size_t sz_s, sz_rxq, sz_drb_r, sz;
+ /* for rx.q */
+ uint32_t n_rxq;
+ /* for tx.drb.r */
+ size_t bsz, rsz;
struct tle_drb *drb;
- char name[RTE_RING_NAMESIZE];
+ uint32_t k, nb, n_drb;
- /* init RX part. */
-
- n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
- n = rte_align32pow2(n);
- sz = rte_ring_get_memsize(n);
-
- s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
- ctx->prm.socket_id);
- if (s->rx.q == NULL) {
- UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
- "failed with error code: %d\n",
- __func__, s, sz, ctx->prm.socket_id, rte_errno);
- return -ENOMEM;
- }
+ uint32_t i, f;
+ char name[RTE_RING_NAMESIZE];
+ struct tle_udp_stream *s;
- snprintf(name, sizeof(name), "%p@%zu", s, sz);
- rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
+ // stream
+ sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE);
- /* init TX part. */
+ // rx.q
+ n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
+ n_rxq = rte_align32pow2(n_rxq);
+ sz_rxq = rte_ring_get_memsize(n_rxq);
+ sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE);
+ // tx.drb.r
nb = drb_nb_elem(ctx);
k = calc_stream_drb_num(ctx, nb);
- n = rte_align32pow2(k);
-
- /* size of the drbs ring */
- rsz = rte_ring_get_memsize(n);
+ n_drb = rte_align32pow2(k);
+ rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */
rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+ bsz = tle_drb_calc_size(nb); /* size of the drb. */
+ sz_drb_r = rsz + bsz * k; /* total stream drbs size. */
+ sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE);
- /* size of the drb. */
- bsz = tle_drb_calc_size(nb);
-
- /* total stream drbs size. */
- sz = rsz + bsz * k;
-
- s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
- ctx->prm.socket_id);
- if (s->tx.drb.r == NULL) {
- UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+ sz = sz_s + sz_rxq + sz_drb_r;
+ s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ctx->prm.socket_id);
+ if (s == NULL) {
+ UDP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
"failed with error code: %d\n",
- __func__, s, sz, ctx->prm.socket_id, rte_errno);
+ __func__, sz, ctx->prm.socket_id, rte_errno);
return -ENOMEM;
}
- snprintf(name, sizeof(name), "%p@%zu", s, sz);
- rte_ring_init(s->tx.drb.r, name, n, 0);
+ s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s);
+ s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->rx.q + sz_rxq);
+
+ // ring flags
+ f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+ (RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+ /* init RX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq);
+ rte_ring_init(s->rx.q, name, n_rxq, f);
+ /* init TX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r);
+ rte_ring_init(s->tx.drb.r, name, n_drb, f);
for (i = 0; i != k; i++) {
drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
rsz + bsz * i);
@@ -146,38 +159,59 @@ udp_init_streams(struct tle_ctx *ctx)
size_t sz;
uint32_t i;
int32_t rc;
- struct tle_udp_stream *s;
+ struct udp_streams *us;
- sz = sizeof(*s) * ctx->prm.max_streams;
- s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ sz = sizeof(*us);
+ us = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
ctx->prm.socket_id);
- if (s == NULL) {
+ if (us == NULL) {
UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
"for %u udp_streams failed\n",
sz, ctx->prm.socket_id, ctx->prm.max_streams);
return -ENOMEM;
}
- ctx->streams.buf = s;
+ ctx->streams.buf = us;
STAILQ_INIT(&ctx->streams.free);
- for (i = 0; i != ctx->prm.max_streams; i++) {
- rc = init_stream(ctx, s + i);
- if (rc != 0) {
- UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
- udp_fini_streams(ctx);
- return rc;
- }
+ rc = stbl_init(&us->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
+ if (rc < 0) {
+ UDP_LOG(ERR, "failed to init UDP stbl: rc = %dl\n", rc);
+ return rc;
}
- return 0;
+ for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++)
+ rc = add_stream(ctx);
+
+ if (rc != 0) {
+ UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
+ udp_fini_streams(ctx);
+ }
+
+ return rc;
}
-static void __attribute__((constructor))
+static uint32_t
+udp_more_streams(struct tle_ctx *ctx)
+{
+ uint32_t i, nb;
+ uint32_t nb_max = ctx->prm.max_streams;
+ uint32_t nb_cur = ctx->streams.nb_cur;
+
+ nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur);
+ for (i = 0; i < nb; i++)
+ if (add_stream(ctx) != 0)
+ break;
+
+ return i;
+}
+
+static void __attribute__((constructor(101)))
udp_stream_setup(void)
{
static const struct stream_ops udp_ops = {
.init_streams = udp_init_streams,
+ .more_streams = udp_more_streams,
.fini_streams = udp_fini_streams,
.free_drbs = udp_free_drbs,
};
@@ -188,8 +222,8 @@ udp_stream_setup(void)
static inline void
stream_down(struct tle_udp_stream *s)
{
- rwl_down(&s->rx.use);
- rwl_down(&s->tx.use);
+ rwl_try_down(&s->rx.use);
+ rwl_try_down(&s->tx.use);
}
static inline void
@@ -224,6 +258,59 @@ check_stream_prm(const struct tle_ctx *ctx,
}
struct tle_stream *
+tle_udp_stream_set(struct tle_stream *ts, struct tle_ctx *ctx,
+ const struct tle_udp_stream_param *prm)
+{
+ struct tle_udp_stream *s;
+ int32_t rc;
+
+ if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
+ tle_udp_stream_close(ts);
+ rte_errno = EINVAL;
+ return NULL;
+ }
+
+ s = UDP_STREAM(ts);
+
+ /* free stream's destination port */
+ rc = stream_clear_ctx(ctx, &s->s);
+
+ if (s->ste) {
+ stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, ts);
+ s->ste = NULL;
+ }
+
+ /* copy input parameters. */
+ s->prm = *prm;
+ s->s.option.raw = prm->option;
+
+ /* setup L4 ports and L3 addresses fields. */
+ rc = stream_fill_ctx(ctx, &s->s,
+ (const struct sockaddr *)&prm->local_addr,
+ (const struct sockaddr *)&prm->remote_addr);
+
+ if (rc != 0)
+ goto error;
+
+ /* add stream to the table for non-listen type stream */
+ if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) {
+ s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s);
+ if (s->ste == NULL) {
+ rc = EEXIST;
+ goto error;
+ }
+ }
+
+ return &s->s;
+
+error:
+ tle_udp_stream_close(ts);
+ rte_errno = rc;
+ return NULL;
+
+}
+
+struct tle_stream *
tle_udp_stream_open(struct tle_ctx *ctx,
const struct tle_udp_stream_param *prm)
{
@@ -237,42 +324,80 @@ tle_udp_stream_open(struct tle_ctx *ctx,
s = (struct tle_udp_stream *)get_stream(ctx);
if (s == NULL) {
- rte_errno = ENFILE;
- return NULL;
-
- /* some TX still pending for that stream. */
- } else if (UDP_STREAM_TX_PENDING(s)) {
- put_stream(ctx, &s->s, 0);
rte_errno = EAGAIN;
return NULL;
}
/* copy input parameters. */
s->prm = *prm;
+ s->s.option.raw = prm->option;
/* setup L4 ports and L3 addresses fields. */
rc = stream_fill_ctx(ctx, &s->s,
(const struct sockaddr *)&prm->local_addr,
(const struct sockaddr *)&prm->remote_addr);
- if (rc != 0) {
- put_stream(ctx, &s->s, 1);
- s = NULL;
- rte_errno = rc;
- } else {
- /* setup stream notification menchanism */
- s->rx.ev = prm->recv_ev;
- s->rx.cb = prm->recv_cb;
- s->tx.ev = prm->send_ev;
- s->tx.cb = prm->send_cb;
-
- /* mark stream as avaialbe for RX/TX */
- if (s->tx.ev != NULL)
- tle_event_raise(s->tx.ev);
- stream_up(s);
+ if (rc != 0)
+ goto error;
+
+ /* add stream to the table for non-listen type stream */
+ if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) {
+ s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s);
+ if (s->ste == NULL) {
+ rc = EEXIST;
+ goto error;
+ }
}
+ /* setup stream notification menchanism */
+ s->rx.ev = prm->recv_ev;
+ s->rx.cb = prm->recv_cb;
+ s->tx.ev = prm->send_ev;
+ s->tx.cb = prm->send_cb;
+
+ /* mark stream as avaialbe for RX/TX */
+ if (s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ stream_up(s);
+
return &s->s;
+
+error:
+ put_stream(ctx, &s->s, 1);
+ rte_errno = rc;
+ return NULL;
+}
+
+int
+tle_udp_stream_shutdown(struct tle_stream *us, int how)
+{
+ bool shut_rd = false;
+ bool shut_wr = false;
+ struct tle_udp_stream *s = UDP_STREAM(us);
+
+ switch (how) {
+ case SHUT_RD:
+ shut_rd = true;
+ rwl_down(&s->rx.use);
+ break;
+ case SHUT_WR:
+ shut_wr = true;
+ rwl_down(&s->tx.use);
+ break;
+ case SHUT_RDWR:
+ shut_rd = true;
+ shut_wr = true;
+ stream_down(s);
+ break;
+ default:
+ return -EINVAL;
+ }
+
+ if (shut_rd && s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ if (shut_wr && s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ return 0;
}
int
@@ -312,6 +437,11 @@ tle_udp_stream_close(struct tle_stream *us)
/* empty stream's RX queue */
empty_mbuf_ring(s->rx.q);
+ if (s->ste) {
+ stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, us);
+ s->ste = NULL;
+ }
+
/*
* mark the stream as free again.
* if there still are pkts queued for TX,
@@ -344,3 +474,56 @@ tle_udp_stream_get_param(const struct tle_stream *us,
return 0;
}
+
+/*
+ * helper function, updates stream config
+ */
+static inline int
+stream_update_cfg(struct tle_stream *us, struct tle_udp_stream_param *prm)
+{
+ struct tle_udp_stream *s;
+
+ s = UDP_STREAM(us);
+
+ /* setup stream notification menchanism */
+ s->rx.ev = prm->recv_ev;
+ s->rx.cb = prm->recv_cb;
+ s->tx.ev = prm->send_ev;
+ s->tx.cb = prm->send_cb;
+
+ rte_smp_wmb();
+
+ /* invoke async notifications, if any */
+ if (rte_ring_count(s->rx.q) != 0) {
+ if (s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ else if (s->rx.cb.func != NULL)
+ s->rx.cb.func(s->rx.cb.data, &s->s);
+ }
+
+ /* always ok to write */
+ if (s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ else if (s->tx.cb.func != NULL)
+ s->tx.cb.func(s->tx.cb.data, &s->s);
+
+ return 0;
+}
+
+uint32_t
+tle_udp_stream_update_cfg(struct tle_stream *us[],
+ struct tle_udp_stream_param prm[], uint32_t num)
+{
+ int32_t rc;
+ uint32_t i;
+
+ for (i = 0; i != num; i++) {
+ rc = stream_update_cfg(us[i], &prm[i]);
+ if (rc != 0) {
+ rte_errno = -rc;
+ break;
+ }
+ }
+
+ return i;
+}