aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtle_l4p/tcp_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libtle_l4p/tcp_stream.c')
-rw-r--r--lib/libtle_l4p/tcp_stream.c199
1 files changed, 142 insertions, 57 deletions
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c
index c1a007a..f41ff3c 100644
--- a/lib/libtle_l4p/tcp_stream.c
+++ b/lib/libtle_l4p/tcp_stream.c
@@ -381,6 +381,10 @@ tcp_stream_fill_cfg(struct tle_tcp_stream *s, const struct tle_ctx_param *cprm,
cprm->icw;
s->tcb.snd.rto_tw = (cprm->timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
TCP_RTO_2MSL : cprm->timewait;
+
+ s->ts_offset = 0;
+
+ s->s.udata = scfg->udata;
}
static int
@@ -501,65 +505,44 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
}
/*
- * Helper functions, used by close API.
+ * Helper function, used by close()/shutdown API
+ * Check stream state, if FIN was not generated yet, then
+ * change stream state and queue it for TX.
*/
static inline int
-stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
+stream_finalize(struct tle_ctx *ctx, struct tle_tcp_stream *s, uint32_t state)
{
- uint16_t uop;
- uint32_t state;
- static const struct tle_stream_cb zcb;
-
- /* check was close() already invoked */
- uop = s->tcb.uop;
- if ((uop & TCP_OP_CLOSE) != 0)
- return -EDEADLK;
-
- /* record that close() was already invoked */
- if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
- return -EDEADLK;
-
- /* mark stream as unavaialbe for RX/TX. */
- tcp_stream_down(s);
+ if (state != TLE_TCP_ST_ESTABLISHED && state != TLE_TCP_ST_CLOSE_WAIT)
+ return -EINVAL;
- /* reset events/callbacks */
- s->rx.ev = NULL;
- s->tx.ev = NULL;
- s->err.ev = NULL;
+ /* change state */
+ s->tcb.state = (state == TLE_TCP_ST_ESTABLISHED) ?
+ TLE_TCP_ST_FIN_WAIT_1 : TLE_TCP_ST_LAST_ACK;
- s->rx.cb = zcb;
- s->tx.cb = zcb;
- s->err.cb = zcb;
+ /* queue stream into to-send queue */
+ txs_enqueue(ctx, s);
+ return 0;
+}
- state = s->tcb.state;
+/*
+ * Helper function, used by close API.
+ */
+static inline int
+stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
+{
+ int32_t rc;
- /* CLOSED, LISTEN, SYN_SENT - we can close the stream straighway */
- if (state <= TCP_ST_SYN_SENT) {
- tcp_stream_reset(ctx, s);
- return 0;
- }
+ rc = stream_close_prolog(ctx, s, TLE_TCP_OP_CLOSE);
+ if (rc <= 0)
+ return rc;
/* generate FIN and proceed with normal connection termination */
- if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {
-
- /* change state */
- s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
- TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;
+ stream_finalize(ctx, s, rc);
- /* mark stream as writable/readable again */
- tcp_stream_up(s);
-
- /* queue stream into to-send queue */
- txs_enqueue(ctx, s);
- return 0;
- }
+ /* mark stream as writable/readable again */
+ tcp_stream_up(s);
- /*
- * accroding to the state, close() was already invoked,
- * should never that point.
- */
- RTE_ASSERT(0);
- return -EINVAL;
+ return 0;
}
uint32_t
@@ -608,6 +591,89 @@ tle_tcp_stream_close(struct tle_stream *ts)
}
int
+tle_tcp_stream_abort(struct tle_stream *ts)
+{
+ int32_t rc;
+ struct tle_ctx *ctx;
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+ if (ts == NULL || s->s.type >= TLE_VNUM)
+ return -EINVAL;
+
+ ctx = s->s.ctx;
+ rc = stream_close_prolog(ctx, s, TLE_TCP_OP_CLOSE_ABORT);
+ if (rc > 0) {
+
+ /*
+ * RFC 793, On ABORT call, for states:
+ * SYN-RECEIVED STATE
+ * ESTABLISHED STATE
+ * FIN-WAIT-1 STATE
+ * FIN-WAIT-2 STATE
+ * CLOSE-WAIT STATE
+ * Send a reset segment: <SEQ=SND.NXT><CTL=RST>
+ * ...; all segments queued for transmission (except for the
+ * RST formed above) or retransmission should be flushed,
+ * delete the TCB, enter CLOSED state, and return.
+ */
+
+ if (rc >= TLE_TCP_ST_ESTABLISHED && rc <= TLE_TCP_ST_CLOSE_WAIT)
+ s->tcb.snd.close_flags |= TCP_FLAG_RST;
+
+ /*
+ * set state to CLOSED, mark stream as writable/readable again
+ * and enqueue stream into to-send queue.
+ * That will cause later RST generation and stream termination.
+ */
+ s->tcb.state = TLE_TCP_ST_CLOSED;
+ tcp_stream_up(s);
+ txs_enqueue(ctx, s);
+ rc = 0;
+ }
+
+ tle_memtank_shrink(CTX_TCP_MTS(ctx));
+ return rc;
+}
+
+int
+tle_tcp_stream_shutdown(struct tle_stream *ts)
+{
+ int32_t rc;
+ uint16_t uop;
+ struct tle_ctx *ctx;
+ struct tle_tcp_stream *s;
+
+ const uint16_t nop = TLE_TCP_OP_SHUTDOWN;
+
+ s = TCP_STREAM(ts);
+ if (ts == NULL || s->s.type >= TLE_VNUM)
+ return -EINVAL;
+
+ ctx = s->s.ctx;
+
+ /* check was shutdown() or close() already invoked */
+ uop = s->tcb.uop;
+ if ((uop & (TLE_TCP_OP_CLOSE | nop)) != 0)
+ return -EDEADLK;
+
+ /* record that shutdown was invoked */
+ if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | nop) == 0)
+ return -EDEADLK;
+
+	/* mark stream as unavailable for RX/TX. */
+ tcp_stream_down(s);
+
+ /* change state, generate FIN */
+ rc = stream_finalize(ctx, s, s->tcb.state);
+
+ /* mark stream as writable/readable again */
+ tcp_stream_up(s);
+
+ return rc;
+}
+
+int
tle_tcp_stream_get_addr(const struct tle_stream *ts,
struct tle_tcp_stream_addr *addr)
{
@@ -664,15 +730,15 @@ tle_tcp_stream_listen(struct tle_stream *ts)
/* app may listen for multiple times to change backlog,
* we will just return success for such cases.
*/
- if (s->tcb.state == TCP_ST_LISTEN)
+ if (s->tcb.state == TLE_TCP_ST_LISTEN)
return 0;
/* mark stream as not closable. */
if (tcp_stream_try_acquire(s) > 0) {
- rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
- TCP_ST_LISTEN);
+ rc = rte_atomic16_cmpset(&s->tcb.state, TLE_TCP_ST_CLOSED,
+ TLE_TCP_ST_LISTEN);
if (rc != 0) {
- s->tcb.uop |= TCP_OP_LISTEN;
+ s->tcb.uop |= TLE_TCP_OP_LISTEN;
s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
rc = 0;
} else
@@ -694,7 +760,8 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
s = TCP_STREAM(ts);
- if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
+ if (tcp_stream_try_acquire(s) < 0 ||
+ (s->tcb.uop & TLE_TCP_OP_CLOSE) != 0) {
tcp_stream_release(s);
return -EINVAL;
}
@@ -717,6 +784,7 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
/* store other params */
s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
TLE_TCP_DEFAULT_RETRIES;
+ s->s.udata = prm->udata;
/* invoke async notifications, if any */
if (rte_ring_count(s->rx.q) != 0) {
@@ -731,8 +799,8 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
else if (s->tx.cb.func != NULL)
s->tx.cb.func(s->tx.cb.data, &s->s);
}
- if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
- s->tcb.state == TCP_ST_CLOSED) {
+ if (s->tcb.state == TLE_TCP_ST_CLOSE_WAIT ||
+ s->tcb.state == TLE_TCP_ST_CLOSED) {
if (s->err.ev != NULL)
tle_event_raise(s->err.ev);
else if (s->err.cb.func != NULL)
@@ -766,9 +834,26 @@ tle_tcp_stream_get_mss(const struct tle_stream * ts)
{
struct tle_tcp_stream *s;
- if (ts == NULL)
+ s = TCP_STREAM(ts);
+ if (ts == NULL || s->s.type >= TLE_VNUM)
return -EINVAL;
- s = TCP_STREAM(ts);
return s->tcb.snd.mss;
}
+
+int
+tle_tcp_stream_get_state(const struct tle_stream * ts,
+ struct tle_tcp_stream_state *st)
+{
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+ if (ts == NULL || s->s.type >= TLE_VNUM)
+ return -EINVAL;
+
+ st->state = s->tcb.state;
+ st->uop = s->tcb.uop;
+ st->rev = s->err.rev;
+
+ return 0;
+}