From 78c896b3b3127515478090c19447e27dc406427e Mon Sep 17 00:00:00 2001 From: Jianfeng Tan Date: Mon, 18 Nov 2019 06:59:50 +0000 Subject: TLDKv2 Signed-off-by: Jianfeng Tan Signed-off-by: Jielong Zhou Signed-off-by: Jian Zhang Signed-off-by: Chen Zhao Change-Id: I55c39de4c6cd30f991f35631eb507f770230f08e --- lib/libtle_l4p/tcp_stream.c | 395 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 305 insertions(+), 90 deletions(-) (limited to 'lib/libtle_l4p/tcp_stream.c') diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c index 676521b..4a65053 100644 --- a/lib/libtle_l4p/tcp_stream.c +++ b/lib/libtle_l4p/tcp_stream.c @@ -20,6 +20,8 @@ #include #include +#include + #include "tcp_stream.h" #include "tcp_timer.h" #include "stream_table.h" @@ -27,6 +29,7 @@ #include "tcp_ctl.h" #include "tcp_ofo.h" #include "tcp_txq.h" +#include "tcp_rxtx.h" static void unuse_stream(struct tle_tcp_stream *s) @@ -38,25 +41,27 @@ unuse_stream(struct tle_tcp_stream *s) static void fini_stream(struct tle_tcp_stream *s) { - if (s != NULL) { - rte_free(s->rx.q); - tcp_ofo_free(s->rx.ofo); - rte_free(s->tx.q); - rte_free(s->tx.drb.r); - } + rte_free(s); } static void tcp_fini_streams(struct tle_ctx *ctx) { - uint32_t i; struct tcp_streams *ts; + struct tle_stream *s; ts = CTX_TCP_STREAMS(ctx); if (ts != NULL) { stbl_fini(&ts->st); - for (i = 0; i != ctx->prm.max_streams; i++) - fini_stream(&ts->s[i]); + + /* TODO: free those in use? may be not necessary, as we assume + * all streams have been closed and are free. + */ + while (ctx->streams.nb_free--) { + s = STAILQ_FIRST(&ctx->streams.free); + STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link); + fini_stream(TCP_STREAM(s)); + } /* free the timer wheel */ tle_timer_free(ts->tmr); @@ -94,61 +99,100 @@ alloc_ring(uint32_t n, uint32_t flags, int32_t socket) return r; } +/* stream memory layout: + * [tle_tcp_stream] [rx.q] [rx.ofo] [tx.q] [tx.drb.r] + */ static int -init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s) +add_stream(struct tle_ctx *ctx) { - size_t bsz, rsz, sz; - uint32_t f, i, k, n, nb; + size_t sz_s, sz_rxq, sz_ofo, sz_txq, sz_drb_r, sz; + /* for rx.q */ + uint32_t n_rxq; + /* for rx.ofo */ + struct ofo *ofo; + struct rte_mbuf **obj; + uint32_t ndb, nobj; + size_t dsz, osz; + /* for tx.q */ + uint32_t n_txq; + /* for tx.drb.r */ + size_t bsz, rsz; struct tle_drb *drb; - char name[RTE_RING_NAMESIZE]; - - f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : - (RING_F_SP_ENQ | RING_F_SC_DEQ); - - /* init RX part. */ - - n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); - s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id); - if (s->rx.q == NULL) - return -ENOMEM; - - s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id); - if (s->rx.ofo == NULL) - return -ENOMEM; - - /* init TX part. */ + uint32_t k, nb, n_drb; - n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U); - s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id); - if (s->tx.q == NULL) - return -ENOMEM; + uint32_t f, i; + char name[RTE_RING_NAMESIZE]; + struct tle_tcp_stream *s; + // stream + sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE); + + // rx.q + n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + n_rxq = rte_align32pow2(n_rxq); + sz_rxq = rte_ring_get_memsize(n_rxq); + sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE); + + // rx.ofo + calc_ofo_elems(n_rxq, &nobj, &ndb); + osz = sizeof(*ofo) + sizeof(ofo->db[0]) * ndb; + dsz = sizeof(ofo->db[0].obj[0]) * nobj * ndb; + sz_ofo = osz + dsz; + sz_ofo = RTE_ALIGN_CEIL(sz_ofo, RTE_CACHE_LINE_SIZE); + + // tx.q + n_txq = RTE_MAX(ctx->prm.max_stream_sbufs, 1U); + n_txq = rte_align32pow2(n_txq); + sz_txq = rte_ring_get_memsize(n_txq); + sz_txq = RTE_ALIGN_CEIL(sz_txq, RTE_CACHE_LINE_SIZE); + + // tx.drb.r nb = drb_nb_elem(ctx); k = calc_stream_drb_num(ctx, nb); - n = rte_align32pow2(k); - - /* size of the drbs ring */ - rsz = rte_ring_get_memsize(n); + n_drb = rte_align32pow2(k); + rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */ rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + bsz = tle_drb_calc_size(nb); /* size of the drb. */ + sz_drb_r = rsz + bsz * k; /* total stream drbs size. */ + sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE); - /* size of the drb. */ - bsz = tle_drb_calc_size(nb); - - /* total stream drbs size. */ - sz = rsz + bsz * k; - - s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, - ctx->prm.socket_id); - if (s->tx.drb.r == NULL) { - TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + sz = sz_s + sz_rxq + sz_ofo + sz_txq + sz_drb_r; + s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s == NULL) { + TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d " "failed with error code: %d\n", - __func__, s, sz, ctx->prm.socket_id, rte_errno); + __func__, sz, ctx->prm.socket_id, rte_errno); return -ENOMEM; } - snprintf(name, sizeof(name), "%p@%zu", s, sz); - rte_ring_init(s->tx.drb.r, name, n, f); + s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s); + s->rx.ofo = (struct ofo *)((uintptr_t)s->rx.q + sz_rxq); + ofo = s->rx.ofo; + s->tx.q = (struct rte_ring *)((uintptr_t)s->rx.ofo + sz_ofo); + s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->tx.q + sz_txq); + // ring flags + f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : + (RING_F_SP_ENQ | RING_F_SC_DEQ); + + /* init RX part. */ + snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq); + rte_ring_init(s->rx.q, name, n_rxq, f); + + obj = (struct rte_mbuf **)&ofo->db[ndb]; + for (i = 0; i != ndb; i++) { + ofo->db[i].nb_max = nobj; + ofo->db[i].obj = obj + i * nobj; + } + ofo->nb_max = ndb; + + /* init TX part. */ + snprintf(name, sizeof(name), "%p@%zu", s->tx.q, sz_txq); + rte_ring_init(s->tx.q, name, n_txq, f); + + snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r); + rte_ring_init(s->tx.drb.r, name, n_drb, f); for (i = 0; i != k; i++) { drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + rsz + bsz * i); @@ -200,7 +244,7 @@ tcp_init_streams(struct tle_ctx *ctx) f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : (RING_F_SP_ENQ | RING_F_SC_DEQ); - sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams; + sz = sizeof(*ts); ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, ctx->prm.socket_id); if (ts == NULL) { @@ -210,6 +254,7 @@ tcp_init_streams(struct tle_ctx *ctx) return -ENOMEM; } + rte_spinlock_init(&ts->dr.lock); STAILQ_INIT(&ts->dr.fe); STAILQ_INIT(&ts->dr.be); @@ -228,12 +273,11 @@ tcp_init_streams(struct tle_ctx *ctx) if (ts->tsq == NULL) rc = -ENOMEM; else - rc = stbl_init(&ts->st, ctx->prm.max_streams, - ctx->prm.socket_id); + rc = stbl_init(&ts->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); } - for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++) - rc = init_stream(ctx, &ts->s[i]); + for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++) + rc = add_stream(ctx); if (rc != 0) { TCP_LOG(ERR, "initalisation of %u-th stream failed", i); @@ -243,11 +287,30 @@ tcp_init_streams(struct tle_ctx *ctx) return rc; } -static void __attribute__((constructor)) +/* + * Note this function is not thread-safe, and we did not lock here as we + * have the assumption that this ctx is dedicated to one thread. + */ +static uint32_t +tcp_more_streams(struct tle_ctx *ctx) +{ + uint32_t i, nb; + uint32_t nb_max = ctx->prm.max_streams - 1; + uint32_t nb_cur = ctx->streams.nb_cur; + + nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur); + for (i = 0; i < nb; i++) + if (add_stream(ctx) != 0) + break; + return i; +} + +static void __attribute__((constructor(101))) tcp_stream_setup(void) { static const struct stream_ops tcp_ops = { .init_streams = tcp_init_streams, + .more_streams = tcp_more_streams, .fini_streams = tcp_fini_streams, .free_drbs = tcp_free_drbs, }; @@ -305,16 +368,12 @@ tle_tcp_stream_open(struct tle_ctx *ctx, s = (struct tle_tcp_stream *)get_stream(ctx); if (s == NULL) { - rte_errno = ENFILE; - return NULL; - - /* some TX still pending for that stream. */ - } else if (TCP_STREAM_TX_PENDING(s)) { - put_stream(ctx, &s->s, 0); rte_errno = EAGAIN; return NULL; } + s->s.option.raw = prm->option; + /* setup L4 ports and L3 addresses fields. */ rc = stream_fill_ctx(ctx, &s->s, (const struct sockaddr *)&prm->addr.local, @@ -336,12 +395,14 @@ tle_tcp_stream_open(struct tle_ctx *ctx, /* store other params */ s->flags = ctx->prm.flags; + s->tcb.err = 0; s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : TLE_TCP_DEFAULT_RETRIES; s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX : ctx->prm.icw; s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ? TCP_RTO_2MSL : ctx->prm.timewait; + s->tcb.snd.rto_fw = TLE_TCP_FINWAIT_TIMEOUT; tcp_stream_up(s); return &s->s; @@ -354,9 +415,16 @@ static inline int stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s) { uint16_t uop; - uint32_t state; static const struct tle_stream_cb zcb; + /* Put uop operation into this wlock; or it may cause this stream + * to be put into death ring twice, for example: + * 1) FE sets OP_CLOSE; + * 2) BE stream_term sets state as TCP_ST_CLOSED, and put in queue; + * 3) FE down the stream, and calls stream_term again. + */ + tcp_stream_down(s); + /* check was close() already invoked */ uop = s->tcb.uop; if ((uop & TCP_OP_CLOSE) != 0) @@ -366,47 +434,66 @@ stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s) if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0) return -EDEADLK; - /* mark stream as unavaialbe for RX/TX. */ - tcp_stream_down(s); - /* reset events/callbacks */ - s->rx.ev = NULL; s->tx.ev = NULL; + s->rx.ev = NULL; s->err.ev = NULL; s->rx.cb = zcb; s->tx.cb = zcb; s->err.cb = zcb; - state = s->tcb.state; - - /* CLOSED, LISTEN, SYN_SENT - we can close the stream straighway */ - if (state <= TCP_ST_SYN_SENT) { + switch (s->tcb.state) { + case TCP_ST_LISTEN: + /* close the stream straightway */ tcp_stream_reset(ctx, s); return 0; - } - - /* generate FIN and proceed with normal connection termination */ - if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) { - - /* change state */ - s->tcb.state = (state == TCP_ST_ESTABLISHED) ? - TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK; - - /* mark stream as writable/readable again */ + case TCP_ST_CLOSED: + /* it could be put into this state if a RST packet is + * received, but this stream could be still in tsq trying + * to send something. + */ + /* fallthrough */ + case TCP_ST_SYN_SENT: + /* timer on and could be in tsq (SYN retrans) */ + stream_term(s); + /* fallthrough */ + case TCP_ST_FIN_WAIT_1: + /* fallthrough */ + case TCP_ST_CLOSING: + /* fallthrough */ + case TCP_ST_TIME_WAIT: + /* fallthrough */ + case TCP_ST_LAST_ACK: tcp_stream_up(s); - - /* queue stream into to-send queue */ - txs_enqueue(ctx, s); return 0; + case TCP_ST_ESTABLISHED: + /* fallthrough */ + case TCP_ST_CLOSE_WAIT: + if (s->tcb.state == TCP_ST_ESTABLISHED) { + s->tcb.state = TCP_ST_FIN_WAIT_1; + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + } else + s->tcb.state = TCP_ST_LAST_ACK; + + if (!rte_ring_empty(s->rx.q)) { + TCP_INC_STATS(TCP_MIB_ESTABRESETS); + s->tcb.uop |= TCP_OP_RESET; + stream_term(s); + } + break; + case TCP_ST_FIN_WAIT_2: + /* Can reach this state if shutdown was called, but the timer + * shall be set after this close. + */ + break; + default: + rte_panic("Invalid state when close: %d\n", s->tcb.state); } - /* - * accroding to the state, close() was already invoked, - * should never that point. - */ - RTE_ASSERT(0); - return -EINVAL; + tcp_stream_up(s); + txs_enqueue(ctx, s); + return 0; } uint32_t @@ -452,6 +539,64 @@ tle_tcp_stream_close(struct tle_stream *ts) return stream_close(ctx, s); } +int +tle_tcp_stream_shutdown(struct tle_stream *ts, int how) +{ + int ret; + bool wakeup; + uint32_t state; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + /* Refer to linux/net/ipv4/tcp.c:tcp_shutdown() */ + if (how == SHUT_RD) + return 0; + + tcp_stream_down(s); + + state = s->tcb.state; + + switch (state) { + case TCP_ST_LISTEN: + /* fallthrough */ + case TCP_ST_SYN_SENT: + s->tcb.state = TCP_ST_CLOSED; + wakeup = true; + ret = 0; + break; + case TCP_ST_ESTABLISHED: + /* fallthrough */ + case TCP_ST_CLOSE_WAIT: + if (state == TCP_ST_ESTABLISHED) { + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + s->tcb.state = TCP_ST_FIN_WAIT_1; + } else + s->tcb.state = TCP_ST_LAST_ACK; + txs_enqueue(ts->ctx, s); + wakeup = true; + ret = 0; + break; + default: + wakeup = false; + rte_errno = ENOTCONN; + ret = -1; + } + + if (wakeup) { + /* Notify other threads which may wait on the event */ + if (s->tx.ev) + tle_event_raise(s->tx.ev); + if (how == SHUT_RDWR && s->err.ev) + tle_event_raise(s->err.ev); + } + + tcp_stream_up(s); + return ret; +} + int tle_tcp_stream_get_addr(const struct tle_stream *ts, struct tle_tcp_stream_addr *addr) @@ -617,3 +762,73 @@ tle_tcp_stream_get_mss(const struct tle_stream * ts) s = TCP_STREAM(ts); return s->tcb.snd.mss; } + +int +tle_tcp_stream_get_info(const struct tle_stream * ts, void *info, socklen_t *optlen) +{ + struct tle_tcp_stream *s; + struct tcp_info i; + + if (ts == NULL) + return -EINVAL; + + s = TCP_STREAM(ts); + + memset(&i, 0, sizeof(struct tcp_info)); + + /* transform from tldk state into linux kernel state */ + switch (s->tcb.state) { + case TCP_ST_CLOSED: + i.tcpi_state = TCP_CLOSE; + break; + case TCP_ST_LISTEN: + i.tcpi_state = TCP_LISTEN; + break; + case TCP_ST_SYN_SENT: + i.tcpi_state = TCP_SYN_SENT; + break; + case TCP_ST_SYN_RCVD: + i.tcpi_state = TCP_SYN_RECV; + break; + case TCP_ST_ESTABLISHED: + i.tcpi_state = TCP_ESTABLISHED; + break; + case TCP_ST_FIN_WAIT_1: + i.tcpi_state = TCP_FIN_WAIT1; + break; + case TCP_ST_FIN_WAIT_2: + i.tcpi_state = TCP_FIN_WAIT2; + break; + case TCP_ST_CLOSE_WAIT: + i.tcpi_state = TCP_CLOSE_WAIT; + break; + case TCP_ST_CLOSING: + i.tcpi_state = TCP_CLOSING; + break; + case TCP_ST_LAST_ACK: + i.tcpi_state = TCP_LAST_ACK; + break; + case TCP_ST_TIME_WAIT: + i.tcpi_state = TCP_TIME_WAIT; + break; + } + + /* fix me, total retrans? */ + i.tcpi_total_retrans = s->tcb.snd.nb_retx; + + if (*optlen > sizeof(struct tcp_info)) + *optlen = sizeof(struct tcp_info); + rte_memcpy(info, &i, *optlen); + return 0; +} + +void +tle_tcp_stream_set_keepalive(struct tle_stream *ts) +{ + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + + s->tcb.uop |= TCP_OP_KEEPALIVE; + txs_enqueue(ts->ctx, s); +} -- cgit 1.2.3-korg