diff options
Diffstat (limited to 'lib/libtle_l4p/tcp_rxtx.c')
-rw-r--r-- | lib/libtle_l4p/tcp_rxtx.c | 500 |
1 files changed, 221 insertions, 279 deletions
diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c index 4e43730..6085814 100644 --- a/lib/libtle_l4p/tcp_rxtx.c +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -173,7 +173,7 @@ fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, l4h->dst_port = port.src; wnd = (flags & TCP_FLAG_SYN) ? - RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) : + RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) : tcb->rcv.wnd >> tcb->rcv.wscale; /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */ @@ -760,68 +760,24 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len, } static inline int -restore_syn_pkt(const union pkt_info *pi, const union seg_info *si, - uint32_t ts, struct rte_mbuf *mb) +restore_syn_opt(struct syn_opts *so, const union pkt_info *pi, + const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb) { int32_t rc; uint32_t len; - struct tcp_hdr *th; - struct syn_opts so; + const struct tcp_hdr *th; /* check that ACK, etc fields are what we expected. */ rc = sync_check_ack(pi, si->seq, si->ack - 1, ts); if (rc < 0) return rc; - so.mss = rc; + so->mss = rc; - th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *, + th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, mb->l2_len + mb->l3_len); len = mb->l4_len - sizeof(*th); - sync_get_opts(&so, (uintptr_t)(th + 1), len); - - /* reconstruct SYN options, extend header size if necessary */ - if (len < TCP_TX_OPT_LEN_MAX) { - len = TCP_TX_OPT_LEN_MAX - len; - th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN << - TCP_DATA_OFFSET; - mb->pkt_len += len; - mb->data_len += len; - mb->l4_len += len; - } - - fill_syn_opts(th + 1, &so); - return 0; -} - -static inline int -rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, - const union pkt_info *pi, const union seg_info *si, - uint32_t ts, struct rte_mbuf *mb) -{ - int32_t rc; - struct stbl_entry *se; - - if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) - return -EINVAL; - - /* ACK for new connection request. */ - - rc = restore_syn_pkt(pi, si, ts, mb); - if (rc < 0) - return rc; - - se = stbl_add_pkt(st, pi, mb); - if (se == NULL) - return -ENOBUFS; - - /* put new connection requests into stream listen queue */ - if (rte_ring_enqueue_burst(s->rx.q, - (void * const *)&se, 1) != 1) { - stbl_del_pkt(st, se, pi); - return -ENOBUFS; - } - + sync_get_opts(so, (uintptr_t)(th + 1), len); return 0; } @@ -849,6 +805,151 @@ stream_term(struct tle_tcp_stream *s) } static inline int +stream_fill_dest(struct tle_tcp_stream *s) +{ + int32_t rc; + const void *da; + + if (s->s.type == TLE_V4) + da = &s->s.ipv4.addr.src; + else + da = &s->s.ipv6.addr.src; + + rc = stream_get_dest(&s->s, da, &s->tx.dst); + return (rc < 0) ? rc : 0; +} + +/* + * helper function, prepares a new accept stream. + */ +static inline int +accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, + struct tle_tcp_stream *cs, const struct syn_opts *so, + uint32_t tms, const union pkt_info *pi, const union seg_info *si) +{ + int32_t rc; + uint32_t rtt; + + /* some TX still pending for that stream. */ + if (TCP_STREAM_TX_PENDING(cs)) + return -EAGAIN; + + /* setup L4 ports and L3 addresses fields. */ + cs->s.port.raw = pi->port.raw; + cs->s.pmsk.raw = UINT32_MAX; + + if (pi->tf.type == TLE_V4) { + cs->s.ipv4.addr = pi->addr4; + cs->s.ipv4.mask.src = INADDR_NONE; + cs->s.ipv4.mask.dst = INADDR_NONE; + } else if (pi->tf.type == TLE_V6) { + cs->s.ipv6.addr = *pi->addr6; + rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.src)); + rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.dst)); + } + + /* setup TCB */ + sync_fill_tcb(&cs->tcb, si, so); + cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale); + + /* + * estimate the rto + * for now rtt is calculated based on the tcp TMS option, + * later add real-time one + */ + if (cs->tcb.so.ts.ecr) { + rtt = tms - cs->tcb.so.ts.ecr; + rto_estimate(&cs->tcb, rtt); + } else + cs->tcb.snd.rto = TCP_RTO_DEFAULT; + + /* copy streams type. */ + cs->s.type = ps->s.type; + + /* retrive and cache destination information. */ + rc = stream_fill_dest(cs); + if (rc != 0) + return rc; + + /* update snd.mss with SMSS value */ + cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); + + /* setup congestion variables */ + cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); + cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; + + cs->tcb.state = TCP_ST_ESTABLISHED; + + /* add stream to the table */ + cs->ste = stbl_add_stream(st, pi, cs); + if (cs->ste == NULL) + return -ENOBUFS; + + cs->tcb.uop |= TCP_OP_ACCEPT; + tcp_stream_up(cs); + return 0; +} + + +/* + * ACK for new connection request arrived. + * Check that the packet meets all conditions and try to open a new stream. + * returns: + * < 0 - invalid packet + * == 0 - packet is valid and new stream was opened for it. + * > 0 - packet is valid, but failed to open new stream. + */ +static inline int +rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, + const union pkt_info *pi, const union seg_info *si, + uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp) +{ + int32_t rc; + struct tle_ctx *ctx; + struct tle_stream *ts; + struct tle_tcp_stream *cs; + struct syn_opts so; + + *csp = NULL; + + if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) + return -EINVAL; + + rc = restore_syn_opt(&so, pi, si, tms, mb); + if (rc < 0) + return rc; + + ctx = s->s.ctx; + + /* allocate new stream */ + ts = get_stream(ctx); + cs = TCP_STREAM(ts); + if (ts == NULL) + return ENFILE; + + /* prepare stream to handle new connection */ + if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) { + + /* put new stream in the accept queue */ + if (rte_ring_enqueue_burst(s->rx.q, + (void * const *)&ts, 1) == 1) { + *csp = cs; + return 0; + } + + /* cleanup on failure */ + tcp_stream_down(cs); + stbl_del_pkt(st, cs->ste, pi); + cs->ste = NULL; + } + + tcp_stream_reset(ctx, cs); + return ENOBUFS; +} + +static inline int data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen, uint32_t *seqn, uint32_t *plen) { @@ -1592,13 +1693,34 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts, } static inline uint32_t +rx_new_stream(struct tle_tcp_stream *s, uint32_t ts, + const union pkt_info *pi, const union seg_info si[], + struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], + uint32_t num) +{ + uint32_t i; + + if (rwl_acquire(&s->rx.use) > 0) { + i = rx_stream(s, ts, pi, si, mb, rp, rc, num); + rwl_release(&s->rx.use); + return i; + } + + for (i = 0; i != num; i++) { + rc[i] = ENOENT; + rp[i] = mb[i]; + } + return 0; +} + +static inline uint32_t rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, const union pkt_info pi[], const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], uint32_t num) { - struct tle_tcp_stream *s; - uint32_t i, k, state; + struct tle_tcp_stream *cs, *s; + uint32_t i, k, n, state; int32_t ret; s = rx_obtain_stream(dev, st, &pi[0], type); @@ -1616,25 +1738,51 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, if (state == TCP_ST_LISTEN) { /* one connection per flow */ - ret = EINVAL; - for (i = 0; i != num && ret != 0; i++) { - ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]); - if (ret != 0) { - rc[k] = -ret; - rp[k] = mb[i]; - k++; - } - } - /* duplicate SYN requests */ - for (; i != num; i++, k++) { - rc[k] = EINVAL; + cs = NULL; + ret = -EINVAL; + for (i = 0; i != num; i++) { + + ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs); + + /* valid packet encountered */ + if (ret >= 0) + break; + + /* invalid packet, keep trying to find a proper one */ + rc[k] = -ret; rp[k] = mb[i]; + k++; } - if (k != num && s->rx.ev != NULL) - tle_event_raise(s->rx.ev); - else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1) - s->rx.cb.func(s->rx.cb.data, &s->s); + /* packet is valid, but we are out of streams to serve it */ + if (ret > 0) { + for (; i != num; i++, k++) { + rc[k] = ret; + rp[k] = mb[i]; + } + /* new stream is accepted */ + } else if (ret == 0) { + + /* inform listen stream about new connections */ + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL && + rte_ring_count(s->rx.q) == 1) + s->rx.cb.func(s->rx.cb.data, &s->s); + + /* if there is no data, drop current packet */ + if (PKT_L4_PLEN(mb[i]) == 0) { + rc[k] = ENODATA; + rp[k++] = mb[i++]; + } + + /* process remaining packets for that stream */ + if (num != i) { + n = rx_new_stream(cs, ts, pi + i, si + i, + mb + i, rp + k, rc + k, num - i); + k += num - n - i; + } + } } else { i = rx_stream(s, ts, pi, si, mb, rp, rc, num); @@ -1761,23 +1909,17 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], } uint16_t -tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[], +tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], uint32_t num) { - uint32_t i, n; + uint32_t n; struct tle_tcp_stream *s; - struct stbl_entry *se[num]; s = TCP_STREAM(ts); - n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num); + n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num); if (n == 0) return 0; - for (i = 0; i != n; i++) { - rq[i].pkt = stbl_get_pkt(se[i]); - rq[i].opaque = se[i]; - } - /* * if we still have packets to read, * then rearm stream RX event. @@ -1791,206 +1933,6 @@ tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[], return n; } -static inline int -stream_fill_dest(struct tle_tcp_stream *s) -{ - int32_t rc; - const void *da; - - if (s->s.type == TLE_V4) - da = &s->s.ipv4.addr.src; - else - da = &s->s.ipv6.addr.src; - - rc = stream_get_dest(&s->s, da, &s->tx.dst); - return (rc < 0) ? rc : 0; -} - -/* - * helper function, prepares an accepted stream. - */ -static int -accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs, - const struct tle_tcp_accept_param *prm, uint32_t tms, - const union pkt_info *pi, const union seg_info *si) -{ - int32_t rc; - uint32_t rtt; - - /* some TX still pending for that stream. */ - if (TCP_STREAM_TX_PENDING(cs)) - return -EAGAIN; - - /* setup L4 ports and L3 addresses fields. */ - cs->s.port.raw = pi->port.raw; - cs->s.pmsk.raw = UINT32_MAX; - - if (pi->tf.type == TLE_V4) { - cs->s.ipv4.addr = pi->addr4; - cs->s.ipv4.mask.src = INADDR_NONE; - cs->s.ipv4.mask.dst = INADDR_NONE; - } else if (pi->tf.type == TLE_V6) { - cs->s.ipv6.addr = *pi->addr6; - rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, - sizeof(cs->s.ipv6.mask.src)); - rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none, - sizeof(cs->s.ipv6.mask.dst)); - } - - /* setup TCB */ - sync_fill_tcb(&cs->tcb, si, prm->syn.pkt); - cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale; - - /* setup stream notification menchanism */ - cs->rx.ev = prm->cfg.recv_ev; - cs->rx.cb = prm->cfg.recv_cb; - cs->tx.ev = prm->cfg.send_ev; - cs->tx.cb = prm->cfg.send_cb; - cs->err.ev = prm->cfg.err_ev; - cs->err.cb = prm->cfg.err_cb; - - /* store other params */ - cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : - TLE_TCP_DEFAULT_RETRIES; - - /* - * estimate the rto - * for now rtt is calculated based on the tcp TMS option, - * later add real-time one - */ - if (cs->tcb.so.ts.ecr) { - rtt = tms - cs->tcb.so.ts.ecr; - rto_estimate(&cs->tcb, rtt); - } else - cs->tcb.snd.rto = TCP_RTO_DEFAULT; - - tcp_stream_up(cs); - - /* copy streams type. */ - cs->s.type = ps->s.type; - - /* retrive and cache destination information. */ - rc = stream_fill_dest(cs); - if (rc != 0) - return rc; - - /* update snd.mss with SMSS value */ - cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); - - /* setup congestion variables */ - cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); - cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; - - cs->tcb.state = TCP_ST_ESTABLISHED; - cs->tcb.uop |= TCP_OP_ACCEPT; - - /* add stream to the table */ - cs->ste = prm->syn.opaque; - rte_smp_wmb(); - cs->ste->data = cs; - return 0; -} - -/* - * !!! - * Right now new stream rcv.wnd is set to zero. - * That simplifies handling of new connection establishment - * (as no data segments could be received), - * but has to be addressed. - * possible ways: - * - send ack after accept creates new stream with new rcv.wnd value. - * the problem with that approach that single ack is not delivered - * reliably (could be lost), plus might slowdown connection establishment - * (extra packet per connection, that client has to wait for). - * - allocate new stream at ACK recieve stage. - * As a drawback - whole new stream allocation/connection establishment - * will be done in BE. - * !!! - */ -int -tle_tcp_stream_accept(struct tle_stream *ts, - const struct tle_tcp_accept_param prm[], struct tle_stream *rs[], - uint32_t num) -{ - struct tle_tcp_stream *cs, *s; - struct tle_ctx *ctx; - uint32_t i, j, n, tms; - int32_t rc; - union pkt_info pi[num]; - union seg_info si[num]; - - tms = tcp_get_tms(); - s = TCP_STREAM(ts); - - for (i = 0; i != num; i++) - get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]); - - /* mark stream as not closable */ - if (rwl_acquire(&s->rx.use) < 0) - return -EINVAL; - - ctx = s->s.ctx; - n = get_streams(ctx, rs, num); - - rc = 0; - for (i = 0; i != n; i++) { - - /* prepare new stream */ - cs = TCP_STREAM(rs[i]); - rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i); - if (rc != 0) - break; - } - - rwl_release(&s->rx.use); - - /* free 'SYN' mbufs. */ - for (j = 0; j != i; j++) - rte_pktmbuf_free(prm[j].syn.pkt); - - /* close failed stream, put unused streams back to the free list. */ - if (rc != 0) { - tle_tcp_stream_close(rs[i]); - for (j = i + 1; j != n; j++) { - cs = TCP_STREAM(rs[j]); - put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs)); - } - rte_errno = -rc; - - /* not enough streams are available */ - } else if (n != num) - rte_errno = ENFILE; - - return i; -} - -/* - * !!! implement a proper one, or delete !!! - * need to make sure no race conditions with add/lookup stream table. - */ -void -tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[], - uint32_t num) -{ - uint32_t i; - struct rte_mbuf *mb; - struct stbl *st; - union pkt_info pi; - union seg_info si; - - st = CTX_TCP_STLB(s->ctx); - - for (i = 0; i != num; i++) { - mb = rq[i].pkt; - get_pkt_info(mb, &pi, &si); - if (pi.tf.type < TLE_VNUM) - stbl_del_pkt_lock(st, rq[i].opaque, &pi); - - /* !!! send RST pkt to the peer !!! */ - rte_pktmbuf_free(mb); - } -} - uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) { @@ -2121,7 +2063,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) s->tcb.rcv.mss = s->tcb.so.mss; s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT; - s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale; + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); s->tcb.rcv.ts = 0; /* add the stream in stream table */ |