aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKonstantin Ananyev <konstantin.ananyev@intel.com>2017-03-03 18:40:23 +0000
committerKonstantin Ananyev <konstantin.ananyev@intel.com>2017-03-06 15:06:38 +0000
commit21e7392fca2c430018cf387bb3e368ea4c665446 (patch)
tree82109e1b1af7aa3303d1f9b04c1d063aa3b14873
parent9cbfd751b210f03fdb2fcbf8cafe59b9db516295 (diff)
Rewrite accept() code-path and make l4fwd not to close() on FIN immediately.
Changes in public API: - removes tle_tcp_stream_synreqs() and tle_tcp_reject() - adds tle_tcp_stream_update_cfg Allocates and fills new stream when final ACK for 3-way handshake is received. Changes in l4fwd sample application: prevents l4fwd to call close() on error event immediately: first try to recv/send remaining data. Change-Id: I8c5b9d365353084083731a4ce582197a8268688f Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
-rw-r--r--examples/l4fwd/common.h10
-rw-r--r--examples/l4fwd/netbe.h1
-rw-r--r--examples/l4fwd/tcp.h183
-rw-r--r--lib/libtle_l4p/stream_table.h4
-rw-r--r--lib/libtle_l4p/syncookie.h9
-rw-r--r--lib/libtle_l4p/tcp_ctl.h7
-rw-r--r--lib/libtle_l4p/tcp_rxtx.c500
-rw-r--r--lib/libtle_l4p/tcp_stream.c85
-rw-r--r--lib/libtle_l4p/tle_event.h5
-rw-r--r--lib/libtle_l4p/tle_tcp.h104
10 files changed, 469 insertions, 439 deletions
diff --git a/examples/l4fwd/common.h b/examples/l4fwd/common.h
index ff8ee7a..8d757f3 100644
--- a/examples/l4fwd/common.h
+++ b/examples/l4fwd/common.h
@@ -619,7 +619,7 @@ netbe_lcore(void)
}
}
-static inline void
+static inline int
netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
{
uint32_t k, n;
@@ -631,12 +631,12 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
if (k == 0) {
tle_event_idle(fes->rxev);
fes->stat.rxev[TLE_SEV_IDLE]++;
- return;
+ return 0;
}
n = tle_stream_recv(fes->s, fes->pbuf.pkt + n, k);
if (n == 0)
- return;
+ return 0;
NETFE_TRACE("%s(%u): tle_%s_stream_recv(%p, %u) returns %u\n",
__func__, lcore, proto_name[fes->proto], fes->s, k, n);
@@ -648,7 +648,7 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
if (fes->op == RXONLY)
fes->stat.rxb += pkt_buf_empty(&fes->pbuf);
/* mark stream as writable */
- else if (k == RTE_DIM(fes->pbuf.pkt)) {
+ else if (k == RTE_DIM(fes->pbuf.pkt)) {
if (fes->op == RXTX) {
tle_event_active(fes->txev, TLE_SEV_UP);
fes->stat.txev[TLE_SEV_UP]++;
@@ -657,6 +657,8 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
fes->stat.txev[TLE_SEV_UP]++;
}
}
+
+ return n;
}
#endif /* COMMON_H_ */
diff --git a/examples/l4fwd/netbe.h b/examples/l4fwd/netbe.h
index 6d25603..80d1c28 100644
--- a/examples/l4fwd/netbe.h
+++ b/examples/l4fwd/netbe.h
@@ -195,6 +195,7 @@ struct netfe_stream {
uint16_t proto;
uint16_t family;
uint16_t txlen;
+ uint16_t posterr; /* # of time error event handling was postponed */
struct {
uint64_t rxp;
uint64_t rxb;
diff --git a/examples/l4fwd/tcp.h b/examples/l4fwd/tcp.h
index 031ad8d..f6ca3a5 100644
--- a/examples/l4fwd/tcp.h
+++ b/examples/l4fwd/tcp.h
@@ -23,7 +23,9 @@ netfe_stream_term_tcp(struct netfe_lcore *fe, struct netfe_stream *fes)
{
fes->s = NULL;
fes->fwds = NULL;
+ fes->posterr = 0;
memset(&fes->stat, 0, sizeof(fes->stat));
+ pkt_buf_empty(&fes->pbuf);
netfe_put_stream(fe, &fe->free, fes);
}
@@ -251,7 +253,7 @@ netfe_create_fwd_stream(struct netfe_lcore *fe, struct netfe_stream *fes,
return fws;
}
-static inline void
+static inline int
netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
{
uint32_t i, k, n;
@@ -264,7 +266,7 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
pkt = fes->pbuf.pkt;
if (n == 0)
- return;
+ return 0;
fed = fes->fwds;
@@ -307,88 +309,73 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
tle_event_active(fes->rxev, TLE_SEV_UP);
fes->stat.rxev[TLE_SEV_UP]++;
}
+
+ return (fed == NULL) ? 0 : k;
}
static inline void
-netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore,
+netfe_new_conn_tcp(struct netfe_lcore *fe, uint32_t lcore,
struct netfe_stream *fes)
{
- uint32_t i, k, n, rc;
- struct tle_tcp_stream_cfg *prm;
- struct tle_tcp_accept_param acpt_prm[MAX_PKT_BURST];
- struct tle_stream *rs[MAX_PKT_BURST];
- struct tle_syn_req syn_reqs[MAX_PKT_BURST];
+ uint32_t i, k, n;
struct netfe_stream *ts;
+ struct tle_stream *rs[MAX_PKT_BURST];
struct netfe_stream *fs[MAX_PKT_BURST];
-
- static const struct tle_stream_cb zcb = {.func = NULL, .data = NULL};
+ struct tle_tcp_stream_cfg prm[MAX_PKT_BURST];
/* check if any syn requests are waiting */
- n = tle_tcp_stream_synreqs(fes->s, syn_reqs, RTE_DIM(syn_reqs));
+ n = tle_tcp_stream_accept(fes->s, rs, RTE_DIM(rs));
if (n == 0)
return;
- NETFE_TRACE("%s(%u): tle_tcp_stream_synreqs(%p, %u) returns %u\n",
+ NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n",
__func__, lcore, fes->s, MAX_PKT_BURST, n);
/* get n free streams */
k = netfe_get_streams(&fe->free, fs, n);
+ if (n != k)
+ RTE_LOG(ERR, USER1,
+ "%s(lc=%u): not enough FE resources to handle %u new "
+ "TCP streams;\n",
+ __func__, lcore, n - k);
/* fill accept params to accept k connection requests*/
for (i = 0; i != k; i++) {
- acpt_prm[i].syn = syn_reqs[i];
- prm = &acpt_prm[i].cfg;
- prm->nb_retries = 0;
- prm->recv_ev = fs[i]->rxev;
- prm->send_ev = fs[i]->txev;
- prm->err_ev = fs[i]->erev;
- tle_event_active(fs[i]->erev, TLE_SEV_DOWN);
- prm->err_cb = zcb;
- prm->recv_cb = zcb;
- prm->send_cb = zcb;
- }
-
- /* accept k new connections */
- rc = tle_tcp_stream_accept(fes->s, acpt_prm, rs, k);
-
- NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n",
- __func__, lcore, fes->s, k, rc);
-
- if (rc != n) {
- /* n - rc connections could not be accepted */
- tle_tcp_reject(fes->s, syn_reqs + rc, n - rc);
-
- /* put back k - rc streams free list */
- netfe_put_streams(fe, &fe->free, fs + rc, k - rc);
- }
-
- /* update the params for accepted streams */
- for (i = 0; i != rc; i++) {
ts = fs[i];
-
ts->s = rs[i];
ts->op = fes->op;
ts->proto = fes->proto;
ts->family = fes->family;
ts->txlen = fes->txlen;
- if (fes->op == TXONLY) {
+ tle_event_active(ts->erev, TLE_SEV_DOWN);
+ if (fes->op == TXONLY || fes->op == FWD) {
tle_event_active(ts->txev, TLE_SEV_UP);
ts->stat.txev[TLE_SEV_UP]++;
- } else {
+ }
+ if (fes->op != TXONLY) {
tle_event_active(ts->rxev, TLE_SEV_DOWN);
ts->stat.rxev[TLE_SEV_DOWN]++;
}
netfe_put_stream(fe, &fe->use, ts);
- NETFE_TRACE("%s(%u) accept (stream=%p, s=%p)\n",
- __func__, lcore, ts, rs[i]);
- /* create a new fwd stream if needed */
- if (fes->op == FWD) {
- tle_event_active(ts->txev, TLE_SEV_DOWN);
- ts->stat.txev[TLE_SEV_DOWN]++;
+ memset(&prm[i], 0, sizeof(prm[i]));
+ prm[i].recv_ev = ts->rxev;
+ prm[i].send_ev = ts->txev;
+ prm[i].err_ev = ts->erev;
+ }
+
+ tle_tcp_stream_update_cfg(rs, prm, k);
+
+ tle_tcp_stream_close_bulk(rs + k, n - k);
+
+ /* for the forwarding mode, open the second one */
+ if (fes->op == FWD) {
+ for (i = 0; i != k; i++) {
+
+ ts = fs[i];
ts->fwds = netfe_create_fwd_stream(fe, fes, lcore,
fes->fwdprm.bidx);
@@ -396,8 +383,9 @@ netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore,
ts->fwds->fwds = ts;
}
}
- fe->tcp_stat.acc += rc;
- fe->tcp_stat.rej += n - rc;
+
+ fe->tcp_stat.acc += k;
+ fe->tcp_stat.rej += n - k;
}
static inline void
@@ -430,7 +418,7 @@ netfe_lcore_tcp_rst(void)
{
struct netfe_lcore *fe;
struct netfe_stream *fwds;
- uint32_t j, n;
+ uint32_t j, k, n;
struct tle_stream *s[MAX_PKT_BURST];
struct netfe_stream *fs[MAX_PKT_BURST];
struct tle_event *rv[MAX_PKT_BURST];
@@ -449,36 +437,44 @@ netfe_lcore_tcp_rst(void)
NETFE_TRACE("%s(%u): tle_evq_get(errevq=%p) returns %u\n",
__func__, rte_lcore_id(), fe->ereq, n);
+ k = 0;
for (j = 0; j != n; j++) {
if (verbose > VERBOSE_NONE) {
struct tle_tcp_stream_addr addr;
tle_tcp_stream_get_addr(fs[j]->s, &addr);
netfe_stream_dump(fs[j], &addr.local, &addr.remote);
}
- s[j] = fs[j]->s;
- rv[j] = fs[j]->rxev;
- tv[j] = fs[j]->txev;
- ev[j] = fs[j]->erev;
+
+ /* check do we still have something to send/recv */
+ if (fs[j]->posterr == 0 &&
+ (tle_event_state(fs[j]->rxev) == TLE_SEV_UP ||
+ tle_event_state(fs[j]->txev) == TLE_SEV_UP)) {
+ fs[j]->posterr++;
+ } else {
+ s[k] = fs[j]->s;
+ rv[k] = fs[j]->rxev;
+ tv[k] = fs[j]->txev;
+ ev[k] = fs[j]->erev;
+ fs[k] = fs[j];
+ k++;
+ }
}
- tle_evq_idle(fe->rxeq, rv, n);
- tle_evq_idle(fe->txeq, tv, n);
- tle_evq_idle(fe->ereq, ev, n);
+ if (k == 0)
+ return;
- tle_tcp_stream_close_bulk(s, n);
+ tle_evq_idle(fe->rxeq, rv, k);
+ tle_evq_idle(fe->txeq, tv, k);
+ tle_evq_idle(fe->ereq, ev, k);
- for (j = 0; j != n; j++) {
+ tle_tcp_stream_close_bulk(s, k);
+
+ for (j = 0; j != k; j++) {
- /*
- * if forwarding mode, send unsent packets and
- * signal peer stream to terminate too.
- */
+ /* if forwarding mode, signal peer stream to terminate too. */
fwds = fs[j]->fwds;
if (fwds != NULL && fwds->s != NULL) {
- /* forward all unsent packets */
- netfe_fwd_tcp(rte_lcore_id(), fs[j]);
-
fwds->fwds = NULL;
tle_event_raise(fwds->erev);
fs[j]->fwds = NULL;
@@ -491,7 +487,7 @@ netfe_lcore_tcp_rst(void)
}
}
-static inline void
+static inline int
netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
{
uint32_t i, k, n;
@@ -504,7 +500,7 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
if (n == 0) {
tle_event_idle(fes->txev);
fes->stat.txev[TLE_SEV_IDLE]++;
- return;
+ return 0;
}
@@ -512,13 +508,13 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
__func__, lcore, proto_name[fes->proto],
- fes->s, n, k);
+ fes->s, n, k);
fes->stat.txp += k;
fes->stat.drops += n - k;
/* not able to send anything. */
if (k == 0)
- return;
+ return 0;
if (n == RTE_DIM(fes->pbuf.pkt)) {
/* mark stream as readable */
@@ -530,19 +526,22 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
fes->pbuf.num = n - k;
for (i = 0; i != n - k; i++)
pkt[i] = pkt[i + k];
+
+ return k;
}
-static inline void
+static inline int
netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes)
{
uint32_t i, k, n;
/* refill with new mbufs. */
- pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
+ if (fes->posterr == 0)
+ pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
n = fes->pbuf.num;
if (n == 0)
- return;
+ return 0;
/**
* TODO: cannot use function pointers for unequal param num.
@@ -555,19 +554,22 @@ netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes)
fes->stat.drops += n - k;
if (k == 0)
- return;
+ return 0;
/* adjust pbuf array. */
fes->pbuf.num = n - k;
for (i = k; i != n; i++)
fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
+
+ return k;
}
static inline void
netfe_lcore_tcp(void)
{
- struct netfe_lcore *fe;
+ int32_t rc;
uint32_t j, n, lcore;
+ struct netfe_lcore *fe;
struct netfe_stream *fs[MAX_PKT_BURST];
fe = RTE_PER_LCORE(_fe);
@@ -580,25 +582,42 @@ netfe_lcore_tcp(void)
n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
if (n != 0) {
+
NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
__func__, lcore, fe->rxeq, n);
- for (j = 0; j != n; j++)
- netfe_rx_process(lcore, fs[j]);
+
+ for (j = 0; j != n; j++) {
+
+ rc = netfe_rx_process(lcore, fs[j]);
+
+ /* we are ok to close the stream */
+ if (rc == 0 && fs[j]->posterr != 0)
+ tle_event_raise(fs[j]->erev);
+ }
}
/* look for tx events */
n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
if (n != 0) {
+
NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
__func__, lcore, fe->txeq, n);
+
for (j = 0; j != n; j++) {
+
+ rc = 0;
+
if (fs[j]->op == RXTX)
- netfe_rxtx_process_tcp(lcore, fs[j]);
+ rc = netfe_rxtx_process_tcp(lcore, fs[j]);
else if (fs[j]->op == FWD)
- netfe_fwd_tcp(lcore, fs[j]);
+ rc = netfe_fwd_tcp(lcore, fs[j]);
else if (fs[j]->op == TXONLY)
- netfe_tx_process_tcp(lcore, fs[j]);
+ rc = netfe_tx_process_tcp(lcore, fs[j]);
+
+ /* we are ok to close the stream */
+ if (rc == 0 && fs[j]->posterr != 0)
+ tle_event_raise(fs[j]->erev);
}
}
}
diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h
index 8ad1103..29f1f63 100644
--- a/lib/libtle_l4p/stream_table.h
+++ b/lib/libtle_l4p/stream_table.h
@@ -110,13 +110,13 @@ stbl_add_entry(struct stbl *st, const union pkt_info *pi)
}
static inline struct stbl_entry *
-stbl_add_pkt(struct stbl *st, const union pkt_info *pi, const void *pkt)
+stbl_add_stream(struct stbl *st, const union pkt_info *pi, const void *s)
{
struct stbl_entry *se;
se = stbl_add_entry(st, pi);
if (se != NULL)
- se->data = (void *)((uintptr_t)pkt | STE_PKT);
+ se->data = (void *)(uintptr_t)s;
return se;
}
diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h
index 276d45a..ad70b7d 100644
--- a/lib/libtle_l4p/syncookie.h
+++ b/lib/libtle_l4p/syncookie.h
@@ -156,13 +156,8 @@ sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len)
static inline void
sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
- const struct rte_mbuf *mb)
+ const struct syn_opts *so)
{
- const struct tcp_hdr *th;
-
- th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
- mb->l2_len + mb->l3_len);
-
tcb->rcv.nxt = si->seq;
tcb->rcv.irs = si->seq - 1;
@@ -174,7 +169,7 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
tcb->snd.wu.wl1 = si->seq;
tcb->snd.wu.wl2 = si->ack;
- get_syn_opts(&tcb->so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
+ tcb->so = *so;
tcb->snd.wscale = tcb->so.wscale;
tcb->snd.mss = tcb->so.mss;
diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h
index dcb9c3e..95c2bbc 100644
--- a/lib/libtle_l4p/tcp_ctl.h
+++ b/lib/libtle_l4p/tcp_ctl.h
@@ -41,6 +41,13 @@ tcp_stream_up(struct tle_tcp_stream *s)
rwl_up(&s->tx.use);
}
+/* calculate RCV.WND value based on size of stream receive buffer */
+static inline uint32_t
+calc_rx_wnd(const struct tle_tcp_stream *s, uint32_t scale)
+{
+ return s->rx.q->prod.mask << scale;
+}
+
/* empty stream's receive queue */
static void
empty_rq(struct tle_tcp_stream *s)
diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c
index 4e43730..6085814 100644
--- a/lib/libtle_l4p/tcp_rxtx.c
+++ b/lib/libtle_l4p/tcp_rxtx.c
@@ -173,7 +173,7 @@ fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
l4h->dst_port = port.src;
wnd = (flags & TCP_FLAG_SYN) ?
- RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) :
+ RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
tcb->rcv.wnd >> tcb->rcv.wscale;
/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
@@ -760,68 +760,24 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
}
static inline int
-restore_syn_pkt(const union pkt_info *pi, const union seg_info *si,
- uint32_t ts, struct rte_mbuf *mb)
+restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
+ const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb)
{
int32_t rc;
uint32_t len;
- struct tcp_hdr *th;
- struct syn_opts so;
+ const struct tcp_hdr *th;
/* check that ACK, etc fields are what we expected. */
rc = sync_check_ack(pi, si->seq, si->ack - 1, ts);
if (rc < 0)
return rc;
- so.mss = rc;
+ so->mss = rc;
- th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
+ th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
mb->l2_len + mb->l3_len);
len = mb->l4_len - sizeof(*th);
- sync_get_opts(&so, (uintptr_t)(th + 1), len);
-
- /* reconstruct SYN options, extend header size if necessary */
- if (len < TCP_TX_OPT_LEN_MAX) {
- len = TCP_TX_OPT_LEN_MAX - len;
- th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN <<
- TCP_DATA_OFFSET;
- mb->pkt_len += len;
- mb->data_len += len;
- mb->l4_len += len;
- }
-
- fill_syn_opts(th + 1, &so);
- return 0;
-}
-
-static inline int
-rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
- const union pkt_info *pi, const union seg_info *si,
- uint32_t ts, struct rte_mbuf *mb)
-{
- int32_t rc;
- struct stbl_entry *se;
-
- if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
- return -EINVAL;
-
- /* ACK for new connection request. */
-
- rc = restore_syn_pkt(pi, si, ts, mb);
- if (rc < 0)
- return rc;
-
- se = stbl_add_pkt(st, pi, mb);
- if (se == NULL)
- return -ENOBUFS;
-
- /* put new connection requests into stream listen queue */
- if (rte_ring_enqueue_burst(s->rx.q,
- (void * const *)&se, 1) != 1) {
- stbl_del_pkt(st, se, pi);
- return -ENOBUFS;
- }
-
+ sync_get_opts(so, (uintptr_t)(th + 1), len);
return 0;
}
@@ -849,6 +805,151 @@ stream_term(struct tle_tcp_stream *s)
}
static inline int
+stream_fill_dest(struct tle_tcp_stream *s)
+{
+ int32_t rc;
+ const void *da;
+
+ if (s->s.type == TLE_V4)
+ da = &s->s.ipv4.addr.src;
+ else
+ da = &s->s.ipv6.addr.src;
+
+ rc = stream_get_dest(&s->s, da, &s->tx.dst);
+ return (rc < 0) ? rc : 0;
+}
+
+/*
+ * helper function, prepares a new accept stream.
+ */
+static inline int
+accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
+ struct tle_tcp_stream *cs, const struct syn_opts *so,
+ uint32_t tms, const union pkt_info *pi, const union seg_info *si)
+{
+ int32_t rc;
+ uint32_t rtt;
+
+ /* some TX still pending for that stream. */
+ if (TCP_STREAM_TX_PENDING(cs))
+ return -EAGAIN;
+
+ /* setup L4 ports and L3 addresses fields. */
+ cs->s.port.raw = pi->port.raw;
+ cs->s.pmsk.raw = UINT32_MAX;
+
+ if (pi->tf.type == TLE_V4) {
+ cs->s.ipv4.addr = pi->addr4;
+ cs->s.ipv4.mask.src = INADDR_NONE;
+ cs->s.ipv4.mask.dst = INADDR_NONE;
+ } else if (pi->tf.type == TLE_V6) {
+ cs->s.ipv6.addr = *pi->addr6;
+ rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
+ sizeof(cs->s.ipv6.mask.src));
+ rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
+ sizeof(cs->s.ipv6.mask.dst));
+ }
+
+ /* setup TCB */
+ sync_fill_tcb(&cs->tcb, si, so);
+ cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
+
+ /*
+ * estimate the rto
+ * for now rtt is calculated based on the tcp TMS option,
+ * later add real-time one
+ */
+ if (cs->tcb.so.ts.ecr) {
+ rtt = tms - cs->tcb.so.ts.ecr;
+ rto_estimate(&cs->tcb, rtt);
+ } else
+ cs->tcb.snd.rto = TCP_RTO_DEFAULT;
+
+ /* copy streams type. */
+ cs->s.type = ps->s.type;
+
+ /* retrive and cache destination information. */
+ rc = stream_fill_dest(cs);
+ if (rc != 0)
+ return rc;
+
+ /* update snd.mss with SMSS value */
+ cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
+
+ /* setup congestion variables */
+ cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
+ cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
+
+ cs->tcb.state = TCP_ST_ESTABLISHED;
+
+ /* add stream to the table */
+ cs->ste = stbl_add_stream(st, pi, cs);
+ if (cs->ste == NULL)
+ return -ENOBUFS;
+
+ cs->tcb.uop |= TCP_OP_ACCEPT;
+ tcp_stream_up(cs);
+ return 0;
+}
+
+
+/*
+ * ACK for new connection request arrived.
+ * Check that the packet meets all conditions and try to open a new stream.
+ * returns:
+ * < 0 - invalid packet
+ * == 0 - packet is valid and new stream was opened for it.
+ * > 0 - packet is valid, but failed to open new stream.
+ */
+static inline int
+rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
+ const union pkt_info *pi, const union seg_info *si,
+ uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
+{
+ int32_t rc;
+ struct tle_ctx *ctx;
+ struct tle_stream *ts;
+ struct tle_tcp_stream *cs;
+ struct syn_opts so;
+
+ *csp = NULL;
+
+ if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
+ return -EINVAL;
+
+ rc = restore_syn_opt(&so, pi, si, tms, mb);
+ if (rc < 0)
+ return rc;
+
+ ctx = s->s.ctx;
+
+ /* allocate new stream */
+ ts = get_stream(ctx);
+ cs = TCP_STREAM(ts);
+ if (ts == NULL)
+ return ENFILE;
+
+ /* prepare stream to handle new connection */
+ if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
+
+ /* put new stream in the accept queue */
+ if (rte_ring_enqueue_burst(s->rx.q,
+ (void * const *)&ts, 1) == 1) {
+ *csp = cs;
+ return 0;
+ }
+
+ /* cleanup on failure */
+ tcp_stream_down(cs);
+ stbl_del_pkt(st, cs->ste, pi);
+ cs->ste = NULL;
+ }
+
+ tcp_stream_reset(ctx, cs);
+ return ENOBUFS;
+}
+
+static inline int
data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
uint32_t *seqn, uint32_t *plen)
{
@@ -1592,13 +1693,34 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts,
}
static inline uint32_t
+rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
+ const union pkt_info *pi, const union seg_info si[],
+ struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
+ uint32_t num)
+{
+ uint32_t i;
+
+ if (rwl_acquire(&s->rx.use) > 0) {
+ i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
+ rwl_release(&s->rx.use);
+ return i;
+ }
+
+ for (i = 0; i != num; i++) {
+ rc[i] = ENOENT;
+ rp[i] = mb[i];
+ }
+ return 0;
+}
+
+static inline uint32_t
rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
const union pkt_info pi[], const union seg_info si[],
struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
uint32_t num)
{
- struct tle_tcp_stream *s;
- uint32_t i, k, state;
+ struct tle_tcp_stream *cs, *s;
+ uint32_t i, k, n, state;
int32_t ret;
s = rx_obtain_stream(dev, st, &pi[0], type);
@@ -1616,25 +1738,51 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
if (state == TCP_ST_LISTEN) {
/* one connection per flow */
- ret = EINVAL;
- for (i = 0; i != num && ret != 0; i++) {
- ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]);
- if (ret != 0) {
- rc[k] = -ret;
- rp[k] = mb[i];
- k++;
- }
- }
- /* duplicate SYN requests */
- for (; i != num; i++, k++) {
- rc[k] = EINVAL;
+ cs = NULL;
+ ret = -EINVAL;
+ for (i = 0; i != num; i++) {
+
+ ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
+
+ /* valid packet encountered */
+ if (ret >= 0)
+ break;
+
+ /* invalid packet, keep trying to find a proper one */
+ rc[k] = -ret;
rp[k] = mb[i];
+ k++;
}
- if (k != num && s->rx.ev != NULL)
- tle_event_raise(s->rx.ev);
- else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1)
- s->rx.cb.func(s->rx.cb.data, &s->s);
+ /* packet is valid, but we are out of streams to serve it */
+ if (ret > 0) {
+ for (; i != num; i++, k++) {
+ rc[k] = ret;
+ rp[k] = mb[i];
+ }
+ /* new stream is accepted */
+ } else if (ret == 0) {
+
+ /* inform listen stream about new connections */
+ if (s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ else if (s->rx.cb.func != NULL &&
+ rte_ring_count(s->rx.q) == 1)
+ s->rx.cb.func(s->rx.cb.data, &s->s);
+
+ /* if there is no data, drop current packet */
+ if (PKT_L4_PLEN(mb[i]) == 0) {
+ rc[k] = ENODATA;
+ rp[k++] = mb[i++];
+ }
+
+ /* process remaining packets for that stream */
+ if (num != i) {
+ n = rx_new_stream(cs, ts, pi + i, si + i,
+ mb + i, rp + k, rc + k, num - i);
+ k += num - n - i;
+ }
+ }
} else {
i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
@@ -1761,23 +1909,17 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
}
uint16_t
-tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
+tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
uint32_t num)
{
- uint32_t i, n;
+ uint32_t n;
struct tle_tcp_stream *s;
- struct stbl_entry *se[num];
s = TCP_STREAM(ts);
- n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num);
+ n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
if (n == 0)
return 0;
- for (i = 0; i != n; i++) {
- rq[i].pkt = stbl_get_pkt(se[i]);
- rq[i].opaque = se[i];
- }
-
/*
* if we still have packets to read,
* then rearm stream RX event.
@@ -1791,206 +1933,6 @@ tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
return n;
}
-static inline int
-stream_fill_dest(struct tle_tcp_stream *s)
-{
- int32_t rc;
- const void *da;
-
- if (s->s.type == TLE_V4)
- da = &s->s.ipv4.addr.src;
- else
- da = &s->s.ipv6.addr.src;
-
- rc = stream_get_dest(&s->s, da, &s->tx.dst);
- return (rc < 0) ? rc : 0;
-}
-
-/*
- * helper function, prepares an accepted stream.
- */
-static int
-accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs,
- const struct tle_tcp_accept_param *prm, uint32_t tms,
- const union pkt_info *pi, const union seg_info *si)
-{
- int32_t rc;
- uint32_t rtt;
-
- /* some TX still pending for that stream. */
- if (TCP_STREAM_TX_PENDING(cs))
- return -EAGAIN;
-
- /* setup L4 ports and L3 addresses fields. */
- cs->s.port.raw = pi->port.raw;
- cs->s.pmsk.raw = UINT32_MAX;
-
- if (pi->tf.type == TLE_V4) {
- cs->s.ipv4.addr = pi->addr4;
- cs->s.ipv4.mask.src = INADDR_NONE;
- cs->s.ipv4.mask.dst = INADDR_NONE;
- } else if (pi->tf.type == TLE_V6) {
- cs->s.ipv6.addr = *pi->addr6;
- rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
- sizeof(cs->s.ipv6.mask.src));
- rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
- sizeof(cs->s.ipv6.mask.dst));
- }
-
- /* setup TCB */
- sync_fill_tcb(&cs->tcb, si, prm->syn.pkt);
- cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale;
-
- /* setup stream notification menchanism */
- cs->rx.ev = prm->cfg.recv_ev;
- cs->rx.cb = prm->cfg.recv_cb;
- cs->tx.ev = prm->cfg.send_ev;
- cs->tx.cb = prm->cfg.send_cb;
- cs->err.ev = prm->cfg.err_ev;
- cs->err.cb = prm->cfg.err_cb;
-
- /* store other params */
- cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
- TLE_TCP_DEFAULT_RETRIES;
-
- /*
- * estimate the rto
- * for now rtt is calculated based on the tcp TMS option,
- * later add real-time one
- */
- if (cs->tcb.so.ts.ecr) {
- rtt = tms - cs->tcb.so.ts.ecr;
- rto_estimate(&cs->tcb, rtt);
- } else
- cs->tcb.snd.rto = TCP_RTO_DEFAULT;
-
- tcp_stream_up(cs);
-
- /* copy streams type. */
- cs->s.type = ps->s.type;
-
- /* retrive and cache destination information. */
- rc = stream_fill_dest(cs);
- if (rc != 0)
- return rc;
-
- /* update snd.mss with SMSS value */
- cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
-
- /* setup congestion variables */
- cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
- cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
-
- cs->tcb.state = TCP_ST_ESTABLISHED;
- cs->tcb.uop |= TCP_OP_ACCEPT;
-
- /* add stream to the table */
- cs->ste = prm->syn.opaque;
- rte_smp_wmb();
- cs->ste->data = cs;
- return 0;
-}
-
-/*
- * !!!
- * Right now new stream rcv.wnd is set to zero.
- * That simplifies handling of new connection establishment
- * (as no data segments could be received),
- * but has to be addressed.
- * possible ways:
- * - send ack after accept creates new stream with new rcv.wnd value.
- * the problem with that approach that single ack is not delivered
- * reliably (could be lost), plus might slowdown connection establishment
- * (extra packet per connection, that client has to wait for).
- * - allocate new stream at ACK recieve stage.
- * As a drawback - whole new stream allocation/connection establishment
- * will be done in BE.
- * !!!
- */
-int
-tle_tcp_stream_accept(struct tle_stream *ts,
- const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
- uint32_t num)
-{
- struct tle_tcp_stream *cs, *s;
- struct tle_ctx *ctx;
- uint32_t i, j, n, tms;
- int32_t rc;
- union pkt_info pi[num];
- union seg_info si[num];
-
- tms = tcp_get_tms();
- s = TCP_STREAM(ts);
-
- for (i = 0; i != num; i++)
- get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]);
-
- /* mark stream as not closable */
- if (rwl_acquire(&s->rx.use) < 0)
- return -EINVAL;
-
- ctx = s->s.ctx;
- n = get_streams(ctx, rs, num);
-
- rc = 0;
- for (i = 0; i != n; i++) {
-
- /* prepare new stream */
- cs = TCP_STREAM(rs[i]);
- rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i);
- if (rc != 0)
- break;
- }
-
- rwl_release(&s->rx.use);
-
- /* free 'SYN' mbufs. */
- for (j = 0; j != i; j++)
- rte_pktmbuf_free(prm[j].syn.pkt);
-
- /* close failed stream, put unused streams back to the free list. */
- if (rc != 0) {
- tle_tcp_stream_close(rs[i]);
- for (j = i + 1; j != n; j++) {
- cs = TCP_STREAM(rs[j]);
- put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs));
- }
- rte_errno = -rc;
-
- /* not enough streams are available */
- } else if (n != num)
- rte_errno = ENFILE;
-
- return i;
-}
-
-/*
- * !!! implement a proper one, or delete !!!
- * need to make sure no race conditions with add/lookup stream table.
- */
-void
-tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
- uint32_t num)
-{
- uint32_t i;
- struct rte_mbuf *mb;
- struct stbl *st;
- union pkt_info pi;
- union seg_info si;
-
- st = CTX_TCP_STLB(s->ctx);
-
- for (i = 0; i != num; i++) {
- mb = rq[i].pkt;
- get_pkt_info(mb, &pi, &si);
- if (pi.tf.type < TLE_VNUM)
- stbl_del_pkt_lock(st, rq[i].opaque, &pi);
-
- /* !!! send RST pkt to the peer !!! */
- rte_pktmbuf_free(mb);
- }
-}
-
uint16_t
tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
@@ -2121,7 +2063,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
s->tcb.rcv.mss = s->tcb.so.mss;
s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
- s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale;
+ s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
s->tcb.rcv.ts = 0;
/* add the stream in stream table */
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c
index 67ed66b..f06b2ed 100644
--- a/lib/libtle_l4p/tcp_stream.c
+++ b/lib/libtle_l4p/tcp_stream.c
@@ -511,6 +511,7 @@ tle_tcp_stream_listen(struct tle_stream *ts)
TCP_ST_LISTEN);
if (rc != 0) {
s->tcb.uop |= TCP_OP_LISTEN;
+ s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
rc = 0;
} else
rc = -EDEADLK;
@@ -520,3 +521,87 @@ tle_tcp_stream_listen(struct tle_stream *ts)
rwl_release(&s->rx.use);
return rc;
}
+
+/*
+ * helper function, updates stream config
+ */
+static inline int
+stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
+{
+ int32_t rc1, rc2;
+ struct tle_tcp_stream *s;
+
+ s = TCP_STREAM(ts);
+
+ rc1 = rwl_try_acquire(&s->rx.use);
+ rc2 = rwl_try_acquire(&s->tx.use);
+
+ if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
+ rwl_release(&s->tx.use);
+ rwl_release(&s->rx.use);
+ return -EINVAL;
+ }
+
+ /* setup stream notification menchanism */
+ s->rx.ev = prm->recv_ev;
+ s->tx.ev = prm->send_ev;
+ s->err.ev = prm->err_ev;
+
+ s->rx.cb.data = prm->recv_cb.data;
+ s->tx.cb.data = prm->send_cb.data;
+ s->err.cb.data = prm->err_cb.data;
+
+ rte_smp_wmb();
+
+ s->rx.cb.func = prm->recv_cb.func;
+ s->tx.cb.func = prm->send_cb.func;
+ s->err.cb.func = prm->err_cb.func;
+
+ /* store other params */
+ s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
+ TLE_TCP_DEFAULT_RETRIES;
+
+ /* invoke async notifications, if any */
+ if (rte_ring_count(s->rx.q) != 0) {
+ if (s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ else if (s->rx.cb.func != NULL)
+ s->rx.cb.func(s->rx.cb.data, &s->s);
+ }
+ if (rte_ring_free_count(s->tx.q) != 0) {
+ if (s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ else if (s->tx.cb.func != NULL)
+ s->tx.cb.func(s->tx.cb.data, &s->s);
+ }
+ if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
+ s->tcb.state == TCP_ST_CLOSED) {
+ if (s->err.ev != NULL)
+ tle_event_raise(s->err.ev);
+ else if (s->err.cb.func != NULL)
+ s->err.cb.func(s->err.cb.data, &s->s);
+ }
+
+ rwl_release(&s->tx.use);
+ rwl_release(&s->rx.use);
+
+ return 0;
+}
+
+uint32_t
+tle_tcp_stream_update_cfg(struct tle_stream *ts[],
+ struct tle_tcp_stream_cfg prm[], uint32_t num)
+{
+ int32_t rc;
+ uint32_t i;
+
+ for (i = 0; i != num; i++) {
+ rc = stream_update_cfg(ts[i], &prm[i]);
+ if (rc != 0) {
+ rte_errno = -rc;
+ break;
+ }
+ }
+
+ return i;
+}
diff --git a/lib/libtle_l4p/tle_event.h b/lib/libtle_l4p/tle_event.h
index b19954a..d730345 100644
--- a/lib/libtle_l4p/tle_event.h
+++ b/lib/libtle_l4p/tle_event.h
@@ -106,6 +106,11 @@ struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data);
*/
void tle_event_free(struct tle_event *ev);
+static inline enum tle_ev_state
+tle_event_state(const struct tle_event *ev)
+{
+ return ev->state;
+}
/**
* move event from DOWN to UP state.
diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h
index e6eb336..ec89746 100644
--- a/lib/libtle_l4p/tle_tcp.h
+++ b/lib/libtle_l4p/tle_tcp.h
@@ -148,40 +148,19 @@ int tle_tcp_stream_connect(struct tle_stream *s, const struct sockaddr *addr);
* <stream open happens here>
* tle_tcp_stream_listen(stream_to_listen);
* <wait for read event/callback on that stream>
- * n = tle_tcp_synreqs(stream_to_listen, syn_reqs, sizeof(syn_reqs));
- * for (i = 0, k = 0; i != n; i++) {
- * rc = <decide should connection from that endpoint be allowed>;
- * if (rc == 0) {
- * //proceed with connection establishment
- * k++;
- * accept_param[k].syn = syn_reqs[i];
- * <fill rest of accept_param fields for k-th connection>
- * } else {
- * //reject connection requests from that endpoint
- * rej_reqs[i - k] = syn_reqs[i];
- * }
+ * n = tle_tcp_stream_accept(stream_to_listen, accepted_streams,
+ * sizeof(accepted_streams));
+ * for (i = 0; i != n; i++) {
+ * //prepare tle_tcp_stream_cfg for newly accepted streams
+ * ...
+ * }
+ * k = tle_tcp_stream_update_cfg(accepted_streams, prm, n);
+ * if (n != k) {
+ * //handle error
+ * ...
* }
- *
- * //reject n - k connection requests
- * tle_tcp_reject(stream_to_listen, rej_reqs, n - k);
- *
- * //accept k new connections
- * rc = tle_tcp_accept(stream_to_listen, accept_param, new_con_streams, k);
- * <handle errors>
*/
-struct tle_syn_req {
- struct rte_mbuf *pkt;
- /*< mbuf with incoming connection request. */
- void *opaque; /*< tldk related opaque pointer. */
-};
-
-struct tle_tcp_accept_param {
- struct tle_syn_req syn; /*< mbuf with incoming SYN request. */
- struct tle_tcp_stream_cfg cfg; /*< stream configure options. */
-};
-
-
/**
* Set stream into the listen state (passive opener), i.e. make stream ready
* to accept new connections.
@@ -198,29 +177,42 @@ struct tle_tcp_accept_param {
int tle_tcp_stream_listen(struct tle_stream *s);
/**
- * return up to *num* mbufs with SYN requests that were received
+ * Return up to *num* streams from the queue of pending connections
* for given TCP endpoint.
- * Note that the stream has to be in listen state.
- * For each returned mbuf:
- * data_off set to the start of the packet
- * l2_len, l3_len, l4_len are setup properly
- * (so user can still extract L2/L3/L4 header info if needed)
- * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly.
- * L3/L4 checksum is verified.
* @param s
- * TCP stream to receive packets from.
- * @param rq
- * An array of tle_syn_req structures that contains
- * at least *num* elements in it.
+ * TCP stream in listen state.
+ * @param rs
+ * An array of pointers to the newly accepted streams.
+ * Each such new stream represents a new connection to the given TCP endpoint.
+ * Newly accepted stream should be in connected state and ready to use
+ * by other FE API routines (send/recv/close/etc.).
* @param num
- * Number of elements in the *pkt* array.
+ * Number of elements in the *rs* array.
* @return
- * number of of entries filled inside *pkt* array.
+ * number of entries filled inside *rs* array.
*/
-uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[],
+uint16_t tle_tcp_stream_accept(struct tle_stream *s, struct tle_stream *rs[],
uint32_t num);
/**
+ * updates configuration (associated events, callbacks, stream parameters)
+ * for the given streams.
+ * @param ts
+ * An array of pointers to the streams to update.
+ * @param prm
+ * An array of parameters to update for the given streams.
+ * @param num
+ * Number of elements in the *ts* and *prm* arrays.
+ * @return
+ * number of streams successfully updated.
+ * In case of error, error code set in rte_errno.
+ * Possible rte_errno errors include:
+ * - EINVAL - invalid parameter passed to function
+ */
+uint32_t tle_tcp_stream_update_cfg(struct tle_stream *ts[],
+ struct tle_tcp_stream_cfg prm[], uint32_t num);
+
+/**
* Accept connection requests for the given stream.
* Note that the stream has to be in listen state.
* For each new connection a new stream will be open.
@@ -241,27 +233,9 @@ uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[],
* - EINVAL - invalid parameter passed to function
* - ENFILE - no more streams are avaialble to open.
*/
-int tle_tcp_stream_accept(struct tle_stream *s,
- const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
- uint32_t num);
-
-/**
- * Reject connection requests for the given stream.
- * Note that the stream has to be in listen state.
- * For each new connection a new stream will be open.
- * @param s
- * TCP listen stream.
- * @param rq
- * An array of tle_syn_req structures that contains
- * at least *num* elements in it.
- * @param num
- * Number of elements in the *pkt* array.
- */
-void tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
- uint32_t num);
/**
- * return up to *num* mbufs that was received for given TCP stream.
+ * Return up to *num* mbufs that was received for given TCP stream.
* Note that the stream has to be in connected state.
* Data ordering is preserved.
* For each returned mbuf: