diff options
Diffstat (limited to 'lib/libtle_l4p/udp_stream.c')
-rw-r--r-- | lib/libtle_l4p/udp_stream.c | 347 |
1 files changed, 265 insertions, 82 deletions
diff --git a/lib/libtle_l4p/udp_stream.c b/lib/libtle_l4p/udp_stream.c index 29f5a40..0cd5c27 100644 --- a/lib/libtle_l4p/udp_stream.c +++ b/lib/libtle_l4p/udp_stream.c @@ -43,74 +43,87 @@ fini_stream(struct tle_udp_stream *s) static void udp_fini_streams(struct tle_ctx *ctx) { - uint32_t i; - struct tle_udp_stream *s; + struct udp_streams *us; + struct tle_stream *s; + + us = CTX_UDP_STREAMS(ctx); + if (us != NULL) { + stbl_fini(&us->st); + + while (ctx->streams.nb_free--) { + s = STAILQ_FIRST(&ctx->streams.free); + STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link); + fini_stream(UDP_STREAM(s)); + } - s = ctx->streams.buf; - if (s != NULL) { - for (i = 0; i != ctx->prm.max_streams; i++) - fini_stream(s + i); } - rte_free(s); + rte_free(us); ctx->streams.buf = NULL; STAILQ_INIT(&ctx->streams.free); } +/* stream memory layout: + * [tle_udp_stream] [rx.q] [tx.drb.r] + */ static int -init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s) +add_stream(struct tle_ctx *ctx) { - size_t bsz, rsz, sz; - uint32_t i, k, n, nb; + size_t sz_s, sz_rxq, sz_drb_r, sz; + /* for rx.q */ + uint32_t n_rxq; + /* for tx.drb.r */ + size_t bsz, rsz; struct tle_drb *drb; - char name[RTE_RING_NAMESIZE]; + uint32_t k, nb, n_drb; - /* init RX part. */ - - n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); - n = rte_align32pow2(n); - sz = rte_ring_get_memsize(n); - - s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, - ctx->prm.socket_id); - if (s->rx.q == NULL) { - UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " - "failed with error code: %d\n", - __func__, s, sz, ctx->prm.socket_id, rte_errno); - return -ENOMEM; - } + uint32_t i, f; + char name[RTE_RING_NAMESIZE]; + struct tle_udp_stream *s; - snprintf(name, sizeof(name), "%p@%zu", s, sz); - rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ); + // stream + sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE); - /* init TX part. */ + // rx.q + n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + n_rxq = rte_align32pow2(n_rxq); + sz_rxq = rte_ring_get_memsize(n_rxq); + sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE); + // tx.drb.r nb = drb_nb_elem(ctx); k = calc_stream_drb_num(ctx, nb); - n = rte_align32pow2(k); - - /* size of the drbs ring */ - rsz = rte_ring_get_memsize(n); + n_drb = rte_align32pow2(k); + rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */ rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + bsz = tle_drb_calc_size(nb); /* size of the drb. */ + sz_drb_r = rsz + bsz * k; /* total stream drbs size. */ + sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE); - /* size of the drb. */ - bsz = tle_drb_calc_size(nb); - - /* total stream drbs size. */ - sz = rsz + bsz * k; - - s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, - ctx->prm.socket_id); - if (s->tx.drb.r == NULL) { - UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + sz = sz_s + sz_rxq + sz_drb_r; + s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s == NULL) { + UDP_LOG(ERR, "%s: allocation of %zu bytes on socket %d " "failed with error code: %d\n", - __func__, s, sz, ctx->prm.socket_id, rte_errno); + __func__, sz, ctx->prm.socket_id, rte_errno); return -ENOMEM; } - snprintf(name, sizeof(name), "%p@%zu", s, sz); - rte_ring_init(s->tx.drb.r, name, n, 0); + s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s); + s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->rx.q + sz_rxq); + + // ring flags + f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : + (RING_F_SP_ENQ | RING_F_SC_DEQ); + + /* init RX part. */ + snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq); + rte_ring_init(s->rx.q, name, n_rxq, f); + /* init TX part. */ + snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r); + rte_ring_init(s->tx.drb.r, name, n_drb, f); for (i = 0; i != k; i++) { drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + rsz + bsz * i); @@ -146,38 +159,59 @@ udp_init_streams(struct tle_ctx *ctx) size_t sz; uint32_t i; int32_t rc; - struct tle_udp_stream *s; + struct udp_streams *us; - sz = sizeof(*s) * ctx->prm.max_streams; - s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + sz = sizeof(*us); + us = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, ctx->prm.socket_id); - if (s == NULL) { + if (us == NULL) { UDP_LOG(ERR, "allocation of %zu bytes on socket %d " "for %u udp_streams failed\n", sz, ctx->prm.socket_id, ctx->prm.max_streams); return -ENOMEM; } - ctx->streams.buf = s; + ctx->streams.buf = us; STAILQ_INIT(&ctx->streams.free); - for (i = 0; i != ctx->prm.max_streams; i++) { - rc = init_stream(ctx, s + i); - if (rc != 0) { - UDP_LOG(ERR, "initalisation of %u-th stream failed", i); - udp_fini_streams(ctx); - return rc; - } + rc = stbl_init(&us->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); + if (rc < 0) { + UDP_LOG(ERR, "failed to init UDP stbl: rc = %dl\n", rc); + return rc; } - return 0; + for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++) + rc = add_stream(ctx); + + if (rc != 0) { + UDP_LOG(ERR, "initalisation of %u-th stream failed", i); + udp_fini_streams(ctx); + } + + return rc; } -static void __attribute__((constructor)) +static uint32_t +udp_more_streams(struct tle_ctx *ctx) +{ + uint32_t i, nb; + uint32_t nb_max = ctx->prm.max_streams; + uint32_t nb_cur = ctx->streams.nb_cur; + + nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur); + for (i = 0; i < nb; i++) + if (add_stream(ctx) != 0) + break; + + return i; +} + +static void __attribute__((constructor(101))) udp_stream_setup(void) { static const struct stream_ops udp_ops = { .init_streams = udp_init_streams, + .more_streams = udp_more_streams, .fini_streams = udp_fini_streams, .free_drbs = udp_free_drbs, }; @@ -188,8 +222,8 @@ udp_stream_setup(void) static inline void stream_down(struct tle_udp_stream *s) { - rwl_down(&s->rx.use); - rwl_down(&s->tx.use); + rwl_try_down(&s->rx.use); + rwl_try_down(&s->tx.use); } static inline void @@ -224,6 +258,59 @@ check_stream_prm(const struct tle_ctx *ctx, } struct tle_stream * +tle_udp_stream_set(struct tle_stream *ts, struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) { + tle_udp_stream_close(ts); + rte_errno = EINVAL; + return NULL; + } + + s = UDP_STREAM(ts); + + /* free stream's destination port */ + rc = stream_clear_ctx(ctx, &s->s); + + if (s->ste) { + stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, ts); + s->ste = NULL; + } + + /* copy input parameters. */ + s->prm = *prm; + s->s.option.raw = prm->option; + + /* setup L4 ports and L3 addresses fields. */ + rc = stream_fill_ctx(ctx, &s->s, + (const struct sockaddr *)&prm->local_addr, + (const struct sockaddr *)&prm->remote_addr); + + if (rc != 0) + goto error; + + /* add stream to the table for non-listen type stream */ + if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) { + s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s); + if (s->ste == NULL) { + rc = EEXIST; + goto error; + } + } + + return &s->s; + +error: + tle_udp_stream_close(ts); + rte_errno = rc; + return NULL; + +} + +struct tle_stream * tle_udp_stream_open(struct tle_ctx *ctx, const struct tle_udp_stream_param *prm) { @@ -237,42 +324,80 @@ tle_udp_stream_open(struct tle_ctx *ctx, s = (struct tle_udp_stream *)get_stream(ctx); if (s == NULL) { - rte_errno = ENFILE; - return NULL; - - /* some TX still pending for that stream. */ - } else if (UDP_STREAM_TX_PENDING(s)) { - put_stream(ctx, &s->s, 0); rte_errno = EAGAIN; return NULL; } /* copy input parameters. */ s->prm = *prm; + s->s.option.raw = prm->option; /* setup L4 ports and L3 addresses fields. */ rc = stream_fill_ctx(ctx, &s->s, (const struct sockaddr *)&prm->local_addr, (const struct sockaddr *)&prm->remote_addr); - if (rc != 0) { - put_stream(ctx, &s->s, 1); - s = NULL; - rte_errno = rc; - } else { - /* setup stream notification menchanism */ - s->rx.ev = prm->recv_ev; - s->rx.cb = prm->recv_cb; - s->tx.ev = prm->send_ev; - s->tx.cb = prm->send_cb; - - /* mark stream as avaialbe for RX/TX */ - if (s->tx.ev != NULL) - tle_event_raise(s->tx.ev); - stream_up(s); + if (rc != 0) + goto error; + + /* add stream to the table for non-listen type stream */ + if (!is_empty_addr((const struct sockaddr *)&prm->remote_addr)) { + s->ste = stbl_add_stream(CTX_UDP_STLB(ctx), &s->s); + if (s->ste == NULL) { + rc = EEXIST; + goto error; + } } + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + /* mark stream as avaialbe for RX/TX */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + stream_up(s); + return &s->s; + +error: + put_stream(ctx, &s->s, 1); + rte_errno = rc; + return NULL; +} + +int +tle_udp_stream_shutdown(struct tle_stream *us, int how) +{ + bool shut_rd = false; + bool shut_wr = false; + struct tle_udp_stream *s = UDP_STREAM(us); + + switch (how) { + case SHUT_RD: + shut_rd = true; + rwl_down(&s->rx.use); + break; + case SHUT_WR: + shut_wr = true; + rwl_down(&s->tx.use); + break; + case SHUT_RDWR: + shut_rd = true; + shut_wr = true; + stream_down(s); + break; + default: + return -EINVAL; + } + + if (shut_rd && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + if (shut_wr && s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + return 0; } int @@ -312,6 +437,11 @@ tle_udp_stream_close(struct tle_stream *us) /* empty stream's RX queue */ empty_mbuf_ring(s->rx.q); + if (s->ste) { + stbl_del_stream(CTX_UDP_STLB(ctx), s->ste, us); + s->ste = NULL; + } + /* * mark the stream as free again. * if there still are pkts queued for TX, @@ -344,3 +474,56 @@ tle_udp_stream_get_param(const struct tle_stream *us, return 0; } + +/* + * helper function, updates stream config + */ +static inline int +stream_update_cfg(struct tle_stream *us, struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + + s = UDP_STREAM(us); + + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + rte_smp_wmb(); + + /* invoke async notifications, if any */ + if (rte_ring_count(s->rx.q) != 0) { + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL) + s->rx.cb.func(s->rx.cb.data, &s->s); + } + + /* always ok to write */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + else if (s->tx.cb.func != NULL) + s->tx.cb.func(s->tx.cb.data, &s->s); + + return 0; +} + +uint32_t +tle_udp_stream_update_cfg(struct tle_stream *us[], + struct tle_udp_stream_param prm[], uint32_t num) +{ + int32_t rc; + uint32_t i; + + for (i = 0; i != num; i++) { + rc = stream_update_cfg(us[i], &prm[i]); + if (rc != 0) { + rte_errno = -rc; + break; + } + } + + return i; +} |