aboutsummaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorKonstantin Ananyev <konstantin.ananyev@intel.com>2017-03-03 18:40:23 +0000
committerKonstantin Ananyev <konstantin.ananyev@intel.com>2017-03-06 15:06:38 +0000
commit21e7392fca2c430018cf387bb3e368ea4c665446 (patch)
tree82109e1b1af7aa3303d1f9b04c1d063aa3b14873 /examples
parent9cbfd751b210f03fdb2fcbf8cafe59b9db516295 (diff)
Rewrite accept() code-path and make l4fwd not to close() on FIN immediatelly.
Changes in public API: - removes tle_tcp_stream_synreqs() and tle_tcp_reject() - adds tle_tcp_stream_update_cfg Allocates and fills new stream when final ACK for 3-way handshake is received. Changes in l4fwd sample application: prevents l4fwd to call close() on error event immediately: first try to recv/send remaining data. Change-Id: I8c5b9d365353084083731a4ce582197a8268688f Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Diffstat (limited to 'examples')
-rw-r--r--examples/l4fwd/common.h10
-rw-r--r--examples/l4fwd/netbe.h1
-rw-r--r--examples/l4fwd/tcp.h183
3 files changed, 108 insertions, 86 deletions
diff --git a/examples/l4fwd/common.h b/examples/l4fwd/common.h
index ff8ee7a..8d757f3 100644
--- a/examples/l4fwd/common.h
+++ b/examples/l4fwd/common.h
@@ -619,7 +619,7 @@ netbe_lcore(void)
}
}
-static inline void
+static inline int
netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
{
uint32_t k, n;
@@ -631,12 +631,12 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
if (k == 0) {
tle_event_idle(fes->rxev);
fes->stat.rxev[TLE_SEV_IDLE]++;
- return;
+ return 0;
}
n = tle_stream_recv(fes->s, fes->pbuf.pkt + n, k);
if (n == 0)
- return;
+ return 0;
NETFE_TRACE("%s(%u): tle_%s_stream_recv(%p, %u) returns %u\n",
__func__, lcore, proto_name[fes->proto], fes->s, k, n);
@@ -648,7 +648,7 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
if (fes->op == RXONLY)
fes->stat.rxb += pkt_buf_empty(&fes->pbuf);
/* mark stream as writable */
- else if (k == RTE_DIM(fes->pbuf.pkt)) {
+ else if (k == RTE_DIM(fes->pbuf.pkt)) {
if (fes->op == RXTX) {
tle_event_active(fes->txev, TLE_SEV_UP);
fes->stat.txev[TLE_SEV_UP]++;
@@ -657,6 +657,8 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
fes->stat.txev[TLE_SEV_UP]++;
}
}
+
+ return n;
}
#endif /* COMMON_H_ */
diff --git a/examples/l4fwd/netbe.h b/examples/l4fwd/netbe.h
index 6d25603..80d1c28 100644
--- a/examples/l4fwd/netbe.h
+++ b/examples/l4fwd/netbe.h
@@ -195,6 +195,7 @@ struct netfe_stream {
uint16_t proto;
uint16_t family;
uint16_t txlen;
+ uint16_t posterr; /* # of time error event handling was postponed */
struct {
uint64_t rxp;
uint64_t rxb;
diff --git a/examples/l4fwd/tcp.h b/examples/l4fwd/tcp.h
index 031ad8d..f6ca3a5 100644
--- a/examples/l4fwd/tcp.h
+++ b/examples/l4fwd/tcp.h
@@ -23,7 +23,9 @@ netfe_stream_term_tcp(struct netfe_lcore *fe, struct netfe_stream *fes)
{
fes->s = NULL;
fes->fwds = NULL;
+ fes->posterr = 0;
memset(&fes->stat, 0, sizeof(fes->stat));
+ pkt_buf_empty(&fes->pbuf);
netfe_put_stream(fe, &fe->free, fes);
}
@@ -251,7 +253,7 @@ netfe_create_fwd_stream(struct netfe_lcore *fe, struct netfe_stream *fes,
return fws;
}
-static inline void
+static inline int
netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
{
uint32_t i, k, n;
@@ -264,7 +266,7 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
pkt = fes->pbuf.pkt;
if (n == 0)
- return;
+ return 0;
fed = fes->fwds;
@@ -307,88 +309,73 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
tle_event_active(fes->rxev, TLE_SEV_UP);
fes->stat.rxev[TLE_SEV_UP]++;
}
+
+ return (fed == NULL) ? 0 : k;
}
static inline void
-netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore,
+netfe_new_conn_tcp(struct netfe_lcore *fe, uint32_t lcore,
struct netfe_stream *fes)
{
- uint32_t i, k, n, rc;
- struct tle_tcp_stream_cfg *prm;
- struct tle_tcp_accept_param acpt_prm[MAX_PKT_BURST];
- struct tle_stream *rs[MAX_PKT_BURST];
- struct tle_syn_req syn_reqs[MAX_PKT_BURST];
+ uint32_t i, k, n;
struct netfe_stream *ts;
+ struct tle_stream *rs[MAX_PKT_BURST];
struct netfe_stream *fs[MAX_PKT_BURST];
-
- static const struct tle_stream_cb zcb = {.func = NULL, .data = NULL};
+ struct tle_tcp_stream_cfg prm[MAX_PKT_BURST];
/* check if any syn requests are waiting */
- n = tle_tcp_stream_synreqs(fes->s, syn_reqs, RTE_DIM(syn_reqs));
+ n = tle_tcp_stream_accept(fes->s, rs, RTE_DIM(rs));
if (n == 0)
return;
- NETFE_TRACE("%s(%u): tle_tcp_stream_synreqs(%p, %u) returns %u\n",
+ NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n",
__func__, lcore, fes->s, MAX_PKT_BURST, n);
/* get n free streams */
k = netfe_get_streams(&fe->free, fs, n);
+ if (n != k)
+ RTE_LOG(ERR, USER1,
+ "%s(lc=%u): not enough FE resources to handle %u new "
+ "TCP streams;\n",
+ __func__, lcore, n - k);
/* fill accept params to accept k connection requests*/
for (i = 0; i != k; i++) {
- acpt_prm[i].syn = syn_reqs[i];
- prm = &acpt_prm[i].cfg;
- prm->nb_retries = 0;
- prm->recv_ev = fs[i]->rxev;
- prm->send_ev = fs[i]->txev;
- prm->err_ev = fs[i]->erev;
- tle_event_active(fs[i]->erev, TLE_SEV_DOWN);
- prm->err_cb = zcb;
- prm->recv_cb = zcb;
- prm->send_cb = zcb;
- }
-
- /* accept k new connections */
- rc = tle_tcp_stream_accept(fes->s, acpt_prm, rs, k);
-
- NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n",
- __func__, lcore, fes->s, k, rc);
-
- if (rc != n) {
- /* n - rc connections could not be accepted */
- tle_tcp_reject(fes->s, syn_reqs + rc, n - rc);
-
- /* put back k - rc streams free list */
- netfe_put_streams(fe, &fe->free, fs + rc, k - rc);
- }
-
- /* update the params for accepted streams */
- for (i = 0; i != rc; i++) {
ts = fs[i];
-
ts->s = rs[i];
ts->op = fes->op;
ts->proto = fes->proto;
ts->family = fes->family;
ts->txlen = fes->txlen;
- if (fes->op == TXONLY) {
+ tle_event_active(ts->erev, TLE_SEV_DOWN);
+ if (fes->op == TXONLY || fes->op == FWD) {
tle_event_active(ts->txev, TLE_SEV_UP);
ts->stat.txev[TLE_SEV_UP]++;
- } else {
+ }
+ if (fes->op != TXONLY) {
tle_event_active(ts->rxev, TLE_SEV_DOWN);
ts->stat.rxev[TLE_SEV_DOWN]++;
}
netfe_put_stream(fe, &fe->use, ts);
- NETFE_TRACE("%s(%u) accept (stream=%p, s=%p)\n",
- __func__, lcore, ts, rs[i]);
- /* create a new fwd stream if needed */
- if (fes->op == FWD) {
- tle_event_active(ts->txev, TLE_SEV_DOWN);
- ts->stat.txev[TLE_SEV_DOWN]++;
+ memset(&prm[i], 0, sizeof(prm[i]));
+ prm[i].recv_ev = ts->rxev;
+ prm[i].send_ev = ts->txev;
+ prm[i].err_ev = ts->erev;
+ }
+
+ tle_tcp_stream_update_cfg(rs, prm, k);
+
+ tle_tcp_stream_close_bulk(rs + k, n - k);
+
+ /* for the forwarding mode, open the second one */
+ if (fes->op == FWD) {
+ for (i = 0; i != k; i++) {
+
+ ts = fs[i];
ts->fwds = netfe_create_fwd_stream(fe, fes, lcore,
fes->fwdprm.bidx);
@@ -396,8 +383,9 @@ netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore,
ts->fwds->fwds = ts;
}
}
- fe->tcp_stat.acc += rc;
- fe->tcp_stat.rej += n - rc;
+
+ fe->tcp_stat.acc += k;
+ fe->tcp_stat.rej += n - k;
}
static inline void
@@ -430,7 +418,7 @@ netfe_lcore_tcp_rst(void)
{
struct netfe_lcore *fe;
struct netfe_stream *fwds;
- uint32_t j, n;
+ uint32_t j, k, n;
struct tle_stream *s[MAX_PKT_BURST];
struct netfe_stream *fs[MAX_PKT_BURST];
struct tle_event *rv[MAX_PKT_BURST];
@@ -449,36 +437,44 @@ netfe_lcore_tcp_rst(void)
NETFE_TRACE("%s(%u): tle_evq_get(errevq=%p) returns %u\n",
__func__, rte_lcore_id(), fe->ereq, n);
+ k = 0;
for (j = 0; j != n; j++) {
if (verbose > VERBOSE_NONE) {
struct tle_tcp_stream_addr addr;
tle_tcp_stream_get_addr(fs[j]->s, &addr);
netfe_stream_dump(fs[j], &addr.local, &addr.remote);
}
- s[j] = fs[j]->s;
- rv[j] = fs[j]->rxev;
- tv[j] = fs[j]->txev;
- ev[j] = fs[j]->erev;
+
+ /* check do we still have something to send/recv */
+ if (fs[j]->posterr == 0 &&
+ (tle_event_state(fs[j]->rxev) == TLE_SEV_UP ||
+ tle_event_state(fs[j]->txev) == TLE_SEV_UP)) {
+ fs[j]->posterr++;
+ } else {
+ s[k] = fs[j]->s;
+ rv[k] = fs[j]->rxev;
+ tv[k] = fs[j]->txev;
+ ev[k] = fs[j]->erev;
+ fs[k] = fs[j];
+ k++;
+ }
}
- tle_evq_idle(fe->rxeq, rv, n);
- tle_evq_idle(fe->txeq, tv, n);
- tle_evq_idle(fe->ereq, ev, n);
+ if (k == 0)
+ return;
- tle_tcp_stream_close_bulk(s, n);
+ tle_evq_idle(fe->rxeq, rv, k);
+ tle_evq_idle(fe->txeq, tv, k);
+ tle_evq_idle(fe->ereq, ev, k);
- for (j = 0; j != n; j++) {
+ tle_tcp_stream_close_bulk(s, k);
+
+ for (j = 0; j != k; j++) {
- /*
- * if forwarding mode, send unsent packets and
- * signal peer stream to terminate too.
- */
+ /* if forwarding mode, signal peer stream to terminate too. */
fwds = fs[j]->fwds;
if (fwds != NULL && fwds->s != NULL) {
- /* forward all unsent packets */
- netfe_fwd_tcp(rte_lcore_id(), fs[j]);
-
fwds->fwds = NULL;
tle_event_raise(fwds->erev);
fs[j]->fwds = NULL;
@@ -491,7 +487,7 @@ netfe_lcore_tcp_rst(void)
}
}
-static inline void
+static inline int
netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
{
uint32_t i, k, n;
@@ -504,7 +500,7 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
if (n == 0) {
tle_event_idle(fes->txev);
fes->stat.txev[TLE_SEV_IDLE]++;
- return;
+ return 0;
}
@@ -512,13 +508,13 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
__func__, lcore, proto_name[fes->proto],
- fes->s, n, k);
+ fes->s, n, k);
fes->stat.txp += k;
fes->stat.drops += n - k;
/* not able to send anything. */
if (k == 0)
- return;
+ return 0;
if (n == RTE_DIM(fes->pbuf.pkt)) {
/* mark stream as readable */
@@ -530,19 +526,22 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
fes->pbuf.num = n - k;
for (i = 0; i != n - k; i++)
pkt[i] = pkt[i + k];
+
+ return k;
}
-static inline void
+static inline int
netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes)
{
uint32_t i, k, n;
/* refill with new mbufs. */
- pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
+ if (fes->posterr == 0)
+ pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
n = fes->pbuf.num;
if (n == 0)
- return;
+ return 0;
/**
* TODO: cannot use function pointers for unequal param num.
@@ -555,19 +554,22 @@ netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes)
fes->stat.drops += n - k;
if (k == 0)
- return;
+ return 0;
/* adjust pbuf array. */
fes->pbuf.num = n - k;
for (i = k; i != n; i++)
fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
+
+ return k;
}
static inline void
netfe_lcore_tcp(void)
{
- struct netfe_lcore *fe;
+ int32_t rc;
uint32_t j, n, lcore;
+ struct netfe_lcore *fe;
struct netfe_stream *fs[MAX_PKT_BURST];
fe = RTE_PER_LCORE(_fe);
@@ -580,25 +582,42 @@ netfe_lcore_tcp(void)
n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
if (n != 0) {
+
NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
__func__, lcore, fe->rxeq, n);
- for (j = 0; j != n; j++)
- netfe_rx_process(lcore, fs[j]);
+
+ for (j = 0; j != n; j++) {
+
+ rc = netfe_rx_process(lcore, fs[j]);
+
+ /* we are ok to close the stream */
+ if (rc == 0 && fs[j]->posterr != 0)
+ tle_event_raise(fs[j]->erev);
+ }
}
/* look for tx events */
n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
if (n != 0) {
+
NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
__func__, lcore, fe->txeq, n);
+
for (j = 0; j != n; j++) {
+
+ rc = 0;
+
if (fs[j]->op == RXTX)
- netfe_rxtx_process_tcp(lcore, fs[j]);
+ rc = netfe_rxtx_process_tcp(lcore, fs[j]);
else if (fs[j]->op == FWD)
- netfe_fwd_tcp(lcore, fs[j]);
+ rc = netfe_fwd_tcp(lcore, fs[j]);
else if (fs[j]->op == TXONLY)
- netfe_tx_process_tcp(lcore, fs[j]);
+ rc = netfe_tx_process_tcp(lcore, fs[j]);
+
+ /* we are ok to close the stream */
+ if (rc == 0 && fs[j]->posterr != 0)
+ tle_event_raise(fs[j]->erev);
}
}
}