aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtle_l4p/tcp_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libtle_l4p/tcp_stream.c')
-rw-r--r--lib/libtle_l4p/tcp_stream.c395
1 files changed, 305 insertions, 90 deletions
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c
index 676521b..4a65053 100644
--- a/lib/libtle_l4p/tcp_stream.c
+++ b/lib/libtle_l4p/tcp_stream.c
@@ -20,6 +20,8 @@
#include <rte_ip.h>
#include <rte_tcp.h>
+#include <netinet/tcp.h>
+
#include "tcp_stream.h"
#include "tcp_timer.h"
#include "stream_table.h"
@@ -27,6 +29,7 @@
#include "tcp_ctl.h"
#include "tcp_ofo.h"
#include "tcp_txq.h"
+#include "tcp_rxtx.h"
static void
unuse_stream(struct tle_tcp_stream *s)
@@ -38,25 +41,27 @@ unuse_stream(struct tle_tcp_stream *s)
static void
fini_stream(struct tle_tcp_stream *s)
{
- if (s != NULL) {
- rte_free(s->rx.q);
- tcp_ofo_free(s->rx.ofo);
- rte_free(s->tx.q);
- rte_free(s->tx.drb.r);
- }
+ rte_free(s);
}
static void
tcp_fini_streams(struct tle_ctx *ctx)
{
- uint32_t i;
struct tcp_streams *ts;
+ struct tle_stream *s;
ts = CTX_TCP_STREAMS(ctx);
if (ts != NULL) {
stbl_fini(&ts->st);
- for (i = 0; i != ctx->prm.max_streams; i++)
- fini_stream(&ts->s[i]);
+
+		/* TODO: free streams still in use? Probably unnecessary, as we
+		 * assume all streams have been closed and are therefore free.
+		 */
+ while (ctx->streams.nb_free--) {
+ s = STAILQ_FIRST(&ctx->streams.free);
+ STAILQ_FIRST(&ctx->streams.free) = STAILQ_NEXT(s, link);
+ fini_stream(TCP_STREAM(s));
+ }
/* free the timer wheel */
tle_timer_free(ts->tmr);
@@ -94,61 +99,100 @@ alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
return r;
}
+/* stream memory layout:
+ * [tle_tcp_stream] [rx.q] [rx.ofo] [tx.q] [tx.drb.r]
+ */
static int
-init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
+add_stream(struct tle_ctx *ctx)
{
- size_t bsz, rsz, sz;
- uint32_t f, i, k, n, nb;
+ size_t sz_s, sz_rxq, sz_ofo, sz_txq, sz_drb_r, sz;
+ /* for rx.q */
+ uint32_t n_rxq;
+ /* for rx.ofo */
+ struct ofo *ofo;
+ struct rte_mbuf **obj;
+ uint32_t ndb, nobj;
+ size_t dsz, osz;
+ /* for tx.q */
+ uint32_t n_txq;
+ /* for tx.drb.r */
+ size_t bsz, rsz;
struct tle_drb *drb;
- char name[RTE_RING_NAMESIZE];
-
- f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
- (RING_F_SP_ENQ | RING_F_SC_DEQ);
-
- /* init RX part. */
-
- n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
- s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id);
- if (s->rx.q == NULL)
- return -ENOMEM;
-
- s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id);
- if (s->rx.ofo == NULL)
- return -ENOMEM;
-
- /* init TX part. */
+ uint32_t k, nb, n_drb;
- n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
- s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id);
- if (s->tx.q == NULL)
- return -ENOMEM;
+ uint32_t f, i;
+ char name[RTE_RING_NAMESIZE];
+ struct tle_tcp_stream *s;
+ // stream
+ sz_s = RTE_ALIGN_CEIL(sizeof(*s), RTE_CACHE_LINE_SIZE);
+
+ // rx.q
+ n_rxq = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
+ n_rxq = rte_align32pow2(n_rxq);
+ sz_rxq = rte_ring_get_memsize(n_rxq);
+ sz_rxq = RTE_ALIGN_CEIL(sz_rxq, RTE_CACHE_LINE_SIZE);
+
+ // rx.ofo
+ calc_ofo_elems(n_rxq, &nobj, &ndb);
+ osz = sizeof(*ofo) + sizeof(ofo->db[0]) * ndb;
+ dsz = sizeof(ofo->db[0].obj[0]) * nobj * ndb;
+ sz_ofo = osz + dsz;
+ sz_ofo = RTE_ALIGN_CEIL(sz_ofo, RTE_CACHE_LINE_SIZE);
+
+ // tx.q
+ n_txq = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
+ n_txq = rte_align32pow2(n_txq);
+ sz_txq = rte_ring_get_memsize(n_txq);
+ sz_txq = RTE_ALIGN_CEIL(sz_txq, RTE_CACHE_LINE_SIZE);
+
+ // tx.drb.r
nb = drb_nb_elem(ctx);
k = calc_stream_drb_num(ctx, nb);
- n = rte_align32pow2(k);
-
- /* size of the drbs ring */
- rsz = rte_ring_get_memsize(n);
+ n_drb = rte_align32pow2(k);
+ rsz = rte_ring_get_memsize(n_drb); /* size of the drbs ring */
rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+ bsz = tle_drb_calc_size(nb); /* size of the drb. */
+ sz_drb_r = rsz + bsz * k; /* total stream drbs size. */
+ sz_drb_r = RTE_ALIGN_CEIL(sz_drb_r, RTE_CACHE_LINE_SIZE);
- /* size of the drb. */
- bsz = tle_drb_calc_size(nb);
-
- /* total stream drbs size. */
- sz = rsz + bsz * k;
-
- s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
- ctx->prm.socket_id);
- if (s->tx.drb.r == NULL) {
- TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+ sz = sz_s + sz_rxq + sz_ofo + sz_txq + sz_drb_r;
+ s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ ctx->prm.socket_id);
+ if (s == NULL) {
+ TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
"failed with error code: %d\n",
- __func__, s, sz, ctx->prm.socket_id, rte_errno);
+ __func__, sz, ctx->prm.socket_id, rte_errno);
return -ENOMEM;
}
- snprintf(name, sizeof(name), "%p@%zu", s, sz);
- rte_ring_init(s->tx.drb.r, name, n, f);
+ s->rx.q = (struct rte_ring *)((uintptr_t)s + sz_s);
+ s->rx.ofo = (struct ofo *)((uintptr_t)s->rx.q + sz_rxq);
+ ofo = s->rx.ofo;
+ s->tx.q = (struct rte_ring *)((uintptr_t)s->rx.ofo + sz_ofo);
+ s->tx.drb.r = (struct rte_ring *)((uintptr_t)s->tx.q + sz_txq);
+ // ring flags
+ f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+ (RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+ /* init RX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->rx.q, sz_rxq);
+ rte_ring_init(s->rx.q, name, n_rxq, f);
+
+ obj = (struct rte_mbuf **)&ofo->db[ndb];
+ for (i = 0; i != ndb; i++) {
+ ofo->db[i].nb_max = nobj;
+ ofo->db[i].obj = obj + i * nobj;
+ }
+ ofo->nb_max = ndb;
+
+ /* init TX part. */
+ snprintf(name, sizeof(name), "%p@%zu", s->tx.q, sz_txq);
+ rte_ring_init(s->tx.q, name, n_txq, f);
+
+ snprintf(name, sizeof(name), "%p@%zu", s->tx.drb.r, sz_drb_r);
+ rte_ring_init(s->tx.drb.r, name, n_drb, f);
for (i = 0; i != k; i++) {
drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
rsz + bsz * i);
@@ -200,7 +244,7 @@ tcp_init_streams(struct tle_ctx *ctx)
f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
(RING_F_SP_ENQ | RING_F_SC_DEQ);
- sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams;
+ sz = sizeof(*ts);
ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
ctx->prm.socket_id);
if (ts == NULL) {
@@ -210,6 +254,7 @@ tcp_init_streams(struct tle_ctx *ctx)
return -ENOMEM;
}
+ rte_spinlock_init(&ts->dr.lock);
STAILQ_INIT(&ts->dr.fe);
STAILQ_INIT(&ts->dr.be);
@@ -228,12 +273,11 @@ tcp_init_streams(struct tle_ctx *ctx)
if (ts->tsq == NULL)
rc = -ENOMEM;
else
- rc = stbl_init(&ts->st, ctx->prm.max_streams,
- ctx->prm.socket_id);
+ rc = stbl_init(&ts->st, (ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
}
- for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++)
- rc = init_stream(ctx, &ts->s[i]);
+ for (i = 0; rc == 0 && i != ctx->prm.min_streams; i++)
+ rc = add_stream(ctx);
if (rc != 0) {
TCP_LOG(ERR, "initalisation of %u-th stream failed", i);
@@ -243,11 +287,30 @@ tcp_init_streams(struct tle_ctx *ctx)
return rc;
}
-static void __attribute__((constructor))
+/*
+ * Note: this function is not thread-safe; no lock is taken here because
+ * we assume this ctx is dedicated to a single thread.
+ */
+static uint32_t
+tcp_more_streams(struct tle_ctx *ctx)
+{
+ uint32_t i, nb;
+ uint32_t nb_max = ctx->prm.max_streams - 1;
+ uint32_t nb_cur = ctx->streams.nb_cur;
+
+ nb = RTE_MIN(ctx->prm.delta_streams, nb_max - nb_cur);
+ for (i = 0; i < nb; i++)
+ if (add_stream(ctx) != 0)
+ break;
+ return i;
+}
+
+static void __attribute__((constructor(101)))
tcp_stream_setup(void)
{
static const struct stream_ops tcp_ops = {
.init_streams = tcp_init_streams,
+ .more_streams = tcp_more_streams,
.fini_streams = tcp_fini_streams,
.free_drbs = tcp_free_drbs,
};
@@ -305,16 +368,12 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
s = (struct tle_tcp_stream *)get_stream(ctx);
if (s == NULL) {
- rte_errno = ENFILE;
- return NULL;
-
- /* some TX still pending for that stream. */
- } else if (TCP_STREAM_TX_PENDING(s)) {
- put_stream(ctx, &s->s, 0);
rte_errno = EAGAIN;
return NULL;
}
+ s->s.option.raw = prm->option;
+
/* setup L4 ports and L3 addresses fields. */
rc = stream_fill_ctx(ctx, &s->s,
(const struct sockaddr *)&prm->addr.local,
@@ -336,12 +395,14 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
/* store other params */
s->flags = ctx->prm.flags;
+ s->tcb.err = 0;
s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
TLE_TCP_DEFAULT_RETRIES;
s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
ctx->prm.icw;
s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
TCP_RTO_2MSL : ctx->prm.timewait;
+ s->tcb.snd.rto_fw = TLE_TCP_FINWAIT_TIMEOUT;
tcp_stream_up(s);
return &s->s;
@@ -354,9 +415,16 @@ static inline int
stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
{
uint16_t uop;
- uint32_t state;
static const struct tle_stream_cb zcb;
+	/* Perform the uop update under this write lock; otherwise the
+	 * stream may be put into the death ring twice, for example:
+	 * 1) FE sets OP_CLOSE;
+	 * 2) BE stream_term sets the state to TCP_ST_CLOSED and queues it;
+	 * 3) FE takes the stream down and calls stream_term again.
+	 */
+ tcp_stream_down(s);
+
/* check was close() already invoked */
uop = s->tcb.uop;
if ((uop & TCP_OP_CLOSE) != 0)
@@ -366,47 +434,66 @@ stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
return -EDEADLK;
- /* mark stream as unavaialbe for RX/TX. */
- tcp_stream_down(s);
-
/* reset events/callbacks */
- s->rx.ev = NULL;
s->tx.ev = NULL;
+ s->rx.ev = NULL;
s->err.ev = NULL;
s->rx.cb = zcb;
s->tx.cb = zcb;
s->err.cb = zcb;
- state = s->tcb.state;
-
- /* CLOSED, LISTEN, SYN_SENT - we can close the stream straighway */
- if (state <= TCP_ST_SYN_SENT) {
+ switch (s->tcb.state) {
+ case TCP_ST_LISTEN:
+ /* close the stream straightway */
tcp_stream_reset(ctx, s);
return 0;
- }
-
- /* generate FIN and proceed with normal connection termination */
- if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {
-
- /* change state */
- s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
- TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;
-
- /* mark stream as writable/readable again */
+ case TCP_ST_CLOSED:
+		/* The stream can reach this state if an RST packet was
+		 * received, but it may still be in the tsq trying to send
+		 * something.
+		 */
+ /* fallthrough */
+ case TCP_ST_SYN_SENT:
+ /* timer on and could be in tsq (SYN retrans) */
+ stream_term(s);
+ /* fallthrough */
+ case TCP_ST_FIN_WAIT_1:
+ /* fallthrough */
+ case TCP_ST_CLOSING:
+ /* fallthrough */
+ case TCP_ST_TIME_WAIT:
+ /* fallthrough */
+ case TCP_ST_LAST_ACK:
tcp_stream_up(s);
-
- /* queue stream into to-send queue */
- txs_enqueue(ctx, s);
return 0;
+ case TCP_ST_ESTABLISHED:
+ /* fallthrough */
+ case TCP_ST_CLOSE_WAIT:
+ if (s->tcb.state == TCP_ST_ESTABLISHED) {
+ s->tcb.state = TCP_ST_FIN_WAIT_1;
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
+ } else
+ s->tcb.state = TCP_ST_LAST_ACK;
+
+ if (!rte_ring_empty(s->rx.q)) {
+ TCP_INC_STATS(TCP_MIB_ESTABRESETS);
+ s->tcb.uop |= TCP_OP_RESET;
+ stream_term(s);
+ }
+ break;
+ case TCP_ST_FIN_WAIT_2:
+ /* Can reach this state if shutdown was called, but the timer
+ * shall be set after this close.
+ */
+ break;
+ default:
+ rte_panic("Invalid state when close: %d\n", s->tcb.state);
}
- /*
- * accroding to the state, close() was already invoked,
- * should never that point.
- */
- RTE_ASSERT(0);
- return -EINVAL;
+ tcp_stream_up(s);
+ txs_enqueue(ctx, s);
+ return 0;
}
uint32_t
@@ -453,6 +540,64 @@ tle_tcp_stream_close(struct tle_stream *ts)
}
int
+tle_tcp_stream_shutdown(struct tle_stream *ts, int how)
+{
+ int ret;
+ bool wakeup;
+ uint32_t state;
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+ if (ts == NULL || s->s.type >= TLE_VNUM)
+ return -EINVAL;
+
+ /* Refer to linux/net/ipv4/tcp.c:tcp_shutdown() */
+ if (how == SHUT_RD)
+ return 0;
+
+ tcp_stream_down(s);
+
+ state = s->tcb.state;
+
+ switch (state) {
+ case TCP_ST_LISTEN:
+ /* fallthrough */
+ case TCP_ST_SYN_SENT:
+ s->tcb.state = TCP_ST_CLOSED;
+ wakeup = true;
+ ret = 0;
+ break;
+ case TCP_ST_ESTABLISHED:
+ /* fallthrough */
+ case TCP_ST_CLOSE_WAIT:
+ if (state == TCP_ST_ESTABLISHED) {
+ TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB);
+ s->tcb.state = TCP_ST_FIN_WAIT_1;
+ } else
+ s->tcb.state = TCP_ST_LAST_ACK;
+ txs_enqueue(ts->ctx, s);
+ wakeup = true;
+ ret = 0;
+ break;
+ default:
+ wakeup = false;
+ rte_errno = ENOTCONN;
+ ret = -1;
+ }
+
+ if (wakeup) {
+ /* Notify other threads which may wait on the event */
+ if (s->tx.ev)
+ tle_event_raise(s->tx.ev);
+ if (how == SHUT_RDWR && s->err.ev)
+ tle_event_raise(s->err.ev);
+ }
+
+ tcp_stream_up(s);
+ return ret;
+}
+
+int
tle_tcp_stream_get_addr(const struct tle_stream *ts,
struct tle_tcp_stream_addr *addr)
{
@@ -617,3 +762,73 @@ tle_tcp_stream_get_mss(const struct tle_stream * ts)
s = TCP_STREAM(ts);
return s->tcb.snd.mss;
}
+
+int
+tle_tcp_stream_get_info(const struct tle_stream * ts, void *info, socklen_t *optlen)
+{
+ struct tle_tcp_stream *s;
+ struct tcp_info i;
+
+ if (ts == NULL)
+ return -EINVAL;
+
+ s = TCP_STREAM(ts);
+
+ memset(&i, 0, sizeof(struct tcp_info));
+
+ /* transform from tldk state into linux kernel state */
+ switch (s->tcb.state) {
+ case TCP_ST_CLOSED:
+ i.tcpi_state = TCP_CLOSE;
+ break;
+ case TCP_ST_LISTEN:
+ i.tcpi_state = TCP_LISTEN;
+ break;
+ case TCP_ST_SYN_SENT:
+ i.tcpi_state = TCP_SYN_SENT;
+ break;
+ case TCP_ST_SYN_RCVD:
+ i.tcpi_state = TCP_SYN_RECV;
+ break;
+ case TCP_ST_ESTABLISHED:
+ i.tcpi_state = TCP_ESTABLISHED;
+ break;
+ case TCP_ST_FIN_WAIT_1:
+ i.tcpi_state = TCP_FIN_WAIT1;
+ break;
+ case TCP_ST_FIN_WAIT_2:
+ i.tcpi_state = TCP_FIN_WAIT2;
+ break;
+ case TCP_ST_CLOSE_WAIT:
+ i.tcpi_state = TCP_CLOSE_WAIT;
+ break;
+ case TCP_ST_CLOSING:
+ i.tcpi_state = TCP_CLOSING;
+ break;
+ case TCP_ST_LAST_ACK:
+ i.tcpi_state = TCP_LAST_ACK;
+ break;
+ case TCP_ST_TIME_WAIT:
+ i.tcpi_state = TCP_TIME_WAIT;
+ break;
+ }
+
+	/* FIXME: should this report total retransmissions rather than the
+	 * current retransmit count? */
+ i.tcpi_total_retrans = s->tcb.snd.nb_retx;
+
+ if (*optlen > sizeof(struct tcp_info))
+ *optlen = sizeof(struct tcp_info);
+ rte_memcpy(info, &i, *optlen);
+ return 0;
+}
+
+void
+tle_tcp_stream_set_keepalive(struct tle_stream *ts)
+{
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+
+ s->tcb.uop |= TCP_OP_KEEPALIVE;
+ txs_enqueue(ts->ctx, s);
+}