aboutsummaryrefslogtreecommitdiffstats
path: root/lib/libtle_l4p/tcp_rxtx.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libtle_l4p/tcp_rxtx.c')
-rw-r--r--lib/libtle_l4p/tcp_rxtx.c500
1 files changed, 221 insertions, 279 deletions
diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c
index 4e43730..6085814 100644
--- a/lib/libtle_l4p/tcp_rxtx.c
+++ b/lib/libtle_l4p/tcp_rxtx.c
@@ -173,7 +173,7 @@ fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
l4h->dst_port = port.src;
wnd = (flags & TCP_FLAG_SYN) ?
- RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) :
+ RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
tcb->rcv.wnd >> tcb->rcv.wscale;
/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
@@ -760,68 +760,24 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
}
static inline int
-restore_syn_pkt(const union pkt_info *pi, const union seg_info *si,
- uint32_t ts, struct rte_mbuf *mb)
+restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
+ const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb)
{
int32_t rc;
uint32_t len;
- struct tcp_hdr *th;
- struct syn_opts so;
+ const struct tcp_hdr *th;
/* check that ACK, etc fields are what we expected. */
rc = sync_check_ack(pi, si->seq, si->ack - 1, ts);
if (rc < 0)
return rc;
- so.mss = rc;
+ so->mss = rc;
- th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
+ th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
mb->l2_len + mb->l3_len);
len = mb->l4_len - sizeof(*th);
- sync_get_opts(&so, (uintptr_t)(th + 1), len);
-
- /* reconstruct SYN options, extend header size if necessary */
- if (len < TCP_TX_OPT_LEN_MAX) {
- len = TCP_TX_OPT_LEN_MAX - len;
- th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN <<
- TCP_DATA_OFFSET;
- mb->pkt_len += len;
- mb->data_len += len;
- mb->l4_len += len;
- }
-
- fill_syn_opts(th + 1, &so);
- return 0;
-}
-
-static inline int
-rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
- const union pkt_info *pi, const union seg_info *si,
- uint32_t ts, struct rte_mbuf *mb)
-{
- int32_t rc;
- struct stbl_entry *se;
-
- if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
- return -EINVAL;
-
- /* ACK for new connection request. */
-
- rc = restore_syn_pkt(pi, si, ts, mb);
- if (rc < 0)
- return rc;
-
- se = stbl_add_pkt(st, pi, mb);
- if (se == NULL)
- return -ENOBUFS;
-
- /* put new connection requests into stream listen queue */
- if (rte_ring_enqueue_burst(s->rx.q,
- (void * const *)&se, 1) != 1) {
- stbl_del_pkt(st, se, pi);
- return -ENOBUFS;
- }
-
+ sync_get_opts(so, (uintptr_t)(th + 1), len);
return 0;
}
@@ -849,6 +805,151 @@ stream_term(struct tle_tcp_stream *s)
}
static inline int
+stream_fill_dest(struct tle_tcp_stream *s)
+{
+ int32_t rc;
+ const void *da;
+ /* select the stored 'src' address field that matches the stream type */
+ if (s->s.type == TLE_V4)
+ da = &s->s.ipv4.addr.src;
+ else
+ da = &s->s.ipv6.addr.src;
+ /* look up destination for that address; &s->tx.dst is presumably the output */
+ rc = stream_get_dest(&s->s, da, &s->tx.dst);
+ return (rc < 0) ? rc : 0; /* negative rc is passed through, anything else becomes 0 */
+}
+
+/*
+ * helper function, prepares new stream 'cs' for a connection accepted on 'ps'; returns 0 on success, negative value on failure.
+ */
+static inline int
+accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
+ struct tle_tcp_stream *cs, const struct syn_opts *so,
+ uint32_t tms, const union pkt_info *pi, const union seg_info *si)
+{
+ int32_t rc;
+ uint32_t rtt;
+
+ /* refuse to set up a stream that still has TX pending. */
+ if (TCP_STREAM_TX_PENDING(cs))
+ return -EAGAIN;
+
+ /* copy L4 ports and L3 addresses from packet info, port mask is all-ones. */
+ cs->s.port.raw = pi->port.raw;
+ cs->s.pmsk.raw = UINT32_MAX;
+
+ if (pi->tf.type == TLE_V4) {
+ cs->s.ipv4.addr = pi->addr4;
+ cs->s.ipv4.mask.src = INADDR_NONE;
+ cs->s.ipv4.mask.dst = INADDR_NONE;
+ } else if (pi->tf.type == TLE_V6) {
+ cs->s.ipv6.addr = *pi->addr6;
+ rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
+ sizeof(cs->s.ipv6.mask.src));
+ rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
+ sizeof(cs->s.ipv6.mask.dst));
+ }
+
+ /* fill TCB from segment info and SYN options, then compute receive window */
+ sync_fill_tcb(&cs->tcb, si, so);
+ cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
+
+ /*
+ * estimate the rto:
+ * for now rtt is 'tms' minus the echoed timestamp (so.ts.ecr) when it is
+ * non-zero, otherwise default rto is used; later add real-time one
+ */
+ if (cs->tcb.so.ts.ecr) {
+ rtt = tms - cs->tcb.so.ts.ecr;
+ rto_estimate(&cs->tcb, rtt);
+ } else
+ cs->tcb.snd.rto = TCP_RTO_DEFAULT;
+
+ /* new stream inherits the type of the parent (listen) stream. */
+ cs->s.type = ps->s.type;
+
+ /* retrieve and cache destination information. */
+ rc = stream_fill_dest(cs);
+ if (rc != 0)
+ return rc;
+
+ /* update snd.mss with SMSS value */
+ cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
+
+ /* setup congestion variables */
+ cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
+ cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
+
+ cs->tcb.state = TCP_ST_ESTABLISHED;
+
+ /* add stream to the table, keep returned entry in cs->ste */
+ cs->ste = stbl_add_stream(st, pi, cs);
+ if (cs->ste == NULL)
+ return -ENOBUFS;
+ /* table insertion succeeded: flag TCB with TCP_OP_ACCEPT and bring stream up */
+ cs->tcb.uop |= TCP_OP_ACCEPT;
+ tcp_stream_up(cs);
+ return 0;
+}
+
+
+/*
+ * ACK for new connection request arrived.
+ * Check that the packet meets all conditions and try to open a new stream.
+ * returns:
+ * < 0 - invalid packet
+ * == 0 - packet is valid and new stream was opened for it (*csp points to it).
+ * > 0 - packet is valid, but failed to open new stream (ENFILE or ENOBUFS).
+ */
+static inline int
+rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
+ const union pkt_info *pi, const union seg_info *si,
+ uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
+{
+ int32_t rc;
+ struct tle_ctx *ctx;
+ struct tle_stream *ts;
+ struct tle_tcp_stream *cs;
+ struct syn_opts so;
+
+ *csp = NULL;
+ /* flags must be exactly ACK and the packet must pass rx_check_stream() */
+ if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
+ return -EINVAL;
+ /* validate ACK fields and recover SYN options into 'so' */
+ rc = restore_syn_opt(&so, pi, si, tms, mb);
+ if (rc < 0)
+ return rc;
+
+ ctx = s->s.ctx;
+
+ /* allocate new stream from the context of the listen stream */
+ ts = get_stream(ctx);
+ cs = TCP_STREAM(ts);
+ if (ts == NULL)
+ return ENFILE;
+
+ /* prepare stream to handle new connection */
+ if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
+
+ /* put new stream in the accept queue */
+ if (rte_ring_enqueue_burst(s->rx.q,
+ (void * const *)&ts, 1) == 1) {
+ *csp = cs;
+ return 0;
+ }
+
+ /* enqueue failed: bring stream down and drop its table entry. NOTE(review): added via stbl_add_stream(), removed via stbl_del_pkt() - confirm */
+ tcp_stream_down(cs);
+ stbl_del_pkt(st, cs->ste, pi);
+ cs->ste = NULL;
+ }
+ /* both failure paths (prepare and enqueue) end up here */
+ tcp_stream_reset(ctx, cs);
+ return ENOBUFS;
+}
+
+static inline int
data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
uint32_t *seqn, uint32_t *plen)
{
@@ -1592,13 +1693,34 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts,
}
static inline uint32_t
+rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
+ const union pkt_info *pi, const union seg_info si[],
+ struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
+ uint32_t num)
+{
+ uint32_t i;
+ /* run rx_stream() on the packets while s->rx.use is acquired, return its result */
+ if (rwl_acquire(&s->rx.use) > 0) {
+ i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
+ rwl_release(&s->rx.use);
+ return i;
+ }
+ /* s->rx.use could not be acquired: hand back all 'num' packets with ENOENT */
+ for (i = 0; i != num; i++) {
+ rc[i] = ENOENT;
+ rp[i] = mb[i];
+ }
+ return 0;
+}
+
+static inline uint32_t
rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
const union pkt_info pi[], const union seg_info si[],
struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
uint32_t num)
{
- struct tle_tcp_stream *s;
- uint32_t i, k, state;
+ struct tle_tcp_stream *cs, *s;
+ uint32_t i, k, n, state;
int32_t ret;
s = rx_obtain_stream(dev, st, &pi[0], type);
@@ -1616,25 +1738,51 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
if (state == TCP_ST_LISTEN) {
/* one connection per flow */
- ret = EINVAL;
- for (i = 0; i != num && ret != 0; i++) {
- ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]);
- if (ret != 0) {
- rc[k] = -ret;
- rp[k] = mb[i];
- k++;
- }
- }
- /* duplicate SYN requests */
- for (; i != num; i++, k++) {
- rc[k] = EINVAL;
+ cs = NULL;
+ ret = -EINVAL;
+ for (i = 0; i != num; i++) {
+
+ ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
+
+ /* valid packet encountered */
+ if (ret >= 0)
+ break;
+
+ /* invalid packet, keep trying to find a proper one */
+ rc[k] = -ret;
rp[k] = mb[i];
+ k++;
}
- if (k != num && s->rx.ev != NULL)
- tle_event_raise(s->rx.ev);
- else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1)
- s->rx.cb.func(s->rx.cb.data, &s->s);
+ /* packet is valid, but we are out of streams to serve it */
+ if (ret > 0) {
+ for (; i != num; i++, k++) {
+ rc[k] = ret;
+ rp[k] = mb[i];
+ }
+ /* new stream is accepted */
+ } else if (ret == 0) {
+
+ /* inform listen stream about new connections */
+ if (s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ else if (s->rx.cb.func != NULL &&
+ rte_ring_count(s->rx.q) == 1)
+ s->rx.cb.func(s->rx.cb.data, &s->s);
+
+ /* if there is no data, drop current packet */
+ if (PKT_L4_PLEN(mb[i]) == 0) {
+ rc[k] = ENODATA;
+ rp[k++] = mb[i++];
+ }
+
+ /* process remaining packets for that stream */
+ if (num != i) {
+ n = rx_new_stream(cs, ts, pi + i, si + i,
+ mb + i, rp + k, rc + k, num - i);
+ k += num - n - i;
+ }
+ }
} else {
i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
@@ -1761,23 +1909,17 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
}
uint16_t
-tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
+tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
uint32_t num)
{
- uint32_t i, n;
+ uint32_t n;
struct tle_tcp_stream *s;
- struct stbl_entry *se[num];
s = TCP_STREAM(ts);
- n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num);
+ n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
if (n == 0)
return 0;
- for (i = 0; i != n; i++) {
- rq[i].pkt = stbl_get_pkt(se[i]);
- rq[i].opaque = se[i];
- }
-
/*
* if we still have packets to read,
* then rearm stream RX event.
@@ -1791,206 +1933,6 @@ tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
return n;
}
-static inline int
-stream_fill_dest(struct tle_tcp_stream *s)
-{
- int32_t rc;
- const void *da;
-
- if (s->s.type == TLE_V4)
- da = &s->s.ipv4.addr.src;
- else
- da = &s->s.ipv6.addr.src;
-
- rc = stream_get_dest(&s->s, da, &s->tx.dst);
- return (rc < 0) ? rc : 0;
-}
-
-/*
- * helper function, prepares an accepted stream.
- */
-static int
-accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs,
- const struct tle_tcp_accept_param *prm, uint32_t tms,
- const union pkt_info *pi, const union seg_info *si)
-{
- int32_t rc;
- uint32_t rtt;
-
- /* some TX still pending for that stream. */
- if (TCP_STREAM_TX_PENDING(cs))
- return -EAGAIN;
-
- /* setup L4 ports and L3 addresses fields. */
- cs->s.port.raw = pi->port.raw;
- cs->s.pmsk.raw = UINT32_MAX;
-
- if (pi->tf.type == TLE_V4) {
- cs->s.ipv4.addr = pi->addr4;
- cs->s.ipv4.mask.src = INADDR_NONE;
- cs->s.ipv4.mask.dst = INADDR_NONE;
- } else if (pi->tf.type == TLE_V6) {
- cs->s.ipv6.addr = *pi->addr6;
- rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
- sizeof(cs->s.ipv6.mask.src));
- rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
- sizeof(cs->s.ipv6.mask.dst));
- }
-
- /* setup TCB */
- sync_fill_tcb(&cs->tcb, si, prm->syn.pkt);
- cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale;
-
- /* setup stream notification menchanism */
- cs->rx.ev = prm->cfg.recv_ev;
- cs->rx.cb = prm->cfg.recv_cb;
- cs->tx.ev = prm->cfg.send_ev;
- cs->tx.cb = prm->cfg.send_cb;
- cs->err.ev = prm->cfg.err_ev;
- cs->err.cb = prm->cfg.err_cb;
-
- /* store other params */
- cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
- TLE_TCP_DEFAULT_RETRIES;
-
- /*
- * estimate the rto
- * for now rtt is calculated based on the tcp TMS option,
- * later add real-time one
- */
- if (cs->tcb.so.ts.ecr) {
- rtt = tms - cs->tcb.so.ts.ecr;
- rto_estimate(&cs->tcb, rtt);
- } else
- cs->tcb.snd.rto = TCP_RTO_DEFAULT;
-
- tcp_stream_up(cs);
-
- /* copy streams type. */
- cs->s.type = ps->s.type;
-
- /* retrive and cache destination information. */
- rc = stream_fill_dest(cs);
- if (rc != 0)
- return rc;
-
- /* update snd.mss with SMSS value */
- cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
-
- /* setup congestion variables */
- cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
- cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
-
- cs->tcb.state = TCP_ST_ESTABLISHED;
- cs->tcb.uop |= TCP_OP_ACCEPT;
-
- /* add stream to the table */
- cs->ste = prm->syn.opaque;
- rte_smp_wmb();
- cs->ste->data = cs;
- return 0;
-}
-
-/*
- * !!!
- * Right now new stream rcv.wnd is set to zero.
- * That simplifies handling of new connection establishment
- * (as no data segments could be received),
- * but has to be addressed.
- * possible ways:
- * - send ack after accept creates new stream with new rcv.wnd value.
- * the problem with that approach that single ack is not delivered
- * reliably (could be lost), plus might slowdown connection establishment
- * (extra packet per connection, that client has to wait for).
- * - allocate new stream at ACK recieve stage.
- * As a drawback - whole new stream allocation/connection establishment
- * will be done in BE.
- * !!!
- */
-int
-tle_tcp_stream_accept(struct tle_stream *ts,
- const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
- uint32_t num)
-{
- struct tle_tcp_stream *cs, *s;
- struct tle_ctx *ctx;
- uint32_t i, j, n, tms;
- int32_t rc;
- union pkt_info pi[num];
- union seg_info si[num];
-
- tms = tcp_get_tms();
- s = TCP_STREAM(ts);
-
- for (i = 0; i != num; i++)
- get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]);
-
- /* mark stream as not closable */
- if (rwl_acquire(&s->rx.use) < 0)
- return -EINVAL;
-
- ctx = s->s.ctx;
- n = get_streams(ctx, rs, num);
-
- rc = 0;
- for (i = 0; i != n; i++) {
-
- /* prepare new stream */
- cs = TCP_STREAM(rs[i]);
- rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i);
- if (rc != 0)
- break;
- }
-
- rwl_release(&s->rx.use);
-
- /* free 'SYN' mbufs. */
- for (j = 0; j != i; j++)
- rte_pktmbuf_free(prm[j].syn.pkt);
-
- /* close failed stream, put unused streams back to the free list. */
- if (rc != 0) {
- tle_tcp_stream_close(rs[i]);
- for (j = i + 1; j != n; j++) {
- cs = TCP_STREAM(rs[j]);
- put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs));
- }
- rte_errno = -rc;
-
- /* not enough streams are available */
- } else if (n != num)
- rte_errno = ENFILE;
-
- return i;
-}
-
-/*
- * !!! implement a proper one, or delete !!!
- * need to make sure no race conditions with add/lookup stream table.
- */
-void
-tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
- uint32_t num)
-{
- uint32_t i;
- struct rte_mbuf *mb;
- struct stbl *st;
- union pkt_info pi;
- union seg_info si;
-
- st = CTX_TCP_STLB(s->ctx);
-
- for (i = 0; i != num; i++) {
- mb = rq[i].pkt;
- get_pkt_info(mb, &pi, &si);
- if (pi.tf.type < TLE_VNUM)
- stbl_del_pkt_lock(st, rq[i].opaque, &pi);
-
- /* !!! send RST pkt to the peer !!! */
- rte_pktmbuf_free(mb);
- }
-}
-
uint16_t
tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
@@ -2121,7 +2063,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
s->tcb.rcv.mss = s->tcb.so.mss;
s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
- s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale;
+ s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
s->tcb.rcv.ts = 0;
/* add the stream in stream table */