aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtle_l4p/tcp_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libtle_l4p/tcp_stream.c')
-rw-r--r--lib/libtle_l4p/tcp_stream.c173
1 files changed, 118 insertions, 55 deletions
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c
index a212405..fce3b9a 100644
--- a/lib/libtle_l4p/tcp_stream.c
+++ b/lib/libtle_l4p/tcp_stream.c
@@ -28,6 +28,8 @@
#include "tcp_ofo.h"
#include "tcp_txq.h"
+#define MAX_STREAM_BURST 0x40
+
static void
unuse_stream(struct tle_tcp_stream *s)
{
@@ -42,11 +44,13 @@ tcp_fini_streams(struct tle_ctx *ctx)
ts = CTX_TCP_STREAMS(ctx);
if (ts != NULL) {
- stbl_fini(&ts->st);
- /* free the timer wheel */
+ stbl_fini(&ts->st);
tle_timer_free(ts->tmr);
rte_free(ts->tsq);
+ tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
+ tle_memtank_sanity_check(ts->mts, 0);
+ tle_memtank_destroy(ts->mts);
STAILQ_INIT(&ts->dr.fe);
STAILQ_INIT(&ts->dr.be);
@@ -122,7 +126,7 @@ calc_stream_szofs(struct tle_ctx *ctx, struct stream_szofs *szofs)
szofs->size = sz;
}
-static int
+static void
init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
const struct stream_szofs *szofs)
{
@@ -163,9 +167,6 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
s->s.ctx = ctx;
unuse_stream(s);
- STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);
-
- return 0;
}
static void
@@ -178,38 +179,107 @@ tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
}
static struct tle_timer_wheel *
-alloc_timers(uint32_t num, uint32_t mshift, int32_t socket)
+alloc_timers(const struct tle_ctx *ctx)
{
+ struct tle_timer_wheel *twl;
struct tle_timer_wheel_args twprm;
twprm.tick_size = TCP_RTO_GRANULARITY;
- twprm.max_timer = num;
- twprm.socket_id = socket;
- return tle_timer_create(&twprm, tcp_get_tms(mshift));
+ twprm.max_timer = ctx->prm.max_streams;
+ twprm.socket_id = ctx->prm.socket_id;
+
+ twl = tle_timer_create(&twprm, tcp_get_tms(ctx->cycles_ms_shift));
+ if (twl == NULL)
+ TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
+ ctx, rte_errno);
+ return twl;
+}
+
+static void *
+mts_alloc(size_t sz, void *udata)
+{
+ struct tle_ctx *ctx;
+
+ ctx = udata;
+ return rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ctx->prm.socket_id);
+}
+
+static void
+mts_free(void *p, void *udata)
+{
+ RTE_SET_USED(udata);
+ rte_free(p);
+}
+
+static void
+mts_init(void *pa[], uint32_t num, void *udata)
+{
+ uint32_t i;
+ struct tle_ctx *ctx;
+ struct tcp_streams *ts;
+
+ ctx = udata;
+ ts = CTX_TCP_STREAMS(ctx);
+
+ for (i = 0; i != num; i++)
+ init_stream(ctx, pa[i], &ts->szofs);
+}
+
+static struct tle_memtank *
+alloc_mts(struct tle_ctx *ctx, uint32_t stream_size)
+{
+ struct tle_memtank *mts;
+ struct tle_memtank_prm prm;
+
+ static const struct tle_memtank_prm cprm = {
+ .obj_align = RTE_CACHE_LINE_SIZE,
+ .flags = TLE_MTANK_OBJ_DBG,
+ .alloc = mts_alloc,
+ .free = mts_free,
+ .init = mts_init,
+ };
+
+ prm = cprm;
+ prm.udata = ctx;
+
+ prm.obj_size = stream_size;
+
+ prm.min_free = (ctx->prm.free_streams.min != 0) ?
+ ctx->prm.free_streams.min : ctx->prm.max_streams;
+ prm.max_free = (ctx->prm.free_streams.max > prm.min_free) ?
+ ctx->prm.free_streams.max : prm.min_free;
+
+ prm.nb_obj_chunk = MAX_STREAM_BURST;
+ prm.max_obj = ctx->prm.max_streams;
+
+ mts = tle_memtank_create(&prm);
+ if (mts == NULL)
+ TCP_LOG(ERR, "%s(ctx=%p) failed with error=%d\n",
+ __func__, ctx, rte_errno);
+ else
+ tle_memtank_grow(mts);
+
+ return mts;
}
static int
tcp_init_streams(struct tle_ctx *ctx)
{
- size_t sz;
- uint32_t f, i;
+ uint32_t f;
int32_t rc;
struct tcp_streams *ts;
- struct tle_tcp_stream *ps;
struct stream_szofs szofs;
f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
(RING_F_SP_ENQ | RING_F_SC_DEQ);
calc_stream_szofs(ctx, &szofs);
+ TCP_LOG(NOTICE, "ctx:%p, caluclated stream size: %u\n",
+ ctx, szofs.size);
- sz = sizeof(*ts) + szofs.size * ctx->prm.max_streams;
- ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ts = rte_zmalloc_socket(NULL, sizeof(*ts), RTE_CACHE_LINE_SIZE,
ctx->prm.socket_id);
-
- TCP_LOG(NOTICE, "allocation of %zu bytes on socket %d "
- "for %u tcp_streams returns %p\n",
- sz, ctx->prm.socket_id, ctx->prm.max_streams, ts);
if (ts == NULL)
return -ENOMEM;
@@ -221,29 +291,22 @@ tcp_init_streams(struct tle_ctx *ctx)
ctx->streams.buf = ts;
STAILQ_INIT(&ctx->streams.free);
- ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->cycles_ms_shift,
- ctx->prm.socket_id);
- if (ts->tmr == NULL) {
- TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
- ctx, rte_errno);
- rc = -ENOMEM;
- } else {
- ts->tsq = alloc_ring(ctx->prm.max_streams,
- f | RING_F_SC_DEQ, ctx->prm.socket_id);
- if (ts->tsq == NULL)
+ rc = stbl_init(&ts->st, ctx->prm.max_streams, ctx->prm.socket_id);
+
+ if (rc == 0) {
+ ts->tsq = alloc_ring(ctx->prm.max_streams, f | RING_F_SC_DEQ,
+ ctx->prm.socket_id);
+ ts->tmr = alloc_timers(ctx);
+ ts->mts = alloc_mts(ctx, szofs.size);
+
+ if (ts->tsq == NULL || ts->tmr == NULL || ts->mts == NULL)
rc = -ENOMEM;
- else
- rc = stbl_init(&ts->st, ctx->prm.max_streams,
- ctx->prm.socket_id);
- }
- for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++) {
- ps = (void *)((uintptr_t)ts->s + i * ts->szofs.size);
- rc = init_stream(ctx, ps, &ts->szofs);
+ tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
}
if (rc != 0) {
- TCP_LOG(ERR, "initalisation of %u-th stream failed", i);
+ TCP_LOG(ERR, "initalisation of tcp streams failed");
tcp_fini_streams(ctx);
}
@@ -302,6 +365,7 @@ struct tle_stream *
tle_tcp_stream_open(struct tle_ctx *ctx,
const struct tle_tcp_stream_param *prm)
{
+ struct tcp_streams *ts;
struct tle_tcp_stream *s;
int32_t rc;
@@ -310,15 +374,11 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
return NULL;
}
- s = (struct tle_tcp_stream *)get_stream(ctx);
- if (s == NULL) {
- rte_errno = ENFILE;
- return NULL;
+ ts = CTX_TCP_STREAMS(ctx);
- /* some TX still pending for that stream. */
- } else if (TCP_STREAM_TX_PENDING(s)) {
- put_stream(ctx, &s->s, 0);
- rte_errno = EAGAIN;
+ s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
+ if (s == NULL) {
+ rte_errno = ENFILE;
return NULL;
}
@@ -328,7 +388,8 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
(const struct sockaddr *)&prm->addr.remote);
if (rc != 0) {
- put_stream(ctx, &s->s, 1);
+ tle_memtank_free(ts->mts, (void **)&s, 1,
+ TLE_MTANK_FREE_SHRINK);
rte_errno = rc;
return NULL;
}
@@ -426,18 +487,17 @@ tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
rc = 0;
- for (i = 0; i != num; i++) {
+ for (i = 0; i != num && rc == 0; i++) {
s = TCP_STREAM(ts[i]);
- if (ts[i] == NULL || s->s.type >= TLE_VNUM) {
+ if (ts[i] == NULL || s->s.type >= TLE_VNUM)
rc = EINVAL;
- break;
- }
- ctx = s->s.ctx;
- rc = stream_close(ctx, s);
- if (rc != 0)
- break;
+ else {
+ ctx = s->s.ctx;
+ rc = stream_close(ctx, s);
+ tle_memtank_shrink(CTX_TCP_MTS(ctx));
+ }
}
if (rc != 0)
@@ -448,6 +508,7 @@ tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
int
tle_tcp_stream_close(struct tle_stream *ts)
{
+ int32_t rc;
struct tle_ctx *ctx;
struct tle_tcp_stream *s;
@@ -456,7 +517,9 @@ tle_tcp_stream_close(struct tle_stream *ts)
return -EINVAL;
ctx = s->s.ctx;
- return stream_close(ctx, s);
+ rc = stream_close(ctx, s);
+ tle_memtank_shrink(CTX_TCP_MTS(ctx));
+ return rc;
}
int