diff options
Diffstat (limited to 'lib/libtle_l4p/tcp_stream.c')
-rw-r--r-- | lib/libtle_l4p/tcp_stream.c | 199 |
1 files changed, 142 insertions, 57 deletions
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c index c1a007a..f41ff3c 100644 --- a/lib/libtle_l4p/tcp_stream.c +++ b/lib/libtle_l4p/tcp_stream.c @@ -381,6 +381,10 @@ tcp_stream_fill_cfg(struct tle_tcp_stream *s, const struct tle_ctx_param *cprm, cprm->icw; s->tcb.snd.rto_tw = (cprm->timewait == TLE_TCP_TIMEWAIT_DEFAULT) ? TCP_RTO_2MSL : cprm->timewait; + + s->ts_offset = 0; + + s->s.udata = scfg->udata; } static int @@ -501,65 +505,44 @@ tle_tcp_stream_open(struct tle_ctx *ctx, } /* - * Helper functions, used by close API. + * Helper function, used by close()/shutdown API + * Check stream state, if FIN was not generatedi yet, then + * change stream state and queue it for TX. */ static inline int -stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s) +stream_finalize(struct tle_ctx *ctx, struct tle_tcp_stream *s, uint32_t state) { - uint16_t uop; - uint32_t state; - static const struct tle_stream_cb zcb; - - /* check was close() already invoked */ - uop = s->tcb.uop; - if ((uop & TCP_OP_CLOSE) != 0) - return -EDEADLK; - - /* record that close() was already invoked */ - if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0) - return -EDEADLK; - - /* mark stream as unavaialbe for RX/TX. */ - tcp_stream_down(s); + if (state != TLE_TCP_ST_ESTABLISHED && state != TLE_TCP_ST_CLOSE_WAIT) + return -EINVAL; - /* reset events/callbacks */ - s->rx.ev = NULL; - s->tx.ev = NULL; - s->err.ev = NULL; + /* change state */ + s->tcb.state = (state == TLE_TCP_ST_ESTABLISHED) ? + TLE_TCP_ST_FIN_WAIT_1 : TLE_TCP_ST_LAST_ACK; - s->rx.cb = zcb; - s->tx.cb = zcb; - s->err.cb = zcb; + /* queue stream into to-send queue */ + txs_enqueue(ctx, s); + return 0; +} - state = s->tcb.state; +/* + * Helper function, used by close API. + */ +static inline int +stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s) +{ + int32_t rc; - /* CLOSED, LISTEN, SYN_SENT - we can close the stream straighway */ - if (state <= TCP_ST_SYN_SENT) { - tcp_stream_reset(ctx, s); - return 0; - } + rc = stream_close_prolog(ctx, s, TLE_TCP_OP_CLOSE); + if (rc <= 0) + return rc; /* generate FIN and proceed with normal connection termination */ - if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) { - - /* change state */ - s->tcb.state = (state == TCP_ST_ESTABLISHED) ? - TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK; + stream_finalize(ctx, s, rc); - /* mark stream as writable/readable again */ - tcp_stream_up(s); - - /* queue stream into to-send queue */ - txs_enqueue(ctx, s); - return 0; - } + /* mark stream as writable/readable again */ + tcp_stream_up(s); - /* - * accroding to the state, close() was already invoked, - * should never that point. - */ - RTE_ASSERT(0); - return -EINVAL; + return 0; } uint32_t @@ -608,6 +591,89 @@ tle_tcp_stream_close(struct tle_stream *ts) } int +tle_tcp_stream_abort(struct tle_stream *ts) +{ + int32_t rc; + struct tle_ctx *ctx; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + ctx = s->s.ctx; + rc = stream_close_prolog(ctx, s, TLE_TCP_OP_CLOSE_ABORT); + if (rc > 0) { + + /* + * RFC 793, On ABORT call, for states: + * SYN-RECEIVED STATE + * ESTABLISHED STATE + * FIN-WAIT-1 STATE + * FIN-WAIT-2 STATE + * CLOSE-WAIT STATE + * Send a reset segment: <SEQ=SND.NXT><CTL=RST> + * ...; all segments queued for transmission (except for the + * RST formed above) or retransmission should be flushed, + * delete the TCB, enter CLOSED state, and return. + */ + + if (rc >= TLE_TCP_ST_ESTABLISHED && rc <= TLE_TCP_ST_CLOSE_WAIT) + s->tcb.snd.close_flags |= TCP_FLAG_RST; + + /* + * set state to CLOSED, mark stream as writable/readable again + * and enqueue stream into to-send queue. + * That will cause later RST generation and stream termination. + */ + s->tcb.state = TLE_TCP_ST_CLOSED; + tcp_stream_up(s); + txs_enqueue(ctx, s); + rc = 0; + } + + tle_memtank_shrink(CTX_TCP_MTS(ctx)); + return rc; +} + +int +tle_tcp_stream_shutdown(struct tle_stream *ts) +{ + int32_t rc; + uint16_t uop; + struct tle_ctx *ctx; + struct tle_tcp_stream *s; + + const uint16_t nop = TLE_TCP_OP_SHUTDOWN; + + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + ctx = s->s.ctx; + + /* check was shutdown() or close() already invoked */ + uop = s->tcb.uop; + if ((uop & (TLE_TCP_OP_CLOSE | nop)) != 0) + return -EDEADLK; + + /* record that shutdown was invoked */ + if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | nop) == 0) + return -EDEADLK; + + /* mark stream as unavaialbe for RX/TX. */ + tcp_stream_down(s); + + /* change state, generate FIN */ + rc = stream_finalize(ctx, s, s->tcb.state); + + /* mark stream as writable/readable again */ + tcp_stream_up(s); + + return rc; +} + +int tle_tcp_stream_get_addr(const struct tle_stream *ts, struct tle_tcp_stream_addr *addr) { @@ -664,15 +730,15 @@ tle_tcp_stream_listen(struct tle_stream *ts) /* app may listen for multiple times to change backlog, * we will just return success for such cases. */ - if (s->tcb.state == TCP_ST_LISTEN) + if (s->tcb.state == TLE_TCP_ST_LISTEN) return 0; /* mark stream as not closable. */ if (tcp_stream_try_acquire(s) > 0) { - rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED, - TCP_ST_LISTEN); + rc = rte_atomic16_cmpset(&s->tcb.state, TLE_TCP_ST_CLOSED, + TLE_TCP_ST_LISTEN); if (rc != 0) { - s->tcb.uop |= TCP_OP_LISTEN; + s->tcb.uop |= TLE_TCP_OP_LISTEN; s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT); rc = 0; } else @@ -694,7 +760,8 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm) s = TCP_STREAM(ts); - if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) { + if (tcp_stream_try_acquire(s) < 0 || + (s->tcb.uop & TLE_TCP_OP_CLOSE) != 0) { tcp_stream_release(s); return -EINVAL; } @@ -717,6 +784,7 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm) /* store other params */ s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries : TLE_TCP_DEFAULT_RETRIES; + s->s.udata = prm->udata; /* invoke async notifications, if any */ if (rte_ring_count(s->rx.q) != 0) { @@ -731,8 +799,8 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm) else if (s->tx.cb.func != NULL) s->tx.cb.func(s->tx.cb.data, &s->s); } - if (s->tcb.state == TCP_ST_CLOSE_WAIT || - s->tcb.state == TCP_ST_CLOSED) { + if (s->tcb.state == TLE_TCP_ST_CLOSE_WAIT || + s->tcb.state == TLE_TCP_ST_CLOSED) { if (s->err.ev != NULL) tle_event_raise(s->err.ev); else if (s->err.cb.func != NULL) @@ -766,9 +834,26 @@ tle_tcp_stream_get_mss(const struct tle_stream * ts) { struct tle_tcp_stream *s; - if (ts == NULL) + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) return -EINVAL; - s = TCP_STREAM(ts); return s->tcb.snd.mss; } + +int +tle_tcp_stream_get_state(const struct tle_stream * ts, + struct tle_tcp_stream_state *st) +{ + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + st->state = s->tcb.state; + st->uop = s->tcb.uop; + st->rev = s->err.rev; + + return 0; +} |