From 21e7392fca2c430018cf387bb3e368ea4c665446 Mon Sep 17 00:00:00 2001 From: Konstantin Ananyev Date: Fri, 3 Mar 2017 18:40:23 +0000 Subject: Rewrite accept() code-path and make l4fwd not to close() on FIN immediatelly. Changes in public API: - removes tle_tcp_stream_synreqs() and tle_tcp_reject() - adds tle_tcp_stream_update_cfg Allocates and fills new stream when final ACK for 3-way handshake is received. Changes in l4fwd sample application: prevents l4fwd to call close() on error event immediately: first try to recv/send remaining data. Change-Id: I8c5b9d365353084083731a4ce582197a8268688f Signed-off-by: Konstantin Ananyev --- examples/l4fwd/common.h | 10 +-- examples/l4fwd/netbe.h | 1 + examples/l4fwd/tcp.h | 183 ++++++++++++++++++++++++++---------------------- 3 files changed, 108 insertions(+), 86 deletions(-) (limited to 'examples') diff --git a/examples/l4fwd/common.h b/examples/l4fwd/common.h index ff8ee7a..8d757f3 100644 --- a/examples/l4fwd/common.h +++ b/examples/l4fwd/common.h @@ -619,7 +619,7 @@ netbe_lcore(void) } } -static inline void +static inline int netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) { uint32_t k, n; @@ -631,12 +631,12 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) if (k == 0) { tle_event_idle(fes->rxev); fes->stat.rxev[TLE_SEV_IDLE]++; - return; + return 0; } n = tle_stream_recv(fes->s, fes->pbuf.pkt + n, k); if (n == 0) - return; + return 0; NETFE_TRACE("%s(%u): tle_%s_stream_recv(%p, %u) returns %u\n", __func__, lcore, proto_name[fes->proto], fes->s, k, n); @@ -648,7 +648,7 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) if (fes->op == RXONLY) fes->stat.rxb += pkt_buf_empty(&fes->pbuf); /* mark stream as writable */ - else if (k == RTE_DIM(fes->pbuf.pkt)) { + else if (k == RTE_DIM(fes->pbuf.pkt)) { if (fes->op == RXTX) { tle_event_active(fes->txev, TLE_SEV_UP); fes->stat.txev[TLE_SEV_UP]++; @@ -657,6 +657,8 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) fes->stat.txev[TLE_SEV_UP]++; } } + + return n; } #endif /* COMMON_H_ */ diff --git a/examples/l4fwd/netbe.h b/examples/l4fwd/netbe.h index 6d25603..80d1c28 100644 --- a/examples/l4fwd/netbe.h +++ b/examples/l4fwd/netbe.h @@ -195,6 +195,7 @@ struct netfe_stream { uint16_t proto; uint16_t family; uint16_t txlen; + uint16_t posterr; /* # of time error event handling was postponed */ struct { uint64_t rxp; uint64_t rxb; diff --git a/examples/l4fwd/tcp.h b/examples/l4fwd/tcp.h index 031ad8d..f6ca3a5 100644 --- a/examples/l4fwd/tcp.h +++ b/examples/l4fwd/tcp.h @@ -23,7 +23,9 @@ netfe_stream_term_tcp(struct netfe_lcore *fe, struct netfe_stream *fes) { fes->s = NULL; fes->fwds = NULL; + fes->posterr = 0; memset(&fes->stat, 0, sizeof(fes->stat)); + pkt_buf_empty(&fes->pbuf); netfe_put_stream(fe, &fe->free, fes); } @@ -251,7 +253,7 @@ netfe_create_fwd_stream(struct netfe_lcore *fe, struct netfe_stream *fes, return fws; } -static inline void +static inline int netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes) { uint32_t i, k, n; @@ -264,7 +266,7 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes) pkt = fes->pbuf.pkt; if (n == 0) - return; + return 0; fed = fes->fwds; @@ -307,88 +309,73 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes) tle_event_active(fes->rxev, TLE_SEV_UP); fes->stat.rxev[TLE_SEV_UP]++; } + + return (fed == NULL) ? 0 : k; } static inline void -netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore, +netfe_new_conn_tcp(struct netfe_lcore *fe, uint32_t lcore, struct netfe_stream *fes) { - uint32_t i, k, n, rc; - struct tle_tcp_stream_cfg *prm; - struct tle_tcp_accept_param acpt_prm[MAX_PKT_BURST]; - struct tle_stream *rs[MAX_PKT_BURST]; - struct tle_syn_req syn_reqs[MAX_PKT_BURST]; + uint32_t i, k, n; struct netfe_stream *ts; + struct tle_stream *rs[MAX_PKT_BURST]; struct netfe_stream *fs[MAX_PKT_BURST]; - - static const struct tle_stream_cb zcb = {.func = NULL, .data = NULL}; + struct tle_tcp_stream_cfg prm[MAX_PKT_BURST]; /* check if any syn requests are waiting */ - n = tle_tcp_stream_synreqs(fes->s, syn_reqs, RTE_DIM(syn_reqs)); + n = tle_tcp_stream_accept(fes->s, rs, RTE_DIM(rs)); if (n == 0) return; - NETFE_TRACE("%s(%u): tle_tcp_stream_synreqs(%p, %u) returns %u\n", + NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n", __func__, lcore, fes->s, MAX_PKT_BURST, n); /* get n free streams */ k = netfe_get_streams(&fe->free, fs, n); + if (n != k) + RTE_LOG(ERR, USER1, + "%s(lc=%u): not enough FE resources to handle %u new " + "TCP streams;\n", + __func__, lcore, n - k); /* fill accept params to accept k connection requests*/ for (i = 0; i != k; i++) { - acpt_prm[i].syn = syn_reqs[i]; - prm = &acpt_prm[i].cfg; - prm->nb_retries = 0; - prm->recv_ev = fs[i]->rxev; - prm->send_ev = fs[i]->txev; - prm->err_ev = fs[i]->erev; - tle_event_active(fs[i]->erev, TLE_SEV_DOWN); - prm->err_cb = zcb; - prm->recv_cb = zcb; - prm->send_cb = zcb; - } - - /* accept k new connections */ - rc = tle_tcp_stream_accept(fes->s, acpt_prm, rs, k); - - NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n", - __func__, lcore, fes->s, k, rc); - - if (rc != n) { - /* n - rc connections could not be accepted */ - tle_tcp_reject(fes->s, syn_reqs + rc, n - rc); - - /* put back k - rc streams free list */ - netfe_put_streams(fe, &fe->free, fs + rc, k - rc); - } - - /* update the params for accepted streams */ - for (i = 0; i != rc; i++) { ts = fs[i]; - ts->s = rs[i]; ts->op = fes->op; ts->proto = fes->proto; ts->family = fes->family; ts->txlen = fes->txlen; - if (fes->op == TXONLY) { + tle_event_active(ts->erev, TLE_SEV_DOWN); + if (fes->op == TXONLY || fes->op == FWD) { tle_event_active(ts->txev, TLE_SEV_UP); ts->stat.txev[TLE_SEV_UP]++; - } else { + } + if (fes->op != TXONLY) { tle_event_active(ts->rxev, TLE_SEV_DOWN); ts->stat.rxev[TLE_SEV_DOWN]++; } netfe_put_stream(fe, &fe->use, ts); - NETFE_TRACE("%s(%u) accept (stream=%p, s=%p)\n", - __func__, lcore, ts, rs[i]); - /* create a new fwd stream if needed */ - if (fes->op == FWD) { - tle_event_active(ts->txev, TLE_SEV_DOWN); - ts->stat.txev[TLE_SEV_DOWN]++; + memset(&prm[i], 0, sizeof(prm[i])); + prm[i].recv_ev = ts->rxev; + prm[i].send_ev = ts->txev; + prm[i].err_ev = ts->erev; + } + + tle_tcp_stream_update_cfg(rs, prm, k); + + tle_tcp_stream_close_bulk(rs + k, n - k); + + /* for the forwarding mode, open the second one */ + if (fes->op == FWD) { + for (i = 0; i != k; i++) { + + ts = fs[i]; ts->fwds = netfe_create_fwd_stream(fe, fes, lcore, fes->fwdprm.bidx); @@ -396,8 +383,9 @@ netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore, ts->fwds->fwds = ts; } } - fe->tcp_stat.acc += rc; - fe->tcp_stat.rej += n - rc; + + fe->tcp_stat.acc += k; + fe->tcp_stat.rej += n - k; } static inline void @@ -430,7 +418,7 @@ netfe_lcore_tcp_rst(void) { struct netfe_lcore *fe; struct netfe_stream *fwds; - uint32_t j, n; + uint32_t j, k, n; struct tle_stream *s[MAX_PKT_BURST]; struct netfe_stream *fs[MAX_PKT_BURST]; struct tle_event *rv[MAX_PKT_BURST]; @@ -449,36 +437,44 @@ netfe_lcore_tcp_rst(void) NETFE_TRACE("%s(%u): tle_evq_get(errevq=%p) returns %u\n", __func__, rte_lcore_id(), fe->ereq, n); + k = 0; for (j = 0; j != n; j++) { if (verbose > VERBOSE_NONE) { struct tle_tcp_stream_addr addr; tle_tcp_stream_get_addr(fs[j]->s, &addr); netfe_stream_dump(fs[j], &addr.local, &addr.remote); } - s[j] = fs[j]->s; - rv[j] = fs[j]->rxev; - tv[j] = fs[j]->txev; - ev[j] = fs[j]->erev; + + /* check do we still have something to send/recv */ + if (fs[j]->posterr == 0 && + (tle_event_state(fs[j]->rxev) == TLE_SEV_UP || + tle_event_state(fs[j]->txev) == TLE_SEV_UP)) { + fs[j]->posterr++; + } else { + s[k] = fs[j]->s; + rv[k] = fs[j]->rxev; + tv[k] = fs[j]->txev; + ev[k] = fs[j]->erev; + fs[k] = fs[j]; + k++; + } } - tle_evq_idle(fe->rxeq, rv, n); - tle_evq_idle(fe->txeq, tv, n); - tle_evq_idle(fe->ereq, ev, n); + if (k == 0) + return; - tle_tcp_stream_close_bulk(s, n); + tle_evq_idle(fe->rxeq, rv, k); + tle_evq_idle(fe->txeq, tv, k); + tle_evq_idle(fe->ereq, ev, k); - for (j = 0; j != n; j++) { + tle_tcp_stream_close_bulk(s, k); + + for (j = 0; j != k; j++) { - /* - * if forwarding mode, send unsent packets and - * signal peer stream to terminate too. - */ + /* if forwarding mode, signal peer stream to terminate too. */ fwds = fs[j]->fwds; if (fwds != NULL && fwds->s != NULL) { - /* forward all unsent packets */ - netfe_fwd_tcp(rte_lcore_id(), fs[j]); - fwds->fwds = NULL; tle_event_raise(fwds->erev); fs[j]->fwds = NULL; @@ -491,7 +487,7 @@ netfe_lcore_tcp_rst(void) } } -static inline void +static inline int netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) { uint32_t i, k, n; @@ -504,7 +500,7 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) if (n == 0) { tle_event_idle(fes->txev); fes->stat.txev[TLE_SEV_IDLE]++; - return; + return 0; } @@ -512,13 +508,13 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n", __func__, lcore, proto_name[fes->proto], - fes->s, n, k); + fes->s, n, k); fes->stat.txp += k; fes->stat.drops += n - k; /* not able to send anything. */ if (k == 0) - return; + return 0; if (n == RTE_DIM(fes->pbuf.pkt)) { /* mark stream as readable */ @@ -530,19 +526,22 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) fes->pbuf.num = n - k; for (i = 0; i != n - k; i++) pkt[i] = pkt[i + k]; + + return k; } -static inline void +static inline int netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes) { uint32_t i, k, n; /* refill with new mbufs. */ - pkt_buf_fill(lcore, &fes->pbuf, fes->txlen); + if (fes->posterr == 0) + pkt_buf_fill(lcore, &fes->pbuf, fes->txlen); n = fes->pbuf.num; if (n == 0) - return; + return 0; /** * TODO: cannot use function pointers for unequal param num. @@ -555,19 +554,22 @@ netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes) fes->stat.drops += n - k; if (k == 0) - return; + return 0; /* adjust pbuf array. */ fes->pbuf.num = n - k; for (i = k; i != n; i++) fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i]; + + return k; } static inline void netfe_lcore_tcp(void) { - struct netfe_lcore *fe; + int32_t rc; uint32_t j, n, lcore; + struct netfe_lcore *fe; struct netfe_stream *fs[MAX_PKT_BURST]; fe = RTE_PER_LCORE(_fe); @@ -580,25 +582,42 @@ netfe_lcore_tcp(void) n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs)); if (n != 0) { + NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n", __func__, lcore, fe->rxeq, n); - for (j = 0; j != n; j++) - netfe_rx_process(lcore, fs[j]); + + for (j = 0; j != n; j++) { + + rc = netfe_rx_process(lcore, fs[j]); + + /* we are ok to close the stream */ + if (rc == 0 && fs[j]->posterr != 0) + tle_event_raise(fs[j]->erev); + } } /* look for tx events */ n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs)); if (n != 0) { + NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n", __func__, lcore, fe->txeq, n); + for (j = 0; j != n; j++) { + + rc = 0; + if (fs[j]->op == RXTX) - netfe_rxtx_process_tcp(lcore, fs[j]); + rc = netfe_rxtx_process_tcp(lcore, fs[j]); else if (fs[j]->op == FWD) - netfe_fwd_tcp(lcore, fs[j]); + rc = netfe_fwd_tcp(lcore, fs[j]); else if (fs[j]->op == TXONLY) - netfe_tx_process_tcp(lcore, fs[j]); + rc = netfe_tx_process_tcp(lcore, fs[j]); + + /* we are ok to close the stream */ + if (rc == 0 && fs[j]->posterr != 0) + tle_event_raise(fs[j]->erev); } } } -- cgit 1.2.3-korg