From 21e7392fca2c430018cf387bb3e368ea4c665446 Mon Sep 17 00:00:00 2001
From: Konstantin Ananyev <konstantin.ananyev@intel.com>
Date: Fri, 3 Mar 2017 18:40:23 +0000
Subject: Rewrite accept() code-path and make l4fwd not call close() on FIN
 immediately.

Changes in public API:
 - removes tle_tcp_stream_synreqs() and tle_tcp_reject()
 - adds tle_tcp_stream_update_cfg()

Allocates and fills a new stream when the final ACK of the 3-way
handshake is received.
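
A minimal usage sketch of the new accept path (the helper name, 'ls'
and 'ev' are illustrative, not part of the library; only the receive
event is wired up and error handling is trimmed):

	static uint32_t
	serve_accept(struct tle_stream *ls, struct tle_event *ev[])
	{
		struct tle_stream *rs[32];
		struct tle_tcp_stream_cfg prm[32];
		uint32_t i, k, n;

		/* fetch streams for connections that completed the 3-way handshake */
		n = tle_tcp_stream_accept(ls, rs, RTE_DIM(rs));

		/* attach an RX notification event to every accepted stream */
		for (i = 0; i != n; i++)
			prm[i] = (struct tle_tcp_stream_cfg){ .recv_ev = ev[i] };

		k = tle_tcp_stream_update_cfg(rs, prm, n);

		/* on partial failure rte_errno is set; close streams left unconfigured */
		for (i = k; i != n; i++)
			tle_tcp_stream_close(rs[i]);

		return k;
	}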

Changes in l4fwd sample application:
prevents l4fwd from calling close() on an error event immediately:
first it tries to recv/send the remaining data.
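
Roughly, the changed error handling looks like this (an illustrative
sketch only; the actual l4fwd code is not part of this 'lib'-limited
diff): on an error event, drain what was already received before
closing the stream.

	static void
	handle_err_event(struct tle_stream *s)
	{
		struct rte_mbuf *mb[32];
		uint32_t i, n;

		/* consume whatever is still queued in the stream RX buffer */
		do {
			n = tle_tcp_stream_recv(s, mb, RTE_DIM(mb));
			for (i = 0; i != n; i++)
				rte_pktmbuf_free(mb[i]); /* l4fwd would forward it instead */
		} while (n != 0);

		tle_tcp_stream_close(s);
	}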

Change-Id: I8c5b9d365353084083731a4ce582197a8268688f
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 lib/libtle_l4p/stream_table.h |   4 +-
 lib/libtle_l4p/syncookie.h    |   9 +-
 lib/libtle_l4p/tcp_ctl.h      |   7 +
 lib/libtle_l4p/tcp_rxtx.c     | 500 +++++++++++++++++++-----------------------
 lib/libtle_l4p/tcp_stream.c   |  85 +++++++
 lib/libtle_l4p/tle_event.h    |   5 +
 lib/libtle_l4p/tle_tcp.h      | 104 ++++-----
 7 files changed, 361 insertions(+), 353 deletions(-)

(limited to 'lib')

diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h
index 8ad1103..29f1f63 100644
--- a/lib/libtle_l4p/stream_table.h
+++ b/lib/libtle_l4p/stream_table.h
@@ -110,13 +110,13 @@ stbl_add_entry(struct stbl *st, const union pkt_info *pi)
 }
 
 static inline struct stbl_entry *
-stbl_add_pkt(struct stbl *st, const union pkt_info *pi, const void *pkt)
+stbl_add_stream(struct stbl *st, const union pkt_info *pi, const void *s)
 {
 	struct stbl_entry *se;
 
 	se = stbl_add_entry(st, pi);
 	if (se != NULL)
-		se->data = (void *)((uintptr_t)pkt | STE_PKT);
+		se->data = (void *)(uintptr_t)s;
 	return se;
 }
 
diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h
index 276d45a..ad70b7d 100644
--- a/lib/libtle_l4p/syncookie.h
+++ b/lib/libtle_l4p/syncookie.h
@@ -156,13 +156,8 @@ sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len)
 
 static inline void
 sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
-	const struct rte_mbuf *mb)
+	const struct syn_opts *so)
 {
-	const struct tcp_hdr *th;
-
-	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
-		mb->l2_len + mb->l3_len);
-
 	tcb->rcv.nxt = si->seq;
 	tcb->rcv.irs = si->seq - 1;
 
@@ -174,7 +169,7 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
 	tcb->snd.wu.wl1 = si->seq;
 	tcb->snd.wu.wl2 = si->ack;
 
-	get_syn_opts(&tcb->so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
+	tcb->so = *so;
 
 	tcb->snd.wscale = tcb->so.wscale;
 	tcb->snd.mss = tcb->so.mss;
diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h
index dcb9c3e..95c2bbc 100644
--- a/lib/libtle_l4p/tcp_ctl.h
+++ b/lib/libtle_l4p/tcp_ctl.h
@@ -41,6 +41,13 @@ tcp_stream_up(struct tle_tcp_stream *s)
 	rwl_up(&s->tx.use);
 }
 
+/* calculate RCV.WND value based on size of stream receive buffer */
+static inline uint32_t
+calc_rx_wnd(const struct tle_tcp_stream *s, uint32_t scale)
+{
+	return  s->rx.q->prod.mask << scale;
+}
+
 /* empty stream's receive queue */
 static void
 empty_rq(struct tle_tcp_stream *s)
diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c
index 4e43730..6085814 100644
--- a/lib/libtle_l4p/tcp_rxtx.c
+++ b/lib/libtle_l4p/tcp_rxtx.c
@@ -173,7 +173,7 @@ fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
 	l4h->dst_port = port.src;
 
 	wnd = (flags & TCP_FLAG_SYN) ?
-		RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) :
+		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
 		tcb->rcv.wnd >> tcb->rcv.wscale;
 
 	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
@@ -760,68 +760,24 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
 }
 
 static inline int
-restore_syn_pkt(const union pkt_info *pi, const union seg_info *si,
-	uint32_t ts, struct rte_mbuf *mb)
+restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
+	const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb)
 {
 	int32_t rc;
 	uint32_t len;
-	struct tcp_hdr *th;
-	struct syn_opts so;
+	const struct tcp_hdr *th;
 
 	/* check that ACK, etc fields are what we expected. */
 	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts);
 	if (rc < 0)
 		return rc;
 
-	so.mss = rc;
+	so->mss = rc;
 
-	th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
+	th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
 		mb->l2_len + mb->l3_len);
 	len = mb->l4_len - sizeof(*th);
-	sync_get_opts(&so, (uintptr_t)(th + 1), len);
-
-	/* reconstruct SYN options, extend header size if necessary */
-	if (len < TCP_TX_OPT_LEN_MAX) {
-		len = TCP_TX_OPT_LEN_MAX - len;
-		th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN <<
-			TCP_DATA_OFFSET;
-		mb->pkt_len += len;
-		mb->data_len += len;
-		mb->l4_len += len;
-	}
-
-	fill_syn_opts(th + 1, &so);
-	return 0;
-}
-
-static inline int
-rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
-	const union pkt_info *pi, const union seg_info *si,
-	uint32_t ts, struct rte_mbuf *mb)
-{
-	int32_t rc;
-	struct stbl_entry *se;
-
-	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
-		return -EINVAL;
-
-	/* ACK for new connection request. */
-
-	rc = restore_syn_pkt(pi, si, ts, mb);
-	if (rc < 0)
-		return rc;
-
-	se = stbl_add_pkt(st, pi, mb);
-	if (se == NULL)
-		return -ENOBUFS;
-
-	/* put new connection requests into stream listen queue */
-	if (rte_ring_enqueue_burst(s->rx.q,
-			(void * const *)&se, 1) != 1) {
-		stbl_del_pkt(st, se, pi);
-		return -ENOBUFS;
-	}
-
+	sync_get_opts(so, (uintptr_t)(th + 1), len);
 	return 0;
 }
 
@@ -848,6 +804,151 @@ stream_term(struct tle_tcp_stream *s)
 		s->err.cb.func(s->err.cb.data, &s->s);
 }
 
+static inline int
+stream_fill_dest(struct tle_tcp_stream *s)
+{
+	int32_t rc;
+	const void *da;
+
+	if (s->s.type == TLE_V4)
+		da = &s->s.ipv4.addr.src;
+	else
+		da = &s->s.ipv6.addr.src;
+
+	rc = stream_get_dest(&s->s, da, &s->tx.dst);
+	return (rc < 0) ? rc : 0;
+}
+
+/*
+ * helper function, prepares a new accept stream.
+ */
+static inline int
+accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
+	struct tle_tcp_stream *cs, const struct syn_opts *so,
+	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
+{
+	int32_t rc;
+	uint32_t rtt;
+
+	/* some TX still pending for that stream. */
+	if (TCP_STREAM_TX_PENDING(cs))
+		return -EAGAIN;
+
+	/* setup L4 ports and L3 addresses fields. */
+	cs->s.port.raw = pi->port.raw;
+	cs->s.pmsk.raw = UINT32_MAX;
+
+	if (pi->tf.type == TLE_V4) {
+		cs->s.ipv4.addr = pi->addr4;
+		cs->s.ipv4.mask.src = INADDR_NONE;
+		cs->s.ipv4.mask.dst = INADDR_NONE;
+	} else if (pi->tf.type == TLE_V6) {
+		cs->s.ipv6.addr = *pi->addr6;
+		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
+			sizeof(cs->s.ipv6.mask.src));
+		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
+			sizeof(cs->s.ipv6.mask.dst));
+	}
+
+	/* setup TCB */
+	sync_fill_tcb(&cs->tcb, si, so);
+	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
+
+	/*
+	 * estimate the rto
+	 * for now rtt is calculated based on the tcp TMS option,
+	 * later add real-time one
+	 */
+	if (cs->tcb.so.ts.ecr) {
+		rtt = tms - cs->tcb.so.ts.ecr;
+		rto_estimate(&cs->tcb, rtt);
+	} else
+		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
+
+	/* copy streams type. */
+	cs->s.type = ps->s.type;
+
+	/* retrieve and cache destination information. */
+	rc = stream_fill_dest(cs);
+	if (rc != 0)
+		return rc;
+
+	/* update snd.mss with SMSS value */
+	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
+
+	/* setup congestion variables */
+	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
+	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
+
+	cs->tcb.state = TCP_ST_ESTABLISHED;
+
+	/* add stream to the table */
+	cs->ste = stbl_add_stream(st, pi, cs);
+	if (cs->ste == NULL)
+		return -ENOBUFS;
+
+	cs->tcb.uop |= TCP_OP_ACCEPT;
+	tcp_stream_up(cs);
+	return 0;
+}
+
+
+/*
+ * ACK for new connection request arrived.
+ * Check that the packet meets all conditions and try to open a new stream.
+ * returns:
+ * < 0  - invalid packet
+ * == 0 - packet is valid and new stream was opened for it.
+ * > 0  - packet is valid, but failed to open new stream.
+ */
+static inline int
+rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
+	const union pkt_info *pi, const union seg_info *si,
+	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
+{
+	int32_t rc;
+	struct tle_ctx *ctx;
+	struct tle_stream *ts;
+	struct tle_tcp_stream *cs;
+	struct syn_opts so;
+
+	*csp = NULL;
+
+	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
+		return -EINVAL;
+
+	rc = restore_syn_opt(&so, pi, si, tms, mb);
+	if (rc < 0)
+		return rc;
+
+	ctx = s->s.ctx;
+
+	/* allocate new stream */
+	ts = get_stream(ctx);
+	cs = TCP_STREAM(ts);
+	if (ts == NULL)
+		return ENFILE;
+
+	/* prepare stream to handle new connection */
+	if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
+
+		/* put new stream in the accept queue */
+		if (rte_ring_enqueue_burst(s->rx.q,
+				(void * const *)&ts, 1) == 1) {
+			*csp = cs;
+			return 0;
+		}
+
+		/* cleanup on failure */
+		tcp_stream_down(cs);
+		stbl_del_pkt(st, cs->ste, pi);
+		cs->ste = NULL;
+	}
+
+	tcp_stream_reset(ctx, cs);
+	return ENOBUFS;
+}
+
 static inline int
 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
 	uint32_t *seqn, uint32_t *plen)
@@ -1591,14 +1692,35 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts,
 	return num - k;
 }
 
+static inline uint32_t
+rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
+	const union pkt_info *pi, const union seg_info si[],
+	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
+	uint32_t num)
+{
+	uint32_t i;
+
+	if (rwl_acquire(&s->rx.use) > 0) {
+		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
+		rwl_release(&s->rx.use);
+		return i;
+	}
+
+	for (i = 0; i != num; i++) {
+		rc[i] = ENOENT;
+		rp[i] = mb[i];
+	}
+	return 0;
+}
+
 static inline uint32_t
 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
 	const union pkt_info pi[], const union seg_info si[],
 	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
 	uint32_t num)
 {
-	struct tle_tcp_stream *s;
-	uint32_t i, k, state;
+	struct tle_tcp_stream *cs, *s;
+	uint32_t i, k, n, state;
 	int32_t ret;
 
 	s = rx_obtain_stream(dev, st, &pi[0], type);
@@ -1616,25 +1738,51 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
 	if (state == TCP_ST_LISTEN) {
 
 		/* one connection per flow */
-		ret = EINVAL;
-		for (i = 0; i != num && ret != 0; i++) {
-			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]);
-			if (ret != 0) {
-				rc[k] = -ret;
-				rp[k] = mb[i];
-				k++;
-			}
-		}
-		/* duplicate SYN requests */
-		for (; i != num; i++, k++) {
-			rc[k] = EINVAL;
+		cs = NULL;
+		ret = -EINVAL;
+		for (i = 0; i != num; i++) {
+
+			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
+
+			/* valid packet encountered */
+			if (ret >= 0)
+				break;
+
+			/* invalid packet, keep trying to find a proper one */
+			rc[k] = -ret;
 			rp[k] = mb[i];
+			k++;
 		}
 
-		if (k != num && s->rx.ev != NULL)
-			tle_event_raise(s->rx.ev);
-		else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1)
-			s->rx.cb.func(s->rx.cb.data, &s->s);
+		/* packet is valid, but we are out of streams to serve it */
+		if (ret > 0) {
+			for (; i != num; i++, k++) {
+				rc[k] = ret;
+				rp[k] = mb[i];
+			}
+		/* new stream is accepted */
+		} else if (ret == 0) {
+
+			/* inform listen stream about new connections */
+			if (s->rx.ev != NULL)
+				tle_event_raise(s->rx.ev);
+			else if (s->rx.cb.func != NULL &&
+					rte_ring_count(s->rx.q) == 1)
+				s->rx.cb.func(s->rx.cb.data, &s->s);
+
+			/* if there is no data, drop current packet */
+			if (PKT_L4_PLEN(mb[i]) == 0) {
+				rc[k] = ENODATA;
+				rp[k++] = mb[i++];
+			}
+
+			/*  process remaining packets for that stream */
+			if (num != i) {
+				n = rx_new_stream(cs, ts, pi + i, si + i,
+					mb + i, rp + k, rc + k, num - i);
+				k += num - n - i;
+			}
+		}
 
 	} else {
 		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
@@ -1761,23 +1909,17 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
 }
 
 uint16_t
-tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
+tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
 	uint32_t num)
 {
-	uint32_t i, n;
+	uint32_t n;
 	struct tle_tcp_stream *s;
-	struct stbl_entry *se[num];
 
 	s = TCP_STREAM(ts);
-	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num);
+	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
 	if (n == 0)
 		return 0;
 
-	for (i = 0; i != n; i++) {
-		rq[i].pkt = stbl_get_pkt(se[i]);
-		rq[i].opaque = se[i];
-	}
-
 	/*
 	 * if we still have packets to read,
 	 * then rearm stream RX event.
@@ -1791,206 +1933,6 @@ tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
 	return n;
 }
 
-static inline int
-stream_fill_dest(struct tle_tcp_stream *s)
-{
-	int32_t rc;
-	const void *da;
-
-	if (s->s.type == TLE_V4)
-		da = &s->s.ipv4.addr.src;
-	else
-		da = &s->s.ipv6.addr.src;
-
-	rc = stream_get_dest(&s->s, da, &s->tx.dst);
-	return (rc < 0) ? rc : 0;
-}
-
-/*
- * helper function, prepares an accepted stream.
- */
-static int
-accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs,
-	const struct tle_tcp_accept_param *prm, uint32_t tms,
-	const union pkt_info *pi, const union seg_info *si)
-{
-	int32_t rc;
-	uint32_t rtt;
-
-	/* some TX still pending for that stream. */
-	if (TCP_STREAM_TX_PENDING(cs))
-		return -EAGAIN;
-
-	/* setup L4 ports and L3 addresses fields. */
-	cs->s.port.raw = pi->port.raw;
-	cs->s.pmsk.raw = UINT32_MAX;
-
-	if (pi->tf.type == TLE_V4) {
-		cs->s.ipv4.addr = pi->addr4;
-		cs->s.ipv4.mask.src = INADDR_NONE;
-		cs->s.ipv4.mask.dst = INADDR_NONE;
-	} else if (pi->tf.type == TLE_V6) {
-		cs->s.ipv6.addr = *pi->addr6;
-		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
-			sizeof(cs->s.ipv6.mask.src));
-		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
-			sizeof(cs->s.ipv6.mask.dst));
-	}
-
-	/* setup TCB */
-	sync_fill_tcb(&cs->tcb, si, prm->syn.pkt);
-	cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale;
-
-	/* setup stream notification menchanism */
-	cs->rx.ev = prm->cfg.recv_ev;
-	cs->rx.cb = prm->cfg.recv_cb;
-	cs->tx.ev = prm->cfg.send_ev;
-	cs->tx.cb = prm->cfg.send_cb;
-	cs->err.ev = prm->cfg.err_ev;
-	cs->err.cb = prm->cfg.err_cb;
-
-	/* store other params */
-	cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
-		TLE_TCP_DEFAULT_RETRIES;
-
-	/*
-	 * estimate the rto
-	 * for now rtt is calculated based on the tcp TMS option,
-	 * later add real-time one
-	 */
-	if (cs->tcb.so.ts.ecr) {
-		rtt = tms - cs->tcb.so.ts.ecr;
-		rto_estimate(&cs->tcb, rtt);
-	} else
-		cs->tcb.snd.rto = TCP_RTO_DEFAULT;
-
-	tcp_stream_up(cs);
-
-	/* copy streams type. */
-	cs->s.type = ps->s.type;
-
-	/* retrive and cache destination information. */
-	rc = stream_fill_dest(cs);
-	if (rc != 0)
-		return rc;
-
-	/* update snd.mss with SMSS value */
-	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
-
-	/* setup congestion variables */
-	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
-	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
-
-	cs->tcb.state = TCP_ST_ESTABLISHED;
-	cs->tcb.uop |= TCP_OP_ACCEPT;
-
-	/* add stream to the table */
-	cs->ste = prm->syn.opaque;
-	rte_smp_wmb();
-	cs->ste->data = cs;
-	return 0;
-}
-
-/*
- * !!!
- * Right now new stream rcv.wnd is set to zero.
- * That simplifies handling of new connection establishment
- * (as no data segments could be received),
- * but has to be addressed.
- * possible ways:
- *  - send ack after accept creates new stream with new rcv.wnd value.
- *    the problem with that approach that single ack is not delivered
- *    reliably (could be lost), plus might slowdown connection establishment
- *    (extra packet per connection, that client has to wait for).
- *  - allocate new stream at ACK recieve stage.
- *    As a drawback - whole new stream allocation/connection establishment
- *    will be done in BE.
- * !!!
- */
-int
-tle_tcp_stream_accept(struct tle_stream *ts,
-	const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
-	uint32_t num)
-{
-	struct tle_tcp_stream *cs, *s;
-	struct tle_ctx *ctx;
-	uint32_t i, j, n, tms;
-	int32_t rc;
-	union pkt_info pi[num];
-	union seg_info si[num];
-
-	tms = tcp_get_tms();
-	s = TCP_STREAM(ts);
-
-	for (i = 0; i != num; i++)
-		get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]);
-
-	/* mark stream as not closable */
-	if (rwl_acquire(&s->rx.use) < 0)
-		return -EINVAL;
-
-	ctx = s->s.ctx;
-	n = get_streams(ctx, rs, num);
-
-	rc = 0;
-	for (i = 0; i != n; i++) {
-
-		/* prepare new stream */
-		cs = TCP_STREAM(rs[i]);
-		rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i);
-		if (rc != 0)
-			break;
-	}
-
-	rwl_release(&s->rx.use);
-
-	/* free 'SYN' mbufs. */
-	for (j = 0; j != i; j++)
-		rte_pktmbuf_free(prm[j].syn.pkt);
-
-	/* close failed stream, put unused streams back to the free list. */
-	if (rc != 0) {
-		tle_tcp_stream_close(rs[i]);
-		for (j = i + 1; j != n; j++) {
-			cs = TCP_STREAM(rs[j]);
-			put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs));
-		}
-		rte_errno = -rc;
-
-	/* not enough streams are available */
-	} else if (n != num)
-		rte_errno = ENFILE;
-
-	return i;
-}
-
-/*
- * !!! implement a proper one, or delete !!!
- * need to make sure no race conditions with add/lookup stream table.
- */
-void
-tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
-	uint32_t num)
-{
-	uint32_t i;
-	struct rte_mbuf *mb;
-	struct stbl *st;
-	union pkt_info pi;
-	union seg_info si;
-
-	st = CTX_TCP_STLB(s->ctx);
-
-	for (i = 0; i != num; i++) {
-		mb = rq[i].pkt;
-		get_pkt_info(mb, &pi, &si);
-		if (pi.tf.type < TLE_VNUM)
-			stbl_del_pkt_lock(st, rq[i].opaque, &pi);
-
-		/* !!! send RST pkt to the peer !!! */
-		rte_pktmbuf_free(mb);
-	}
-}
-
 uint16_t
 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
 {
@@ -2121,7 +2063,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
 
 	s->tcb.rcv.mss = s->tcb.so.mss;
 	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
-	s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale;
+	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
 	s->tcb.rcv.ts = 0;
 
 	/* add the stream in stream table */
diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c
index 67ed66b..f06b2ed 100644
--- a/lib/libtle_l4p/tcp_stream.c
+++ b/lib/libtle_l4p/tcp_stream.c
@@ -511,6 +511,7 @@ tle_tcp_stream_listen(struct tle_stream *ts)
 				TCP_ST_LISTEN);
 		if (rc != 0) {
 			s->tcb.uop |= TCP_OP_LISTEN;
+			s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
 			rc = 0;
 		} else
 			rc = -EDEADLK;
@@ -520,3 +521,87 @@ tle_tcp_stream_listen(struct tle_stream *ts)
 	rwl_release(&s->rx.use);
 	return rc;
 }
+
+/*
+ * helper function, updates stream config
+ */
+static inline int
+stream_update_cfg(struct tle_stream *ts, struct tle_tcp_stream_cfg *prm)
+{
+	int32_t rc1, rc2;
+	struct tle_tcp_stream *s;
+
+	s = TCP_STREAM(ts);
+
+	rc1 = rwl_try_acquire(&s->rx.use);
+	rc2 = rwl_try_acquire(&s->tx.use);
+
+	if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
+		rwl_release(&s->tx.use);
+		rwl_release(&s->rx.use);
+		return -EINVAL;
+	}
+
+	/* setup stream notification mechanism */
+	s->rx.ev = prm->recv_ev;
+	s->tx.ev = prm->send_ev;
+	s->err.ev = prm->err_ev;
+
+	s->rx.cb.data = prm->recv_cb.data;
+	s->tx.cb.data = prm->send_cb.data;
+	s->err.cb.data = prm->err_cb.data;
+
+	rte_smp_wmb();
+
+	s->rx.cb.func = prm->recv_cb.func;
+	s->tx.cb.func = prm->send_cb.func;
+	s->err.cb.func = prm->err_cb.func;
+
+	/* store other params */
+	s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
+		TLE_TCP_DEFAULT_RETRIES;
+
+	/* invoke async notifications, if any */
+	if (rte_ring_count(s->rx.q) != 0) {
+		if (s->rx.ev != NULL)
+			tle_event_raise(s->rx.ev);
+		else if (s->rx.cb.func != NULL)
+			s->rx.cb.func(s->rx.cb.data, &s->s);
+	}
+	if (rte_ring_free_count(s->tx.q) != 0) {
+		if (s->tx.ev != NULL)
+			tle_event_raise(s->tx.ev);
+		else if (s->tx.cb.func != NULL)
+			s->tx.cb.func(s->tx.cb.data, &s->s);
+	}
+	if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
+			s->tcb.state ==  TCP_ST_CLOSED) {
+		if (s->err.ev != NULL)
+			tle_event_raise(s->err.ev);
+		else if (s->err.cb.func != NULL)
+			s->err.cb.func(s->err.cb.data, &s->s);
+	}
+
+	rwl_release(&s->tx.use);
+	rwl_release(&s->rx.use);
+
+	return 0;
+}
+
+uint32_t
+tle_tcp_stream_update_cfg(struct tle_stream *ts[],
+	struct tle_tcp_stream_cfg prm[], uint32_t num)
+{
+	int32_t rc;
+	uint32_t i;
+
+	for (i = 0; i != num; i++) {
+		rc = stream_update_cfg(ts[i], &prm[i]);
+		if (rc != 0) {
+			rte_errno = -rc;
+			break;
+		}
+	}
+
+	return i;
+}
diff --git a/lib/libtle_l4p/tle_event.h b/lib/libtle_l4p/tle_event.h
index b19954a..d730345 100644
--- a/lib/libtle_l4p/tle_event.h
+++ b/lib/libtle_l4p/tle_event.h
@@ -106,6 +106,11 @@ struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data);
  */
 void tle_event_free(struct tle_event *ev);
 
+static inline enum tle_ev_state
+tle_event_state(const struct tle_event *ev)
+{
+	return ev->state;
+}
 
 /**
  * move event from DOWN to UP state.
diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h
index e6eb336..ec89746 100644
--- a/lib/libtle_l4p/tle_tcp.h
+++ b/lib/libtle_l4p/tle_tcp.h
@@ -148,40 +148,19 @@ int tle_tcp_stream_connect(struct tle_stream *s, const struct sockaddr *addr);
  * <stream open happens here>
  * tle_tcp_stream_listen(stream_to_listen);
  * <wait for read event/callback on that stream>
- * n = tle_tcp_synreqs(stream_to_listen, syn_reqs, sizeof(syn_reqs));
- * for (i = 0, k = 0; i != n; i++) {
- *	rc = <decide should connection from that endpoint be allowed>;
- *	if (rc == 0) {
- *		//proceed with connection establishment
- *		k++;
- *		accept_param[k].syn = syn_reqs[i];
- *		<fill rest of accept_param fields for k-th connection>
- *	} else {
- *		//reject connection requests from that endpoint
- *		rej_reqs[i - k] = syn_reqs[i];
- *	}
+ * n = tle_tcp_stream_accept(stream_to_listen, accepted_streams,
+ * 	RTE_DIM(accepted_streams));
+ * for (i = 0; i != n; i++) {
+ * 	//fill prm[i] (tle_tcp_stream_cfg) for the newly accepted stream
+ * 	...
+ * }
+ * k = tle_tcp_stream_update_cfg(accepted_streams, prm, n);
+ * if (n != k) {
+ * 	//handle error
+ *	...
  * }
- *
- *	//reject n - k connection requests
- *	tle_tcp_reject(stream_to_listen, rej_reqs, n - k);
- *
- *	//accept k new connections
- *	rc = tle_tcp_accept(stream_to_listen, accept_param, new_con_streams, k);
- *	<handle errors>
  */
 
-struct tle_syn_req {
-	struct rte_mbuf *pkt;
-	/*< mbuf with incoming connection request. */
-	void *opaque;    /*< tldk related opaque pointer. */
-};
-
-struct tle_tcp_accept_param {
-	struct tle_syn_req syn;        /*< mbuf with incoming SYN request. */
-	struct tle_tcp_stream_cfg cfg; /*< stream configure options. */
-};
-
-
 /**
  * Set stream into the listen state (passive opener), i.e. make stream ready
  * to accept new connections.
@@ -198,28 +177,41 @@ struct tle_tcp_accept_param {
 int tle_tcp_stream_listen(struct tle_stream *s);
 
 /**
- * return up to *num* mbufs with SYN requests that were received
+ * Return up to *num* streams from the queue of pending connections
  * for given TCP endpoint.
- * Note that the stream has to be in listen state.
- * For each returned mbuf:
- * data_off set to the start of the packet
- * l2_len, l3_len, l4_len are setup properly
- * (so user can still extract L2/L3/L4 header info if needed)
- * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly.
- * L3/L4 checksum is verified.
  * @param s
- *   TCP stream to receive packets from.
- * @param rq
- *   An array of tle_syn_req structures that contains
- *   at least *num* elements in it.
+ *   TCP stream in listen state.
+ * @param rs
+ *   An array of pointers to the newly accepted streams.
+ *   Each such stream represents a new connection to the given TCP endpoint.
+ *   A newly accepted stream is in connected state and ready to be used
+ *   by other FE API routines (send/recv/close/etc.).
  * @param num
- *   Number of elements in the *pkt* array.
+ *   Number of elements in the *rs* array.
  * @return
- *   number of of entries filled inside *pkt* array.
+ *   number of entries filled inside *rs* array.
  */
-uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[],
+uint16_t tle_tcp_stream_accept(struct tle_stream *s, struct tle_stream *rs[],
 	uint32_t num);
 
+/**
+ * Update configuration (associated events, callbacks, stream parameters)
+ * for the given streams.
+ * @param ts
+ *   An array of pointers to the streams to update.
+ * @param prm
+ *   An array of parameters to update for the given streams.
+ * @param num
+ *   Number of elements in the *ts* and *prm* arrays.
+ * @return
+ *   number of streams successfully updated.
+ *   In case of error, error code set in rte_errno.
+ *   Possible rte_errno errors include:
+ *   - EINVAL - invalid parameter passed to function
+ */
+uint32_t tle_tcp_stream_update_cfg(struct tle_stream *ts[],
+	struct tle_tcp_stream_cfg prm[], uint32_t num);
+
 /**
  * Accept connection requests for the given stream.
  * Note that the stream has to be in listen state.
@@ -241,27 +233,9 @@ uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[],
  *   - EINVAL - invalid parameter passed to function
  *   - ENFILE - no more streams are avaialble to open.
  */
-int tle_tcp_stream_accept(struct tle_stream *s,
-	const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
-	uint32_t num);
-
-/**
- * Reject connection requests for the given stream.
- * Note that the stream has to be in listen state.
- * For each new connection a new stream will be open.
- * @param s
- *   TCP listen stream.
- * @param rq
- *   An array of tle_syn_req structures that contains
- *   at least *num* elements in it.
- * @param num
- *   Number of elements in the *pkt* array.
- */
-void tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
-	uint32_t num);
 
 /**
- * return up to *num* mbufs that was received for given TCP stream.
+ * Return up to *num* mbufs that were received for the given TCP stream.
  * Note that the stream has to be in connected state.
  * Data ordering is preserved.
  * For each returned mbuf:
-- 