From 21e7392fca2c430018cf387bb3e368ea4c665446 Mon Sep 17 00:00:00 2001 From: Konstantin Ananyev Date: Fri, 3 Mar 2017 18:40:23 +0000 Subject: Rewrite accept() code-path and make l4fwd not to close() on FIN immediatelly. Changes in public API: - removes tle_tcp_stream_synreqs() and tle_tcp_reject() - adds tle_tcp_stream_update_cfg Allocates and fills new stream when final ACK for 3-way handshake is received. Changes in l4fwd sample application: prevents l4fwd to call close() on error event immediately: first try to recv/send remaining data. Change-Id: I8c5b9d365353084083731a4ce582197a8268688f Signed-off-by: Konstantin Ananyev --- examples/l4fwd/common.h | 10 +- examples/l4fwd/netbe.h | 1 + examples/l4fwd/tcp.h | 183 +++++++++------- lib/libtle_l4p/stream_table.h | 4 +- lib/libtle_l4p/syncookie.h | 9 +- lib/libtle_l4p/tcp_ctl.h | 7 + lib/libtle_l4p/tcp_rxtx.c | 500 +++++++++++++++++++----------------------- lib/libtle_l4p/tcp_stream.c | 85 +++++++ lib/libtle_l4p/tle_event.h | 5 + lib/libtle_l4p/tle_tcp.h | 104 ++++----- 10 files changed, 469 insertions(+), 439 deletions(-) diff --git a/examples/l4fwd/common.h b/examples/l4fwd/common.h index ff8ee7a..8d757f3 100644 --- a/examples/l4fwd/common.h +++ b/examples/l4fwd/common.h @@ -619,7 +619,7 @@ netbe_lcore(void) } } -static inline void +static inline int netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) { uint32_t k, n; @@ -631,12 +631,12 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) if (k == 0) { tle_event_idle(fes->rxev); fes->stat.rxev[TLE_SEV_IDLE]++; - return; + return 0; } n = tle_stream_recv(fes->s, fes->pbuf.pkt + n, k); if (n == 0) - return; + return 0; NETFE_TRACE("%s(%u): tle_%s_stream_recv(%p, %u) returns %u\n", __func__, lcore, proto_name[fes->proto], fes->s, k, n); @@ -648,7 +648,7 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) if (fes->op == RXONLY) fes->stat.rxb += pkt_buf_empty(&fes->pbuf); /* mark stream as writable */ - else if (k == RTE_DIM(fes->pbuf.pkt)) { + else if (k == RTE_DIM(fes->pbuf.pkt)) { if (fes->op == RXTX) { tle_event_active(fes->txev, TLE_SEV_UP); fes->stat.txev[TLE_SEV_UP]++; @@ -657,6 +657,8 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) fes->stat.txev[TLE_SEV_UP]++; } } + + return n; } #endif /* COMMON_H_ */ diff --git a/examples/l4fwd/netbe.h b/examples/l4fwd/netbe.h index 6d25603..80d1c28 100644 --- a/examples/l4fwd/netbe.h +++ b/examples/l4fwd/netbe.h @@ -195,6 +195,7 @@ struct netfe_stream { uint16_t proto; uint16_t family; uint16_t txlen; + uint16_t posterr; /* # of time error event handling was postponed */ struct { uint64_t rxp; uint64_t rxb; diff --git a/examples/l4fwd/tcp.h b/examples/l4fwd/tcp.h index 031ad8d..f6ca3a5 100644 --- a/examples/l4fwd/tcp.h +++ b/examples/l4fwd/tcp.h @@ -23,7 +23,9 @@ netfe_stream_term_tcp(struct netfe_lcore *fe, struct netfe_stream *fes) { fes->s = NULL; fes->fwds = NULL; + fes->posterr = 0; memset(&fes->stat, 0, sizeof(fes->stat)); + pkt_buf_empty(&fes->pbuf); netfe_put_stream(fe, &fe->free, fes); } @@ -251,7 +253,7 @@ netfe_create_fwd_stream(struct netfe_lcore *fe, struct netfe_stream *fes, return fws; } -static inline void +static inline int netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes) { uint32_t i, k, n; @@ -264,7 +266,7 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes) pkt = fes->pbuf.pkt; if (n == 0) - return; + return 0; fed = fes->fwds; @@ -307,88 +309,73 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes) tle_event_active(fes->rxev, TLE_SEV_UP); fes->stat.rxev[TLE_SEV_UP]++; } + + return (fed == NULL) ? 0 : k; } static inline void -netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore, +netfe_new_conn_tcp(struct netfe_lcore *fe, uint32_t lcore, struct netfe_stream *fes) { - uint32_t i, k, n, rc; - struct tle_tcp_stream_cfg *prm; - struct tle_tcp_accept_param acpt_prm[MAX_PKT_BURST]; - struct tle_stream *rs[MAX_PKT_BURST]; - struct tle_syn_req syn_reqs[MAX_PKT_BURST]; + uint32_t i, k, n; struct netfe_stream *ts; + struct tle_stream *rs[MAX_PKT_BURST]; struct netfe_stream *fs[MAX_PKT_BURST]; - - static const struct tle_stream_cb zcb = {.func = NULL, .data = NULL}; + struct tle_tcp_stream_cfg prm[MAX_PKT_BURST]; /* check if any syn requests are waiting */ - n = tle_tcp_stream_synreqs(fes->s, syn_reqs, RTE_DIM(syn_reqs)); + n = tle_tcp_stream_accept(fes->s, rs, RTE_DIM(rs)); if (n == 0) return; - NETFE_TRACE("%s(%u): tle_tcp_stream_synreqs(%p, %u) returns %u\n", + NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n", __func__, lcore, fes->s, MAX_PKT_BURST, n); /* get n free streams */ k = netfe_get_streams(&fe->free, fs, n); + if (n != k) + RTE_LOG(ERR, USER1, + "%s(lc=%u): not enough FE resources to handle %u new " + "TCP streams;\n", + __func__, lcore, n - k); /* fill accept params to accept k connection requests*/ for (i = 0; i != k; i++) { - acpt_prm[i].syn = syn_reqs[i]; - prm = &acpt_prm[i].cfg; - prm->nb_retries = 0; - prm->recv_ev = fs[i]->rxev; - prm->send_ev = fs[i]->txev; - prm->err_ev = fs[i]->erev; - tle_event_active(fs[i]->erev, TLE_SEV_DOWN); - prm->err_cb = zcb; - prm->recv_cb = zcb; - prm->send_cb = zcb; - } - - /* accept k new connections */ - rc = tle_tcp_stream_accept(fes->s, acpt_prm, rs, k); - - NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n", - __func__, lcore, fes->s, k, rc); - - if (rc != n) { - /* n - rc connections could not be accepted */ - tle_tcp_reject(fes->s, syn_reqs + rc, n - rc); - - /* put back k - rc streams free list */ - netfe_put_streams(fe, &fe->free, fs + rc, k - rc); - } - - /* update the params for accepted streams */ - for (i = 0; i != rc; i++) { ts = fs[i]; - ts->s = rs[i]; ts->op = fes->op; ts->proto = fes->proto; ts->family = fes->family; ts->txlen = fes->txlen; - if (fes->op == TXONLY) { + tle_event_active(ts->erev, TLE_SEV_DOWN); + if (fes->op == TXONLY || fes->op == FWD) { tle_event_active(ts->txev, TLE_SEV_UP); ts->stat.txev[TLE_SEV_UP]++; - } else { + } + if (fes->op != TXONLY) { tle_event_active(ts->rxev, TLE_SEV_DOWN); ts->stat.rxev[TLE_SEV_DOWN]++; } netfe_put_stream(fe, &fe->use, ts); - NETFE_TRACE("%s(%u) accept (stream=%p, s=%p)\n", - __func__, lcore, ts, rs[i]); - /* create a new fwd stream if needed */ - if (fes->op == FWD) { - tle_event_active(ts->txev, TLE_SEV_DOWN); - ts->stat.txev[TLE_SEV_DOWN]++; + memset(&prm[i], 0, sizeof(prm[i])); + prm[i].recv_ev = ts->rxev; + prm[i].send_ev = ts->txev; + prm[i].err_ev = ts->erev; + } + + tle_tcp_stream_update_cfg(rs, prm, k); + + tle_tcp_stream_close_bulk(rs + k, n - k); + + /* for the forwarding mode, open the second one */ + if (fes->op == FWD) { + for (i = 0; i != k; i++) { + + ts = fs[i]; ts->fwds = netfe_create_fwd_stream(fe, fes, lcore, fes->fwdprm.bidx); @@ -396,8 +383,9 @@ netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore, ts->fwds->fwds = ts; } } - fe->tcp_stat.acc += rc; - fe->tcp_stat.rej += n - rc; + + fe->tcp_stat.acc += k; + fe->tcp_stat.rej += n - k; } static inline void @@ -430,7 +418,7 @@ netfe_lcore_tcp_rst(void) { struct netfe_lcore *fe; struct netfe_stream *fwds; - uint32_t j, n; + uint32_t j, k, n; struct tle_stream *s[MAX_PKT_BURST]; struct netfe_stream *fs[MAX_PKT_BURST]; struct tle_event *rv[MAX_PKT_BURST]; @@ -449,36 +437,44 @@ netfe_lcore_tcp_rst(void) NETFE_TRACE("%s(%u): tle_evq_get(errevq=%p) returns %u\n", __func__, rte_lcore_id(), fe->ereq, n); + k = 0; for (j = 0; j != n; j++) { if (verbose > VERBOSE_NONE) { struct tle_tcp_stream_addr addr; tle_tcp_stream_get_addr(fs[j]->s, &addr); netfe_stream_dump(fs[j], &addr.local, &addr.remote); } - s[j] = fs[j]->s; - rv[j] = fs[j]->rxev; - tv[j] = fs[j]->txev; - ev[j] = fs[j]->erev; + + /* check do we still have something to send/recv */ + if (fs[j]->posterr == 0 && + (tle_event_state(fs[j]->rxev) == TLE_SEV_UP || + tle_event_state(fs[j]->txev) == TLE_SEV_UP)) { + fs[j]->posterr++; + } else { + s[k] = fs[j]->s; + rv[k] = fs[j]->rxev; + tv[k] = fs[j]->txev; + ev[k] = fs[j]->erev; + fs[k] = fs[j]; + k++; + } } - tle_evq_idle(fe->rxeq, rv, n); - tle_evq_idle(fe->txeq, tv, n); - tle_evq_idle(fe->ereq, ev, n); + if (k == 0) + return; - tle_tcp_stream_close_bulk(s, n); + tle_evq_idle(fe->rxeq, rv, k); + tle_evq_idle(fe->txeq, tv, k); + tle_evq_idle(fe->ereq, ev, k); - for (j = 0; j != n; j++) { + tle_tcp_stream_close_bulk(s, k); + + for (j = 0; j != k; j++) { - /* - * if forwarding mode, send unsent packets and - * signal peer stream to terminate too. - */ + /* if forwarding mode, signal peer stream to terminate too. */ fwds = fs[j]->fwds; if (fwds != NULL && fwds->s != NULL) { - /* forward all unsent packets */ - netfe_fwd_tcp(rte_lcore_id(), fs[j]); - fwds->fwds = NULL; tle_event_raise(fwds->erev); fs[j]->fwds = NULL; @@ -491,7 +487,7 @@ netfe_lcore_tcp_rst(void) } } -static inline void +static inline int netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) { uint32_t i, k, n; @@ -504,7 +500,7 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) if (n == 0) { tle_event_idle(fes->txev); fes->stat.txev[TLE_SEV_IDLE]++; - return; + return 0; } @@ -512,13 +508,13 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n", __func__, lcore, proto_name[fes->proto], - fes->s, n, k); + fes->s, n, k); fes->stat.txp += k; fes->stat.drops += n - k; /* not able to send anything. */ if (k == 0) - return; + return 0; if (n == RTE_DIM(fes->pbuf.pkt)) { /* mark stream as readable */ @@ -530,19 +526,22 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) fes->pbuf.num = n - k; for (i = 0; i != n - k; i++) pkt[i] = pkt[i + k]; + + return k; } -static inline void +static inline int netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes) { uint32_t i, k, n; /* refill with new mbufs. */ - pkt_buf_fill(lcore, &fes->pbuf, fes->txlen); + if (fes->posterr == 0) + pkt_buf_fill(lcore, &fes->pbuf, fes->txlen); n = fes->pbuf.num; if (n == 0) - return; + return 0; /** * TODO: cannot use function pointers for unequal param num. @@ -555,19 +554,22 @@ netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes) fes->stat.drops += n - k; if (k == 0) - return; + return 0; /* adjust pbuf array. */ fes->pbuf.num = n - k; for (i = k; i != n; i++) fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i]; + + return k; } static inline void netfe_lcore_tcp(void) { - struct netfe_lcore *fe; + int32_t rc; uint32_t j, n, lcore; + struct netfe_lcore *fe; struct netfe_stream *fs[MAX_PKT_BURST]; fe = RTE_PER_LCORE(_fe); @@ -580,25 +582,42 @@ netfe_lcore_tcp(void) n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs)); if (n != 0) { + NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n", __func__, lcore, fe->rxeq, n); - for (j = 0; j != n; j++) - netfe_rx_process(lcore, fs[j]); + + for (j = 0; j != n; j++) { + + rc = netfe_rx_process(lcore, fs[j]); + + /* we are ok to close the stream */ + if (rc == 0 && fs[j]->posterr != 0) + tle_event_raise(fs[j]->erev); + } } /* look for tx events */ n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs)); if (n != 0) { + NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n", __func__, lcore, fe->txeq, n); + for (j = 0; j != n; j++) { + + rc = 0; + if (fs[j]->op == RXTX) - netfe_rxtx_process_tcp(lcore, fs[j]); + rc = netfe_rxtx_process_tcp(lcore, fs[j]); else if (fs[j]->op == FWD) - netfe_fwd_tcp(lcore, fs[j]); + rc = netfe_fwd_tcp(lcore, fs[j]); else if (fs[j]->op == TXONLY) - netfe_tx_process_tcp(lcore, fs[j]); + rc = netfe_tx_process_tcp(lcore, fs[j]); + + /* we are ok to close the stream */ + if (rc == 0 && fs[j]->posterr != 0) + tle_event_raise(fs[j]->erev); } } } diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h index 8ad1103..29f1f63 100644 --- a/lib/libtle_l4p/stream_table.h +++ b/lib/libtle_l4p/stream_table.h @@ -110,13 +110,13 @@ stbl_add_entry(struct stbl *st, const union pkt_info *pi) } static inline struct stbl_entry * -stbl_add_pkt(struct stbl *st, const union pkt_info *pi, const void *pkt) +stbl_add_stream(struct stbl *st, const union pkt_info *pi, const void *s) { struct stbl_entry *se; se = stbl_add_entry(st, pi); if (se != NULL) - se->data = (void *)((uintptr_t)pkt | STE_PKT); + se->data = (void *)(uintptr_t)s; return se; } diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h index 276d45a..ad70b7d 100644 --- a/lib/libtle_l4p/syncookie.h +++ b/lib/libtle_l4p/syncookie.h @@ -156,13 +156,8 @@ sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len) static inline void sync_fill_tcb(struct tcb *tcb, const union seg_info *si, - const struct rte_mbuf *mb) + const struct syn_opts *so) { - const struct tcp_hdr *th; - - th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, - mb->l2_len + mb->l3_len); - tcb->rcv.nxt = si->seq; tcb->rcv.irs = si->seq - 1; @@ -174,7 +169,7 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si, tcb->snd.wu.wl1 = si->seq; tcb->snd.wu.wl2 = si->ack; - get_syn_opts(&tcb->so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th)); + tcb->so = *so; tcb->snd.wscale = tcb->so.wscale; tcb->snd.mss = tcb->so.mss; diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h index dcb9c3e..95c2bbc 100644 --- a/lib/libtle_l4p/tcp_ctl.h +++ b/lib/libtle_l4p/tcp_ctl.h @@ -41,6 +41,13 @@ tcp_stream_up(struct tle_tcp_stream *s) rwl_up(&s->tx.use); } +/* calculate RCV.WND value based on size of stream receive buffer */ +static inline uint32_t +calc_rx_wnd(const struct tle_tcp_stream *s, uint32_t scale) +{ + return s->rx.q->prod.mask << scale; +} + /* empty stream's receive queue */ static void empty_rq(struct tle_tcp_stream *s) diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c index 4e43730..6085814 100644 --- a/lib/libtle_l4p/tcp_rxtx.c +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -173,7 +173,7 @@ fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, l4h->dst_port = port.src; wnd = (flags & TCP_FLAG_SYN) ? - RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) : + RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) : tcb->rcv.wnd >> tcb->rcv.wscale; /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */ @@ -760,68 +760,24 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len, } static inline int -restore_syn_pkt(const union pkt_info *pi, const union seg_info *si, - uint32_t ts, struct rte_mbuf *mb) +restore_syn_opt(struct syn_opts *so, const union pkt_info *pi, + const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb) { int32_t rc; uint32_t len; - struct tcp_hdr *th; - struct syn_opts so; + const struct tcp_hdr *th; /* check that ACK, etc fields are what we expected. */ rc = sync_check_ack(pi, si->seq, si->ack - 1, ts); if (rc < 0) return rc; - so.mss = rc; + so->mss = rc; - th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *, + th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, mb->l2_len + mb->l3_len); len = mb->l4_len - sizeof(*th); - sync_get_opts(&so, (uintptr_t)(th + 1), len); - - /* reconstruct SYN options, extend header size if necessary */ - if (len < TCP_TX_OPT_LEN_MAX) { - len = TCP_TX_OPT_LEN_MAX - len; - th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN << - TCP_DATA_OFFSET; - mb->pkt_len += len; - mb->data_len += len; - mb->l4_len += len; - } - - fill_syn_opts(th + 1, &so); - return 0; -} - -static inline int -rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, - const union pkt_info *pi, const union seg_info *si, - uint32_t ts, struct rte_mbuf *mb) -{ - int32_t rc; - struct stbl_entry *se; - - if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) - return -EINVAL; - - /* ACK for new connection request. */ - - rc = restore_syn_pkt(pi, si, ts, mb); - if (rc < 0) - return rc; - - se = stbl_add_pkt(st, pi, mb); - if (se == NULL) - return -ENOBUFS; - - /* put new connection requests into stream listen queue */ - if (rte_ring_enqueue_burst(s->rx.q, - (void * const *)&se, 1) != 1) { - stbl_del_pkt(st, se, pi); - return -ENOBUFS; - } - + sync_get_opts(so, (uintptr_t)(th + 1), len); return 0; } @@ -848,6 +804,151 @@ stream_term(struct tle_tcp_stream *s) s->err.cb.func(s->err.cb.data, &s->s); } +static inline int +stream_fill_dest(struct tle_tcp_stream *s) +{ + int32_t rc; + const void *da; + + if (s->s.type == TLE_V4) + da = &s->s.ipv4.addr.src; + else + da = &s->s.ipv6.addr.src; + + rc = stream_get_dest(&s->s, da, &s->tx.dst); + return (rc < 0) ? rc : 0; +} + +/* + * helper function, prepares a new accept stream. + */ +static inline int +accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, + struct tle_tcp_stream *cs, const struct syn_opts *so, + uint32_t tms, const union pkt_info *pi, const union seg_info *si) +{ + int32_t rc; + uint32_t rtt; + + /* some TX still pending for that stream. */ + if (TCP_STREAM_TX_PENDING(cs)) + return -EAGAIN; + + /* setup L4 ports and L3 addresses fields. */ + cs->s.port.raw = pi->port.raw; + cs->s.pmsk.raw = UINT32_MAX; + + if (pi->tf.type == TLE_V4) { + cs->s.ipv4.addr = pi->addr4; + cs->s.ipv4.mask.src = INADDR_NONE; + cs->s.ipv4.mask.dst = INADDR_NONE; + } else if (pi->tf.type == TLE_V6) { + cs->s.ipv6.addr = *pi->addr6; + rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.src)); + rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.dst)); + } + + /* setup TCB */ + sync_fill_tcb(&cs->tcb, si, so); + cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale); + + /* + * estimate the rto + * for now rtt is calculated based on the tcp TMS option, + * later add real-time one + */ + if (cs->tcb.so.ts.ecr) { + rtt = tms - cs->tcb.so.ts.ecr; + rto_estimate(&cs->tcb, rtt); + } else + cs->tcb.snd.rto = TCP_RTO_DEFAULT; + + /* copy streams type. */ + cs->s.type = ps->s.type; + + /* retrive and cache destination information. */ + rc = stream_fill_dest(cs); + if (rc != 0) + return rc; + + /* update snd.mss with SMSS value */ + cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); + + /* setup congestion variables */ + cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); + cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; + + cs->tcb.state = TCP_ST_ESTABLISHED; + + /* add stream to the table */ + cs->ste = stbl_add_stream(st, pi, cs); + if (cs->ste == NULL) + return -ENOBUFS; + + cs->tcb.uop |= TCP_OP_ACCEPT; + tcp_stream_up(cs); + return 0; +} + + +/* + * ACK for new connection request arrived. + * Check that the packet meets all conditions and try to open a new stream. + * returns: + * < 0 - invalid packet + * == 0 - packet is valid and new stream was opened for it. + * > 0 - packet is valid, but failed to open new stream. + */ +static inline int +rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, + const union pkt_info *pi, const union seg_info *si, + uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp) +{ + int32_t rc; + struct tle_ctx *ctx; + struct tle_stream *ts; + struct tle_tcp_stream *cs; + struct syn_opts so; + + *csp = NULL; + + if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) + return -EINVAL; + + rc = restore_syn_opt(&so, pi, si, tms, mb); + if (rc < 0) + return rc; + + ctx = s->s.ctx; + + /* allocate new stream */ + ts = get_stream(ctx); + cs = TCP_STREAM(ts); + if (ts == NULL) + return ENFILE; + + /* prepare stream to handle new connection */ + if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) { + + /* put new stream in the accept queue */ + if (rte_ring_enqueue_burst(s->rx.q, + (void * const *)&ts, 1) == 1) { + *csp = cs; + return 0; + } + + /* cleanup on failure */ + tcp_stream_down(cs); + stbl_del_pkt(st, cs->ste, pi); + cs->ste = NULL; + } + + tcp_stream_reset(ctx, cs); + return ENOBUFS; +} + static inline int data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen, uint32_t *seqn, uint32_t *plen) @@ -1591,14 +1692,35 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts, return num - k; } +static inline uint32_t +rx_new_stream(struct tle_tcp_stream *s, uint32_t ts, + const union pkt_info *pi, const union seg_info si[], + struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], + uint32_t num) +{ + uint32_t i; + + if (rwl_acquire(&s->rx.use) > 0) { + i = rx_stream(s, ts, pi, si, mb, rp, rc, num); + rwl_release(&s->rx.use); + return i; + } + + for (i = 0; i != num; i++) { + rc[i] = ENOENT; + rp[i] = mb[i]; + } + return 0; +} + static inline uint32_t rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, const union pkt_info pi[], const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], uint32_t num) { - struct tle_tcp_stream *s; - uint32_t i, k, state; + struct tle_tcp_stream *cs, *s; + uint32_t i, k, n, state; int32_t ret; s = rx_obtain_stream(dev, st, &pi[0], type); @@ -1616,25 +1738,51 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, if (state == TCP_ST_LISTEN) { /* one connection per flow */ - ret = EINVAL; - for (i = 0; i != num && ret != 0; i++) { - ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]); - if (ret != 0) { - rc[k] = -ret; - rp[k] = mb[i]; - k++; - } - } - /* duplicate SYN requests */ - for (; i != num; i++, k++) { - rc[k] = EINVAL; + cs = NULL; + ret = -EINVAL; + for (i = 0; i != num; i++) { + + ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs); + + /* valid packet encountered */ + if (ret >= 0) + break; + + /* invalid packet, keep trying to find a proper one */ + rc[k] = -ret; rp[k] = mb[i]; + k++; } - if (k != num && s->rx.ev != NULL) - tle_event_raise(s->rx.ev); - else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1) - s->rx.cb.func(s->rx.cb.data, &s->s); + /* packet is valid, but we are out of streams to serve it */ + if (ret > 0) { + for (; i != num; i++, k++) { + rc[k] = ret; + rp[k] = mb[i]; + } + /* new stream is accepted */ + } else if (ret == 0) { + + /* inform listen stream about new connections */ + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL && + rte_ring_count(s->rx.q) == 1) + s->rx.cb.func(s->rx.cb.data, &s->s); + + /* if there is no data, drop current packet */ + if (PKT_L4_PLEN(mb[i]) == 0) { + rc[k] = ENODATA; + rp[k++] = mb[i++]; + } + + /* process remaining packets for that stream */ + if (num != i) { + n = rx_new_stream(cs, ts, pi + i, si + i, + mb + i, rp + k, rc + k, num - i); + k += num - n - i; + } + } } else { i = rx_stream(s, ts, pi, si, mb, rp, rc, num); @@ -1761,23 +1909,17 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], } uint16_t -tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[], +tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], uint32_t num) { - uint32_t i, n; + uint32_t n; struct tle_tcp_stream *s; - struct stbl_entry *se[num]; s = TCP_STREAM(ts); - n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num); + n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num); if (n == 0) return 0; - for (i = 0; i != n; i++) { - rq[i].pkt = stbl_get_pkt(se[i]); - rq[i].opaque = se[i]; - } - /* * if we still have packets to read, * then rearm stream RX event. @@ -1791,206 +1933,6 @@ tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[], return n; } -static inline int -stream_fill_dest(struct tle_tcp_stream *s) -{ - int32_t rc; - const void *da; - - if (s->s.type == TLE_V4) - da = &s->s.ipv4.addr.src; - else - da = &s->s.ipv6.addr.src; - - rc = stream_get_dest(&s->s, da, &s->tx.dst); - return (rc < 0) ? rc : 0; -} - -/* - * helper function, prepares an accepted stream. - */ -static int -accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs, - const struct tle_tcp_accept_param *prm, uint32_t tms, - const union pkt_info *pi, const union seg_info *si) -{ - int32_t rc; - uint32_t rtt; - - /* some TX still pending for that stream. */ - if (TCP_STREAM_TX_PENDING(cs)) - return -EAGAIN; - - /* setup L4 ports and L3 addresses fields. */ - cs->s.port.raw = pi->port.raw; - cs->s.pmsk.raw = UINT32_MAX; - - if (pi->tf.type == TLE_V4) { - cs->s.ipv4.addr = pi->addr4; - cs->s.ipv4.mask.src = INADDR_NONE; - cs->s.ipv4.mask.dst = INADDR_NONE; - } else if (pi->tf.type == TLE_V6) { - cs->s.ipv6.addr = *pi->addr6; - rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, - sizeof(cs->s.ipv6.mask.src)); - rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none, - sizeof(cs->s.ipv6.mask.dst)); - } - - /* setup TCB */ - sync_fill_tcb(&cs->tcb, si, prm->syn.pkt); - cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale; - - /* setup stream notification menchanism */ - cs->rx.ev = prm->cfg.recv_ev; - cs->rx.cb = prm->cfg.recv_cb; - cs->tx.ev = prm->cfg.send_ev; - cs->tx.cb = prm->cfg.send_cb; - cs->err.ev = prm->cfg.err_ev; - cs->err.cb = prm->cfg.err_cb; - - /* store other params */ - cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : - TLE_TCP_DEFAULT_RETRIES; - - /* - * estimate the rto - * for now rtt is calculated based on the tcp TMS option, - * later add real-time one - */ - if (cs->tcb.so.ts.ecr) { - rtt = tms - cs->tcb.so.ts.ecr; - rto_estimate(&cs->tcb, rtt); - } else - cs->tcb.snd.rto = TCP_RTO_DEFAULT; - - tcp_stream_up(cs); - - /* copy streams type. */ - cs->s.type = ps->s.type; - - /* retrive and cache destination information. */ - rc = stream_fill_dest(cs); - if (rc != 0) - return rc; - - /* update snd.mss with SMSS value */ - cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); - - /* setup congestion variables */ - cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); - cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; - - cs->tcb.state = TCP_ST_ESTABLISHED; - cs->tcb.uop |= TCP_OP_ACCEPT; - - /* add stream to the table */ - cs->ste = prm->syn.opaque; - rte_smp_wmb(); - cs->ste->data = cs; - return 0; -} - -/* - * !!! - * Right now new stream rcv.wnd is set to zero. - * That simplifies handling of new connection establishment - * (as no data segments could be received), - * but has to be addressed. - * possible ways: - * - send ack after accept creates new stream with new rcv.wnd value. - * the problem with that approach that single ack is not delivered - * reliably (could be lost), plus might slowdown connection establishment - * (extra packet per connection, that client has to wait for). - * - allocate new stream at ACK recieve stage. - * As a drawback - whole new stream allocation/connection establishment - * will be done in BE. - * !!! - */ -int -tle_tcp_stream_accept(struct tle_stream *ts, - const struct tle_tcp_accept_param prm[], struct tle_stream *rs[], - uint32_t num) -{ - struct tle_tcp_stream *cs, *s; - struct tle_ctx *ctx; - uint32_t i, j, n, tms; - int32_t rc; - union pkt_info pi[num]; - union seg_info si[num]; - - tms = tcp_get_tms(); - s = TCP_STREAM(ts); - - for (i = 0; i != num; i++) - get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]); - - /* mark stream as not closable */ - if (rwl_acquire(&s->rx.use) < 0) - return -EINVAL; - - ctx = s->s.ctx; - n = get_streams(ctx, rs, num); - - rc = 0; - for (i = 0; i != n; i++) { - - /* prepare new stream */ - cs = TCP_STREAM(rs[i]); - rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i); - if (rc != 0) - break; - } - - rwl_release(&s->rx.use); - - /* free 'SYN' mbufs. */ - for (j = 0; j != i; j++) - rte_pktmbuf_free(prm[j].syn.pkt); - - /* close failed stream, put unused streams back to the free list. */ - if (rc != 0) { - tle_tcp_stream_close(rs[i]); - for (j = i + 1; j != n; j++) { - cs = TCP_STREAM(rs[j]); - put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs)); - } - rte_errno = -rc; - - /* not enough streams are available */ - } else if (n != num) - rte_errno = ENFILE; - - return i; -} - -/* - * !!! implement a proper one, or delete !!! - * need to make sure no race conditions with add/lookup stream table. - */ -void -tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[], - uint32_t num) -{ - uint32_t i; - struct rte_mbuf *mb; - struct stbl *st; - union pkt_info pi; - union seg_info si; - - st = CTX_TCP_STLB(s->ctx); - - for (i = 0; i != num; i++) { - mb = rq[i].pkt; - get_pkt_info(mb, &pi, &si); - if (pi.tf.type < TLE_VNUM) - stbl_del_pkt_lock(st, rq[i].opaque, &pi); - - /* !!! send RST pkt to the peer !!! */ - rte_pktmbuf_free(mb); - } -} - uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) { @@ -2121,7 +2063,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) s->tcb.rcv.mss = s->tcb.so.mss; s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT; - s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale; + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); s->tcb.rcv.ts = 0; /* add the stream in stream table */ diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c index 67ed66b..f06b2ed 100644 --- a/lib/libtle_l4p/tcp_stream.c +++ b/lib/libtle_l4p/tcp_stream.c @@ -511,6 +511,7 @@ tle_tcp_stream_listen(struct tle_stream *ts) TCP_ST_LISTEN); if (rc != 0) { s->tcb.uop |= TCP_OP_LISTEN; + s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT); rc = 0; } else rc = -EDEADLK; @@ -520,3 +521,87 @@ tle_tcp_stream_listen(struct tle_stream *ts) rwl_release(&s->rx.use); return rc; } + +/* + * helper function, updates stream config + */ +static inline int +stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm) +{ + int32_t rc1, rc2; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + + rc1 = rwl_try_acquire(&s->rx.use); + rc2 = rwl_try_acquire(&s->tx.use); + + if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) { + rwl_release(&s->tx.use); + rwl_release(&s->rx.use); + return -EINVAL; + } + + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->tx.ev = prm->send_ev; + s->err.ev = prm->err_ev; + + s->rx.cb.data = prm->recv_cb.data; + s->tx.cb.data = prm->send_cb.data; + s->err.cb.data = prm->err_cb.data; + + rte_smp_wmb(); + + s->rx.cb.func = prm->recv_cb.func; + s->tx.cb.func = prm->send_cb.func; + s->err.cb.func = prm->err_cb.func; + + /* store other params */ + s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries : + TLE_TCP_DEFAULT_RETRIES; + + /* invoke async notifications, if any */ + if (rte_ring_count(s->rx.q) != 0) { + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL) + s->rx.cb.func(s->rx.cb.data, &s->s); + } + if (rte_ring_free_count(s->tx.q) != 0) { + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + else if (s->tx.cb.func != NULL) + s->tx.cb.func(s->tx.cb.data, &s->s); + } + if (s->tcb.state == TCP_ST_CLOSE_WAIT || + s->tcb.state == TCP_ST_CLOSED) { + if (s->err.ev != NULL) + tle_event_raise(s->err.ev); + else if (s->err.cb.func != NULL) + s->err.cb.func(s->err.cb.data, &s->s); + } + + rwl_release(&s->tx.use); + rwl_release(&s->rx.use); + + return 0; +} + +uint32_t +tle_tcp_stream_update_cfg(struct tle_stream *ts[], + struct tle_tcp_stream_cfg prm[], uint32_t num) +{ + int32_t rc; + uint32_t i; + + for (i = 0; i != num; i++) { + rc = stream_update_cfg(ts[i], &prm[i]); + if (rc != 0) { + rte_errno = -rc; + break; + } + } + + return i; +} diff --git a/lib/libtle_l4p/tle_event.h b/lib/libtle_l4p/tle_event.h index b19954a..d730345 100644 --- a/lib/libtle_l4p/tle_event.h +++ b/lib/libtle_l4p/tle_event.h @@ -106,6 +106,11 @@ struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data); */ void tle_event_free(struct tle_event *ev); +static inline enum tle_ev_state +tle_event_state(const struct tle_event *ev) +{ + return ev->state; +} /** * move event from DOWN to UP state. diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h index e6eb336..ec89746 100644 --- a/lib/libtle_l4p/tle_tcp.h +++ b/lib/libtle_l4p/tle_tcp.h @@ -148,40 +148,19 @@ int tle_tcp_stream_connect(struct tle_stream *s, const struct sockaddr *addr); * * tle_tcp_stream_listen(stream_to_listen); * - * n = tle_tcp_synreqs(stream_to_listen, syn_reqs, sizeof(syn_reqs)); - * for (i = 0, k = 0; i != n; i++) { - * rc = ; - * if (rc == 0) { - * //proceed with connection establishment - * k++; - * accept_param[k].syn = syn_reqs[i]; - * - * } else { - * //reject connection requests from that endpoint - * rej_reqs[i - k] = syn_reqs[i]; - * } + * n = tle_tcp_accept(stream_to_listen, accepted_streams, + * sizeof(accepted_streams)); + * for (i = 0, i != n; i++) { + * //prepare tle_tcp_stream_cfg for newly accepted streams + * ... + * } + * k = tle_tcp_stream_update_cfg(rs, prm, n); + * if (n != k) { + * //handle error + * ... * } - * - * //reject n - k connection requests - * tle_tcp_reject(stream_to_listen, rej_reqs, n - k); - * - * //accept k new connections - * rc = tle_tcp_accept(stream_to_listen, accept_param, new_con_streams, k); - * */ -struct tle_syn_req { - struct rte_mbuf *pkt; - /*< mbuf with incoming connection request. */ - void *opaque; /*< tldk related opaque pointer. */ -}; - -struct tle_tcp_accept_param { - struct tle_syn_req syn; /*< mbuf with incoming SYN request. */ - struct tle_tcp_stream_cfg cfg; /*< stream configure options. */ -}; - - /** * Set stream into the listen state (passive opener), i.e. make stream ready * to accept new connections. @@ -198,28 +177,41 @@ struct tle_tcp_accept_param { int tle_tcp_stream_listen(struct tle_stream *s); /** - * return up to *num* mbufs with SYN requests that were received + * return up to *num* streams from the queue of pending connections * for given TCP endpoint. - * Note that the stream has to be in listen state. - * For each returned mbuf: - * data_off set to the start of the packet - * l2_len, l3_len, l4_len are setup properly - * (so user can still extract L2/L3/L4 header info if needed) - * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly. - * L3/L4 checksum is verified. * @param s - * TCP stream to receive packets from. - * @param rq - * An array of tle_syn_req structures that contains - * at least *num* elements in it. + * TCP stream in listen state. + * @param rs + * An array of pointers to the newily accepted streams. + * Each such new stream represents a new connection to the given TCP endpoint. + * Newly accepted stream should be in connected state and ready to use + * by other FE API routines (send/recv/close/etc.). * @param num - * Number of elements in the *pkt* array. + * Number of elements in the *rs* array. * @return - * number of of entries filled inside *pkt* array. + * number of entries filled inside *rs* array. */ -uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[], +uint16_t tle_tcp_stream_accept(struct tle_stream *s, struct tle_stream *rs[], uint32_t num); +/** + * updates configuration (associated events, callbacks, stream parameters) + * for the given streams. + * @param ts + * An array of pointers to the streams to update. + * @param prm + * An array of parameters to update for the given streams. + * @param num + * Number of elements in the *ts* and *prm* arrays. + * @return + * number of streams successfully updated. + * In case of error, error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + */ +uint32_t tle_tcp_stream_update_cfg(struct tle_stream *ts[], + struct tle_tcp_stream_cfg prm[], uint32_t num); + /** * Accept connection requests for the given stream. * Note that the stream has to be in listen state. @@ -241,27 +233,9 @@ uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[], * - EINVAL - invalid parameter passed to function * - ENFILE - no more streams are avaialble to open. */ -int tle_tcp_stream_accept(struct tle_stream *s, - const struct tle_tcp_accept_param prm[], struct tle_stream *rs[], - uint32_t num); - -/** - * Reject connection requests for the given stream. - * Note that the stream has to be in listen state. - * For each new connection a new stream will be open. - * @param s - * TCP listen stream. - * @param rq - * An array of tle_syn_req structures that contains - * at least *num* elements in it. - * @param num - * Number of elements in the *pkt* array. - */ -void tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[], - uint32_t num); /** - * return up to *num* mbufs that was received for given TCP stream. + * Return up to *num* mbufs that was received for given TCP stream. * Note that the stream has to be in connected state. * Data ordering is preserved. * For each returned mbuf: -- cgit 1.2.3-korg