From 78c896b3b3127515478090c19447e27dc406427e Mon Sep 17 00:00:00 2001 From: Jianfeng Tan Date: Mon, 18 Nov 2019 06:59:50 +0000 Subject: TLDKv2 Signed-off-by: Jianfeng Tan Signed-off-by: Jielong Zhou Signed-off-by: Jian Zhang Signed-off-by: Chen Zhao Change-Id: I55c39de4c6cd30f991f35631eb507f770230f08e --- lib/libtle_l4p/tcp_rxtx.c | 1445 +++++++++++++++++++++++++++++++-------------- 1 file changed, 996 insertions(+), 449 deletions(-) (limited to 'lib/libtle_l4p/tcp_rxtx.c') diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c index a519645..5d7e0d1 100644 --- a/lib/libtle_l4p/tcp_rxtx.c +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -28,8 +28,30 @@ #include "tcp_rxq.h" #include "tcp_txq.h" #include "tcp_tx_seg.h" +#include "tcp_rxtx.h" -#define TCP_MAX_PKT_SEG 0x20 +/* Uncomment below line to debug cwnd */ +// #define DEBUG_CWND + +#ifdef DEBUG_CWND +#define CWND_INFO(msg, value) printf("CWND: %s: %d\n", msg, value) +#else +#define CWND_INFO(msg, value) do {} while (0) +#endif + +#define TCP_MAX_PKT_SEG 0x20 +#define DELAY_ACK_CHECK_INTERVAL 100 + +/* must larger than l2_len(14)+l3_len(20)+l4_len(20)+tms_option(12) */ +#define RESERVE_HEADER_LEN 128 + +/* If we encounter exhaustion of recv win, we set this thresh to + * update recv win to the remote. It's not set to 1 or some smaller + * value to avoid too-frequent update. + */ +#define RECV_WIN_NOTIFY_THRESH 64 + +static inline int stream_fill_dest(struct tle_tcp_stream *s); /* * checks if input TCP ports and IP addresses match given stream. @@ -54,11 +76,17 @@ rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi) static inline struct tle_tcp_stream * rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi, - uint32_t type) + uint32_t type, uint8_t reuse) { struct tle_tcp_stream *s; - s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst]; + if (type == TLE_V4) + s = bhash_lookup4(dev->ctx->bhash[type], + pi->addr4.dst, pi->port.dst, reuse); + else + s = bhash_lookup6(dev->ctx->bhash[type], + pi->addr6->dst, pi->port.dst, reuse); + if (s == NULL || tcp_stream_acquire(s) < 0) return NULL; @@ -77,10 +105,10 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st, { struct tle_tcp_stream *s; - s = stbl_find_data(st, pi); + s = TCP_STREAM(stbl_find_stream(st, pi)); if (s == NULL) { - if (pi->tf.flags == TCP_FLAG_ACK) - return rx_obtain_listen_stream(dev, pi, type); + if (pi->tf.flags & TCP_FLAG_ACK) + return rx_obtain_listen_stream(dev, pi, type, 1); return NULL; } @@ -150,131 +178,6 @@ pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num) return i; } -static inline void -stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[], - uint32_t nb_drb) -{ - _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb); -} - -static inline uint32_t -stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[], - uint32_t nb_drb) -{ - return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); -} - -static inline uint32_t -get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st) -{ - uint32_t pid; - rte_atomic32_t *pa; - - pa = &dev->tx.packet_id[type]; - - if (st == 0) { - pid = rte_atomic32_add_return(pa, num); - return pid - num; - } else { - pid = rte_atomic32_read(pa); - rte_atomic32_set(pa, pid + num); - return pid; - } -} - -static inline void -fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, - uint32_t seq, uint8_t hlen, uint8_t flags) -{ - uint16_t wnd; - - l4h->src_port = port.dst; - l4h->dst_port = port.src; - 
- wnd = (flags & TCP_FLAG_SYN) ? - RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) : - tcb->rcv.wnd >> tcb->rcv.wscale; - - /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */ - l4h->sent_seq = rte_cpu_to_be_32(seq); - l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt); - l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET; - l4h->tcp_flags = flags; - l4h->rx_win = rte_cpu_to_be_16(wnd); - l4h->cksum = 0; - l4h->tcp_urp = 0; - - if (flags & TCP_FLAG_SYN) - fill_syn_opts(l4h + 1, &tcb->so); - else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0) - fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts); -} - -static inline int -tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s, - const struct tle_dest *dst, uint64_t ol_flags, - union l4_ports port, uint32_t seq, uint32_t flags, - uint32_t pid, uint32_t swcsm) -{ - uint32_t l4, len, plen; - struct tcp_hdr *l4h; - char *l2h; - - len = dst->l2_len + dst->l3_len; - plen = m->pkt_len; - - if (flags & TCP_FLAG_SYN) - l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX; - else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0) - l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS; - else - l4 = sizeof(*l4h); - - /* adjust mbuf to put L2/L3/L4 headers into it. */ - l2h = rte_pktmbuf_prepend(m, len + l4); - if (l2h == NULL) - return -EINVAL; - - /* copy L2/L3 header */ - rte_memcpy(l2h, dst->hdr, len); - - /* setup TCP header & options */ - l4h = (struct tcp_hdr *)(l2h + len); - fill_tcph(l4h, &s->tcb, port, seq, l4, flags); - - /* setup mbuf TX offload related fields. */ - m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0); - m->ol_flags |= ol_flags; - - /* update proto specific fields. */ - - if (s->s.type == TLE_V4) { - struct ipv4_hdr *l3h; - l3h = (struct ipv4_hdr *)(l2h + dst->l2_len); - l3h->packet_id = rte_cpu_to_be_16(pid); - l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4); - - if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) - l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len, - ol_flags); - else if (swcsm != 0) - l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h); - - if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0) - l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len); - } else { - struct ipv6_hdr *l3h; - l3h = (struct ipv6_hdr *)(l2h + dst->l2_len); - l3h->payload_len = rte_cpu_to_be_16(plen + l4); - if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) - l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags); - else if (swcsm != 0) - l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h); - } - - return 0; -} - /* * That function supposed to be used only for data packets. * Assumes that L2/L3/L4 headers and mbuf fields already setup properly. @@ -355,6 +258,9 @@ tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num) i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m, num, drb, &nb); + if (i > 0) + timer_stop(s, TIMER_DACK); + /* free unused drbs. 
*/ if (nb != 0) stream_drb_free(s, drb + nbm - nb, nb); @@ -362,6 +268,113 @@ tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num) return i; } +/* + * case 0: pkt is not split yet, (indicate plen > sl->len) + * case 1: pkt is split, but left packet > sl->len + * case 2: pkt is split, but left packet <= sl->len + */ +static inline struct rte_mbuf * +get_indirect_mbuf(struct tle_tcp_stream *s, + struct rte_mbuf *m, uint32_t *p_plen, + union seqlen *sl, uint32_t type, + uint32_t mss) +{ + uint32_t hdr_len = PKT_L234_HLEN(m), plen, left; + struct rte_mbuf *f, *t; + uint16_t i, nb_segs, adj; + void *hdr; + + if (s->tcb.snd.nxt_pkt) { + f = s->tcb.snd.nxt_pkt; + plen = f->data_len - s->tcb.snd.nxt_offset; + if (f == m) /* 1st segment contains net headers */ + plen -= hdr_len; + } else { + f = m; + plen = f->data_len - hdr_len; + } + + TCP_LOG(DEBUG, "m(%p): pkt_len=%u, nb_segs=%u, sl->len = %u\n", + m, m->pkt_len, m->nb_segs, sl->len); + + nb_segs = 1; + if (sl->len < plen) { + /* Segment split needed: sometimes, cwnd will be reset to + * 1 or 2 mss. In this case, we send part of this seg, and + * record which segment we've sent, and the offset of sent + * data in tcb. + */ + left = plen - sl->len; + plen = sl->len; + s->tcb.snd.nxt_pkt = f; + } else { + left = 0; + t = f->next; + while (t && plen + t->data_len <= sl->len) { + plen += t->data_len; + t = t->next; + nb_segs++; + } + s->tcb.snd.nxt_pkt = t; + } + + struct rte_mbuf *pkts[1 + nb_segs]; + if (rte_pktmbuf_alloc_bulk(s->tx.dst.head_mp, pkts, 1 + nb_segs) < 0) + return NULL; + + rte_pktmbuf_attach(pkts[1], f); + + /* remove bytes in the beginning */ + adj = s->tcb.snd.nxt_offset; + if (f == m) + adj += hdr_len; + if (adj) + rte_pktmbuf_adj(pkts[1], adj); + + /* remove bytes in the end */ + if (left > 0) { + rte_pktmbuf_trim(pkts[1], left); + s->tcb.snd.nxt_offset += plen; + } else + s->tcb.snd.nxt_offset = 0; + + /* attach chaining segment if we have */ + for (i = 1, t = f->next; i < nb_segs; ++i) { + rte_pktmbuf_attach(pkts[i+1], t); + pkts[i]->next = pkts[i+1]; + t = t->next; + } + + /* prepare l2/l3/l4 header */ + hdr = rte_pktmbuf_append(pkts[0], hdr_len); + rte_memcpy(hdr, rte_pktmbuf_mtod(m, void *), hdr_len); + pkts[0]->nb_segs = nb_segs + 1; + pkts[0]->pkt_len = plen + hdr_len; + pkts[0]->ol_flags = m->ol_flags; + pkts[0]->tx_offload = m->tx_offload; + if (type == TLE_V4) { + struct ipv4_hdr *l3h; + + l3h = rte_pktmbuf_mtod_offset(pkts[0], + struct ipv4_hdr *, m->l2_len); + l3h->total_length = + rte_cpu_to_be_16(plen + m->l3_len + m->l4_len); + } else { + struct ipv6_hdr *l3h; + + l3h = rte_pktmbuf_mtod_offset(pkts[0], + struct ipv6_hdr *, m->l2_len); + l3h->payload_len = + rte_cpu_to_be_16(plen + m->l4_len); + } + if (plen <= mss) + pkts[0]->ol_flags &= ~PKT_TX_TCP_SEG; + pkts[0]->next = pkts[1]; + + *p_plen = plen; + return pkts[0]; +} + static inline uint32_t tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], uint32_t num) @@ -371,11 +384,13 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], struct rte_mbuf *mb; struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG]; + /* check stream has drb to send pkts */ + if (stream_drb_empty(s)) + return 0; + mss = s->tcb.snd.mss; type = s->s.type; - dev = s->tx.dst.dev; - pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0); k = 0; tn = 0; @@ -383,26 +398,64 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], for (i = 0; i != num && sl->len != 0 && fail == 
0; i++) { mb = mi[i]; - sz = RTE_MIN(sl->len, mss); plen = PKT_L4_PLEN(mb); /*fast path, no need to use indirect mbufs. */ - if (plen <= sz) { - + if (s->tcb.snd.nxt_pkt == NULL && plen <= sl->len) { + pid = get_ip_pid(dev, calc_seg_cnt(plen, s->tcb.snd.mss), + type, (s->flags & TLE_CTX_FLAG_ST) != 0); /* update pkt TCP header */ - tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i); + tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid); /* keep mbuf till ACK is received. */ rte_pktmbuf_refcnt_update(mb, 1); sl->len -= plen; sl->seq += plen; mo[k++] = mb; - /* remaining snd.wnd is less them MSS, send nothing */ - } else if (sz < mss) + if (sl->seq <= s->tcb.snd.rcvr) + TCP_INC_STATS(TCP_MIB_RETRANSSEGS); + /* remaining snd.wnd is less than MSS, send nothing */ + } else if (sl->len < mss) { + break; + /* some data to send already */ + } else if (k != 0 || tn != 0) { break; /* packet indirection needed */ - else - RTE_VERIFY(0); + } else { + struct rte_mbuf *out; + + out = get_indirect_mbuf(s, mb, &plen, sl, type, mss); + if (out == NULL) + return 0; + + pid = get_ip_pid(dev, calc_seg_cnt(plen, s->tcb.snd.mss), + type, (s->flags & TLE_CTX_FLAG_ST) != 0); + /* update pkt TCP header */ + tcp_update_mbuf(out, type, &s->tcb, sl->seq, pid); + + /* no need to bump refcnt !!! */ + + sl->len -= plen; + sl->seq += plen; + + if (tx_data_pkts(s, &out, 1) == 0) { + /* should not happen, we have checked at least one + * drb is available to send this mbuf + */ + rte_pktmbuf_free(out); + return 0; + } + + if (sl->seq <= s->tcb.snd.rcvr) + TCP_INC_STATS(TCP_MIB_RETRANSSEGS); + + if (s->tcb.snd.nxt_pkt) + return 0; + else { + tn = 1; + continue; + } + } if (k >= MAX_PKT_BURST) { n = tx_data_pkts(s, mo, k); @@ -466,14 +519,17 @@ tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms) tcp_txq_set_nxt_head(s, n); } while (n == num); - s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt; + if (sl.seq != (uint32_t)s->tcb.snd.nxt) { + s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt; + s->tcb.snd.ack = s->tcb.rcv.nxt; + } return tn; } static inline void free_una_data(struct tle_tcp_stream *s, uint32_t len) { - uint32_t i, num, plen; + uint32_t i, num, plen, una_data; struct rte_mbuf **mi; plen = 0; @@ -487,14 +543,18 @@ free_una_data(struct tle_tcp_stream *s, uint32_t len) /* free acked data */ for (i = 0; i != num && plen != len; i++) { - uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]); - if (plen + next_pkt_len > len) { - /* keep SND.UNA at the start of the packet */ - len = plen; + una_data = PKT_L4_PLEN(mi[i]) - s->tcb.snd.una_offset; + + /* partial ack */ + if (plen + una_data > len) { + s->tcb.snd.una_offset += len - plen; + plen = len; break; - } else { - plen += next_pkt_len; } + + /* monolithic ack */ + s->tcb.snd.una_offset = 0; + plen += una_data; rte_pktmbuf_free(mi[i]); } @@ -503,6 +563,7 @@ free_una_data(struct tle_tcp_stream *s, uint32_t len) } while (plen < len); s->tcb.snd.una += len; + s->tcb.snd.waitlen -= len; /* * that could happen in case of retransmit, @@ -519,7 +580,7 @@ calc_smss(uint16_t mss, const struct tle_dest *dst) { uint16_t n; - n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK; + n = dst->mtu - dst->l3_len - sizeof(struct tcp_hdr); mss = RTE_MIN(n, mss); return mss; } @@ -537,71 +598,53 @@ initial_cwnd(uint32_t smss, uint32_t icw) return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw)); } -/* - * queue standalone packet to he particular output device - * It assumes that: - * - L2/L3/L4 headers should be already set. - * - packet fits into one segment. 
- */ -static inline int -send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m) +void +tle_tcp_stream_kill(struct tle_stream *ts) { - uint32_t n, nb; - struct tle_drb *drb; - - if (stream_drb_alloc(s, &drb, 1) == 0) - return -ENOBUFS; - - /* enqueue pkt for TX. */ - nb = 1; - n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1, - &drb, &nb); - - /* free unused drbs. */ - if (nb != 0) - stream_drb_free(s, &drb, 1); - - return (n == 1) ? 0 : -ENOBUFS; -} + struct tle_tcp_stream *s; -static inline int -send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq, - uint32_t flags) -{ - const struct tle_dest *dst; - uint32_t pid, type; - int32_t rc; + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return; - dst = &s->tx.dst; - type = s->s.type; - pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); + if (s->tcb.state > TCP_ST_LISTEN) + send_rst(s, s->tcb.snd.nxt); - rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1); - if (rc == 0) - rc = send_pkt(s, dst->dev, m); + if (s->tcb.state == TCP_ST_ESTABLISHED) + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); - return rc; + s->tcb.state = TCP_ST_CLOSED; + rte_smp_wmb(); + timer_stop(s, TIMER_RTO); } static inline int -send_rst(struct tle_tcp_stream *s, uint32_t seq) +send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags) { struct rte_mbuf *m; + uint32_t seq; int32_t rc; m = rte_pktmbuf_alloc(s->tx.dst.head_mp); if (m == NULL) return -ENOMEM; - rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST); - if (rc != 0) + seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0); + s->tcb.snd.ts = tms; + + rc = send_ctrl_pkt(s, m, seq, flags); + if (rc != 0) { rte_pktmbuf_free(m); + return rc; + } - return rc; + timer_stop(s, TIMER_DACK); + s->tcb.snd.ack = s->tcb.rcv.nxt; + return 0; } static inline int -send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags) +send_keepalive(struct tle_tcp_stream *s) { struct rte_mbuf *m; uint32_t seq; @@ -611,20 +654,16 @@ send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags) if (m == NULL) return -ENOMEM; - seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0); - s->tcb.snd.ts = tms; + seq = s->tcb.snd.una - 1; - rc = send_ctrl_pkt(s, m, seq, flags); + rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_ACK); if (rc != 0) { rte_pktmbuf_free(m); return rc; } - - s->tcb.snd.ack = s->tcb.rcv.nxt; return 0; } - static int sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, const union seg_info *si, uint32_t ts, struct rte_mbuf *m) @@ -633,19 +672,23 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, int32_t rc; uint32_t pid, seq, type; struct tle_dev *dev; - const void *da; + const void *sa, *da; struct tle_dest dst; const struct tcp_hdr *th; - type = s->s.type; + type = pi->tf.type; /* get destination information. 
*/ - if (type == TLE_V4) + if (type == TLE_V4) { da = &pi->addr4.src; - else + sa = &pi->addr4.dst; + } + else { da = &pi->addr6->src; + sa = &pi->addr6->dst; + } - rc = stream_get_dest(&s->s, da, &dst); + rc = stream_get_dest(type, &s->s, sa, da, &dst); if (rc < 0) return rc; @@ -654,11 +697,16 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th)); s->tcb.rcv.nxt = si->seq + 1; + s->tcb.rcv.cpy = si->seq + 1; seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss, s->s.ctx->prm.hash_alg, &s->s.ctx->prm.secret_key); - s->tcb.so.ts.ecr = s->tcb.so.ts.val; - s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale); + + if (s->tcb.so.ts.raw) { + s->tcb.so.ts.ecr = s->tcb.so.ts.val; + s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale); + } + s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ? TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT; s->tcb.so.mss = calc_smss(dst.mtu, &dst); @@ -672,11 +720,13 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, dev = dst.dev; pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); - rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq, - TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1); + rc = tcp_fill_mbuf(m, s, &dst, TCP_OLFLAGS_CKSUM(dst.ol_flags), + pi->port, seq, TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1); if (rc == 0) rc = send_pkt(s, dev, m); + TCP_INC_STATS(TCP_MIB_PASSIVEOPENS); + return rc; } @@ -800,43 +850,24 @@ restore_syn_opt(union seg_info *si, union tsopt *to, return 0; } -static inline void -stream_term(struct tle_tcp_stream *s) -{ - struct sdr *dr; - - s->tcb.state = TCP_ST_CLOSED; - rte_smp_wmb(); - - timer_stop(s); - - /* close() was already invoked, schedule final cleanup */ - if ((s->tcb.uop & TCP_OP_CLOSE) != 0) { - - dr = CTX_TCP_SDR(s->s.ctx); - STAILQ_INSERT_TAIL(&dr->be, &s->s, link); - - /* notify user that stream need to be closed */ - } else if (s->err.ev != NULL) - tle_event_raise(s->err.ev); - else if (s->err.cb.func != NULL) - s->err.cb.func(s->err.cb.data, &s->s); -} - static inline int stream_fill_dest(struct tle_tcp_stream *s) { int32_t rc; uint32_t type; - const void *da; + const void *sa, *da; - type = s->s.type; - if (type == TLE_V4) + type = s->s.type; + if (type == TLE_V4) { + sa = &s->s.ipv4.addr.dst; da = &s->s.ipv4.addr.src; - else + } + else { + sa = &s->s.ipv6.addr.dst; da = &s->s.ipv6.addr.src; + } - rc = stream_get_dest(&s->s, da, &s->tx.dst); + rc = stream_get_dest(type, &s->s, sa, da, &s->tx.dst); return (rc < 0) ? rc : 0; } @@ -851,19 +882,17 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, int32_t rc; uint32_t rtt; - /* some TX still pending for that stream. */ - if (TCP_STREAM_TX_PENDING(cs)) - return -EAGAIN; - /* setup L4 ports and L3 addresses fields. */ cs->s.port.raw = pi->port.raw; cs->s.pmsk.raw = UINT32_MAX; if (pi->tf.type == TLE_V4) { + cs->s.type = TLE_V4; cs->s.ipv4.addr = pi->addr4; cs->s.ipv4.mask.src = INADDR_NONE; cs->s.ipv4.mask.dst = INADDR_NONE; } else if (pi->tf.type == TLE_V6) { + cs->s.type = TLE_V6; cs->s.ipv6.addr = *pi->addr6; rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, sizeof(cs->s.ipv6.mask.src)); @@ -887,7 +916,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, cs->tcb.snd.rto = TCP_RTO_DEFAULT; /* copy streams type & flags. */ - cs->s.type = ps->s.type; + cs->s.type = pi->tf.type; cs->flags = ps->flags; /* retrive and cache destination information. 
*/ @@ -897,16 +926,23 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, /* update snd.mss with SMSS value */ cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); + if (cs->tcb.so.ts.raw != 0) { + cs->tcb.snd.mss -= TCP_TX_OPT_LEN_TMS; + } /* setup congestion variables */ cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd); + CWND_INFO("accept", cs->tcb.snd.cwnd); + cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw; + cs->tcb.snd.rto_fw = ps->tcb.snd.rto_fw; cs->tcb.state = TCP_ST_ESTABLISHED; + TCP_INC_STATS_ATOMIC(TCP_MIB_CURRESTAB); /* add stream to the table */ - cs->ste = stbl_add_stream(st, pi, cs); + cs->ste = stbl_add_stream(st, &cs->s); if (cs->ste == NULL) return -ENOBUFS; @@ -937,7 +973,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, *csp = NULL; - if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) + if ((pi->tf.flags & TCP_FLAG_ACK) == 0|| rx_check_stream(s, pi) != 0) return -EINVAL; ctx = s->s.ctx; @@ -964,7 +1000,8 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, /* cleanup on failure */ tcp_stream_down(cs); - stbl_del_stream(st, cs->ste, cs, 0); + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + stbl_del_stream(st, cs->ste, &cs->s); cs->ste = NULL; } @@ -982,6 +1019,10 @@ data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf **mb, uint32_t hlen, len = *plen; rte_pktmbuf_adj(*mb, hlen); + /* header is removed, so we clear tx_offload here to make sure + * we can get correct payload length with PKT_L4_PLEN. + */ + (*mb)->tx_offload = 0; if (len == 0) return -ENODATA; /* cut off the start of the packet */ @@ -1018,7 +1059,8 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack) tle_event_raise(s->tx.ev); else if (k == 0 && s->tx.cb.func != NULL) s->tx.cb.func(s->tx.cb.data, &s->s); - } + } else + txs_enqueue(s->s.ctx, s); } return n; @@ -1029,8 +1071,7 @@ stream_timewait(struct tle_tcp_stream *s, uint32_t rto) { if (rto != 0) { s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = rto; - timer_reset(s); + timer_reset(s, TIMER_RTO, rto); } else stream_term(s); } @@ -1041,20 +1082,30 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp) uint32_t state; int32_t ackfin; + s->tcb.rcv.frs.on = 2; s->tcb.rcv.nxt += 1; ackfin = (s->tcb.snd.una == s->tcb.snd.fss); state = s->tcb.state; if (state == TCP_ST_ESTABLISHED) { + TCP_DEC_STATS_ATOMIC(TCP_MIB_CURRESTAB); s->tcb.state = TCP_ST_CLOSE_WAIT; /* raise err.ev & err.cb */ - if (s->err.ev != NULL) + /* raise error event only when recvbuf is empty, to inform + * that the stream will not receive data any more. + */ + if (rte_ring_count(s->rx.q) == 0 && s->err.ev != NULL) tle_event_raise(s->err.ev); else if (s->err.cb.func != NULL) s->err.cb.func(s->err.cb.data, &s->s); } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) { rsp->flags |= TCP_FLAG_ACK; + + /* shutdown instead of close happens */ + if (rte_ring_count(s->rx.q) == 0 && s->err.ev != NULL) + tle_event_raise(s->err.ev); + if (ackfin != 0) stream_timewait(s, s->tcb.snd.rto_tw); else @@ -1089,8 +1140,10 @@ rx_fin(struct tle_tcp_stream *s, uint32_t state, ts = rx_tms_opt(&s->tcb, mb); ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts); - if (ret != 0) + if (ret != 0) { + rsp->flags |= TCP_FLAG_ACK; return ret; + } if (state < TCP_ST_ESTABLISHED) return -EINVAL; @@ -1108,9 +1161,10 @@ rx_fin(struct tle_tcp_stream *s, uint32_t state, * fast-path: all data & FIN was already sent out * and now is acknowledged. 
*/ - if (s->tcb.snd.fss == s->tcb.snd.nxt && - si->ack == (uint32_t)s->tcb.snd.nxt) { + if (s->tcb.snd.fss >= s->tcb.snd.nxt && + si->ack == (uint32_t)s->tcb.snd.fss) { s->tcb.snd.una = s->tcb.snd.fss; + s->tcb.snd.nxt = s->tcb.snd.una; empty_tq(s); /* conventional ACK processiing */ } else @@ -1148,8 +1202,25 @@ rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags, else rc = check_seqn(&s->tcb, si->seq, 0); - if (rc == 0) + if (rc == 0) { + /* receive rst, connection is closed abnormal + * and should return errno in later operations. + */ + switch (state) { + case TCP_ST_SYN_SENT: + TCP_INC_STATS(TCP_MIB_ATTEMPTFAILS); + s->tcb.err = ECONNREFUSED; + break; + case TCP_ST_CLOSE_WAIT: + s->tcb.err = EPIPE; + break; + case TCP_ST_CLOSED: + return rc; + default: + s->tcb.err = ECONNRESET; + } stream_term(s); + } return rc; } @@ -1222,6 +1293,7 @@ rto_cwnd_update(struct tcb *tcb) * no more than 1 full-sized segment. */ tcb->snd.cwnd = tcb->snd.mss; + CWND_INFO("update", tcb->snd.cwnd); } static inline void @@ -1330,13 +1402,17 @@ rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack, ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack, plen, ts); - if (ret != 0) - break; - /* account for segment received */ ack_info_update(tack, &si[j], ret != 0, plen, ts); + if (ret != 0) + break; + rte_pktmbuf_adj(mb[j], hlen); + /* header is removed, so we clear tx_offload here to make sure + * we can get correct payload length with PKT_L4_PLEN. + */ + mb[j]->tx_offload = 0; } n = j - i; @@ -1377,6 +1453,7 @@ start_fast_retransmit(struct tle_tcp_stream *s) tcp_txq_rst_nxt_head(s); tcb->snd.nxt = tcb->snd.una; tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss; + CWND_INFO("start fast retrans", tcb->snd.cwnd); } static inline void @@ -1389,6 +1466,7 @@ stop_fast_retransmit(struct tle_tcp_stream *s) n = tcb->snd.nxt - tcb->snd.una; tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh, RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss); + CWND_INFO("stop fast retrans", tcb->snd.cwnd); tcb->snd.fastack = 0; } @@ -1415,8 +1493,10 @@ in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num, * during fast recovery, also reset the * retransmit timer. 
*/ - if (tcb->snd.fastack == 1) - timer_reset(s); + if (tcb->snd.fastack == 1) { + timer_reset(s, TIMER_RTO, s->tcb.snd.rto); + s->tcb.snd.nb_retx = 0; + } tcb->snd.fastack += ack_num; return 1; @@ -1456,7 +1536,8 @@ process_ack(struct tle_tcp_stream *s, uint32_t acked, /* remain in normal mode */ } else if (acked != 0) { ack_cwnd_update(&s->tcb, acked, tack); - timer_stop(s); + timer_stop(s, TIMER_RTO); + s->tcb.snd.nb_retx = 0; } /* fast retransmit mode */ @@ -1470,7 +1551,7 @@ process_ack(struct tle_tcp_stream *s, uint32_t acked, } else { /* RFC 5682 3.2.3 full ACK */ stop_fast_retransmit(s); - timer_stop(s); + timer_stop(s, TIMER_RTO); /* if we have another series of dup ACKs */ if (tack->dup3.seg != 0 && @@ -1501,17 +1582,22 @@ rx_ackfin(struct tle_tcp_stream *s) uint32_t state; s->tcb.snd.una = s->tcb.snd.fss; + s->tcb.snd.nxt = s->tcb.snd.una; empty_tq(s); state = s->tcb.state; if (state == TCP_ST_LAST_ACK) stream_term(s); else if (state == TCP_ST_FIN_WAIT_1) { - timer_stop(s); + timer_stop(s, TIMER_RTO); s->tcb.state = TCP_ST_FIN_WAIT_2; - } else if (state == TCP_ST_CLOSING) { + /* if stream is closed, should be released + * before timeout even without fin from peer + */ + if (s->tcb.uop & TCP_OP_CLOSE) + timer_start(s, TIMER_RTO, s->tcb.snd.rto_fw); + } else if (state == TCP_ST_CLOSING) stream_timewait(s, s->tcb.snd.rto_tw); - } } static inline void @@ -1532,7 +1618,7 @@ rx_process_ack(struct tle_tcp_stream *s, uint32_t ts, /* restart RTO timer. */ if (s->tcb.snd.nxt != s->tcb.snd.una) - timer_start(s); + timer_start(s, TIMER_RTO, s->tcb.snd.rto); /* update rto, if fresh packet is here then calculate rtt */ if (tack->ts.ecr != 0) @@ -1554,15 +1640,9 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, if (state != TCP_ST_SYN_SENT) return -EINVAL; - /* - * RFC 793 3.9: in the SYN-SENT state - * If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset - * - * and discard the segment. - * The connection remains in the same state. - */ + /* invalid SEG.SEQ */ if (si->ack != (uint32_t)s->tcb.snd.nxt) { - send_rst(s, si->ack); + rsp->flags = TCP_FLAG_RST; return 0; } @@ -1574,18 +1654,25 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, s->tcb.snd.una = s->tcb.snd.nxt; s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst); + if (s->tcb.so.ts.raw != 0) { + s->tcb.snd.mss -= TCP_TX_OPT_LEN_TMS; + } s->tcb.snd.wnd = si->wnd << so.wscale; s->tcb.snd.wu.wl1 = si->seq; s->tcb.snd.wu.wl2 = si->ack; s->tcb.snd.wscale = so.wscale; + s->tcb.snd.cork_ts = 0; /* setup congestion variables */ s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd); + CWND_INFO("synack", s->tcb.snd.cwnd); + s->tcb.snd.ssthresh = s->tcb.snd.wnd; s->tcb.rcv.ts = so.ts.val; s->tcb.rcv.irs = si->seq; s->tcb.rcv.nxt = si->seq + 1; + s->tcb.rcv.cpy = si->seq + 1; /* if peer doesn't support WSCALE opt, recalculate RCV.WND */ s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ? @@ -1597,9 +1684,14 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, rsp->flags |= TCP_FLAG_ACK; - timer_stop(s); + timer_stop(s, TIMER_RTO); + s->tcb.snd.nb_retx = 0; s->tcb.state = TCP_ST_ESTABLISHED; rte_smp_wmb(); + TCP_INC_STATS_ATOMIC(TCP_MIB_CURRESTAB); + + if (s->s.option.keepalive) + timer_start(s, TIMER_KEEPALIVE, s->s.option.keepidle * MS_PER_S); if (s->tx.ev != NULL) tle_event_raise(s->tx.ev); @@ -1689,8 +1781,8 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts, * fast-path: all data & FIN was already sent out * and now is acknowledged. 
*/ - if (s->tcb.snd.fss == s->tcb.snd.nxt && - tack.ack == (uint32_t)s->tcb.snd.nxt) + if (s->tcb.snd.fss >= s->tcb.snd.nxt && + tack.ack == (uint32_t)s->tcb.snd.fss) rx_ackfin(s); else rx_process_ack(s, ts, &tack); @@ -1702,27 +1794,44 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts, * - received segment with INO data and no TX is scheduled * for that stream. */ - if (tack.segs.badseq != 0 || tack.segs.ofo != 0 || - (tack.segs.data != 0 && - rte_atomic32_read(&s->tx.arm) == 0)) + if (tack.segs.badseq != 0 || tack.segs.ofo != 0) rsp.flags |= TCP_FLAG_ACK; + else if (tack.segs.data != 0 && + rte_atomic32_read(&s->tx.arm) == 0 && + (s->s.option.tcpquickack || + s->tcb.rcv.nxt - s->tcb.snd.ack > 8 * s->tcb.so.mss)) { + rsp.flags |= TCP_FLAG_ACK; + if (s->s.option.tcpquickack > 0) + s->s.option.tcpquickack--; + } + else if (tack.segs.data && rsp.flags == 0) + timer_start(s, TIMER_DACK, DELAY_ACK_CHECK_INTERVAL); rx_ofo_fin(s, &rsp); k += num - n; i = num; + if (s->s.option.keepalive) { + s->tcb.snd.nb_keepalive = 0; + timer_reset(s, TIMER_KEEPALIVE, s->s.option.keepidle * MS_PER_S); + } /* unhandled state, drop all packets. */ } else i = 0; /* we have a response packet to send. */ - if (rsp.flags != 0) { + if (rsp.flags == TCP_FLAG_RST) { + send_rst(s, si[i].ack); + stream_term(s); + } else if (rsp.flags != 0) { send_ack(s, ts, rsp.flags); /* start the timer for FIN packet */ - if ((rsp.flags & TCP_FLAG_FIN) != 0) - timer_reset(s); + if ((rsp.flags & TCP_FLAG_FIN) != 0) { + timer_reset(s, TIMER_RTO, s->tcb.snd.rto); + s->tcb.snd.nb_retx = 0; + } } /* unprocessed packets */ @@ -1778,7 +1887,6 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, state = s->tcb.state; if (state == TCP_ST_LISTEN) { - /* one connection per flow */ cs = NULL; ret = -EINVAL; @@ -1835,6 +1943,74 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, return num - k; } +static inline void +sync_refuse(struct tle_tcp_stream *s, struct tle_dev *dev, + const union pkt_info *pi, struct rte_mbuf *m) +{ + struct ether_hdr *eth_h; + struct ether_addr eth_addr; + struct ipv4_hdr *ip_h; + uint32_t ip_addr; + struct ipv6_hdr *ipv6_h; + struct in6_addr ipv6_addr; + struct tcp_hdr *th; + uint16_t port; + + /* rst pkt should not contain options for syn */ + rte_pktmbuf_trim(m, m->l4_len - sizeof(*th)); + + eth_h = rte_pktmbuf_mtod(m, struct ether_hdr*); + ether_addr_copy(ð_h->s_addr, ð_addr); + ether_addr_copy(ð_h->d_addr, ð_h->s_addr); + ether_addr_copy(ð_addr, ð_h->d_addr); + + th = rte_pktmbuf_mtod_offset(m, struct tcp_hdr*, + m->l2_len + m->l3_len); + port = th->src_port; + th->src_port = th->dst_port; + th->dst_port = port; + th->tcp_flags = TCP_FLAG_RST | TCP_FLAG_ACK; + th->recv_ack = rte_cpu_to_be_32(rte_be_to_cpu_32(th->sent_seq) + 1); + th->sent_seq = 0; + th->data_off &= 0x0f; + th->data_off |= (sizeof(*th) / 4) << 4; + th->cksum = 0; + + if (pi->tf.type == TLE_V4) { + ip_h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr*, + m->l2_len); + ip_addr = ip_h->src_addr; + ip_h->src_addr = ip_h->dst_addr; + ip_h->dst_addr = ip_addr; + ip_h->total_length = rte_cpu_to_be_16( + rte_be_to_cpu_16(ip_h->total_length) - + (m->l4_len - sizeof(*th))); + ip_h->hdr_checksum = 0; + th->cksum = rte_ipv4_udptcp_cksum(ip_h, th); + ip_h->hdr_checksum = rte_ipv4_cksum(ip_h); + } else { + ipv6_h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr*, + m->l2_len); + rte_memcpy(&ipv6_addr, ipv6_h->src_addr, + sizeof(struct in6_addr)); + rte_memcpy(ipv6_h->src_addr, ipv6_h->dst_addr, + sizeof(struct 
in6_addr)); + rte_memcpy(ipv6_h->dst_addr, &ipv6_addr, + sizeof(struct in6_addr)); + ipv6_h->payload_len = rte_cpu_to_be_16( + rte_be_to_cpu_16(ipv6_h->payload_len) - + (m->l4_len - sizeof(*th))); + th->cksum = rte_ipv6_udptcp_cksum(ipv6_h, th); + } + + if (m->pkt_len < ETHER_MIN_LEN) + rte_pktmbuf_append(m, ETHER_MIN_LEN - m->pkt_len); + + if (send_pkt(s, dev, m) != 0) + rte_pktmbuf_free(m); + else + TCP_INC_STATS(TCP_MIB_OUTRSTS); +} static inline uint32_t rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts, @@ -1846,20 +2022,35 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts, uint32_t i, k; int32_t ret; - s = rx_obtain_listen_stream(dev, &pi[0], type); + s = rx_obtain_listen_stream(dev, &pi[0], type, 0); if (s == NULL) { - for (i = 0; i != num; i++) { - rc[i] = ENOENT; - rp[i] = mb[i]; + /* no socket listening this syn, send rst to refuse connect */ + s = TCP_STREAM(get_stream(dev->ctx)); + if (s != NULL) { + sync_refuse(s, dev, &pi[0], mb[0]); + put_stream(dev->ctx, &s->s, 0); + i = 1; + } else { + i = 0; } - return 0; + k = 0; + for (; i != num; i++) { + rc[k] = ENOENT; + rp[k] = mb[i]; + k++; + } + return num - k; } k = 0; for (i = 0; i != num; i++) { - + /* check if stream has space to maintain new connection */ + if (rte_ring_free_count(s->rx.q) == 0 || + (s->s.ctx->streams.nb_free == 0 && + s->s.ctx->streams.nb_cur >= s->s.ctx->prm.max_streams - 1)) + ret = -ENOSPC; /* check that this remote is allowed to connect */ - if (rx_check_stream(s, &pi[i]) != 0) + else if (rx_check_stream(s, &pi[i]) != 0) ret = -ENOENT; else /* syncokie: reply with */ @@ -1882,43 +2073,34 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], { struct stbl *st; struct tle_ctx *ctx; - uint32_t i, j, k, mt, n, t, ts; + uint32_t i, j, k, n, t; + uint64_t ts; union pkt_info pi[num]; union seg_info si[num]; - union { - uint8_t t[TLE_VNUM]; - uint32_t raw; - } stu; + + TCP_ADD_STATS(TCP_MIB_INSEGS, num); ctx = dev->ctx; ts = tcp_get_tms(ctx->cycles_ms_shift); st = CTX_TCP_STLB(ctx); - mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); - - stu.raw = 0; /* extract packet info and check the L3/L4 csums */ for (i = 0; i != num; i++) { get_pkt_info(pkt[i], &pi[i], &si[i]); - t = pi[i].tf.type; - pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, t, IPPROTO_TCP); - stu.t[t] = mt; + pi[i].csf = check_pkt_csum(pkt[i], t, IPPROTO_TCP); } - if (stu.t[TLE_V4] != 0) - stbl_lock(st, TLE_V4); - if (stu.t[TLE_V6] != 0) - stbl_lock(st, TLE_V6); - k = 0; for (i = 0; i != num; i += j) { - t = pi[i].tf.type; /*basic checks for incoming packet */ - if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) { + if (t >= TLE_VNUM || pi[i].csf != 0) { + TCP_INC_STATS(TCP_MIB_INERRS); + if (t < TLE_VNUM) + TCP_INC_STATS(TCP_MIB_CSUMERRORS); rc[k] = EINVAL; rp[k] = pkt[i]; j = 1; @@ -1937,11 +2119,6 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], } } - if (stu.t[TLE_V4] != 0) - stbl_unlock(st, TLE_V4); - if (stu.t[TLE_V6] != 0) - stbl_unlock(st, TLE_V6); - return num - k; } @@ -1953,21 +2130,37 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], struct tle_tcp_stream *s; s = TCP_STREAM(ts); - n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num); - if (n == 0) - return 0; - /* - * if we still have packets to read, - * then rearm stream RX event. 
- */ - if (n == num && rte_ring_count(s->rx.q) != 0) { - if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) - tle_event_raise(s->rx.ev); + if (tcp_stream_try_acquire(s) > 0) { + if (s->tcb.state != TCP_ST_LISTEN) { + tcp_stream_release(s); + rte_errno = EINVAL; + return 0; + } + + n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num); + if (n == 0) + { + tcp_stream_release(s); + rte_errno = EAGAIN; + return 0; + } + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (n == num && rte_ring_count(s->rx.q) != 0) { + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + } + tcp_stream_release(s); + return n; + } else { tcp_stream_release(s); + rte_errno = EINVAL; + return 0; } - - return n; } uint16_t @@ -1995,6 +2188,7 @@ tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) stream_drb_free(s, drb + i, j - i); } + TCP_ADD_STATS(TCP_MIB_OUTSEGS, n); return n; } @@ -2010,73 +2204,17 @@ stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi) pi->tf.type = s->s.type; } -static int -stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr) -{ - const struct sockaddr_in *in4; - const struct sockaddr_in6 *in6; - const struct tle_dev_param *prm; - int32_t rc; - - rc = 0; - s->s.pmsk.raw = UINT32_MAX; - - /* setup L4 src ports and src address fields. */ - if (s->s.type == TLE_V4) { - in4 = (const struct sockaddr_in *)addr; - if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0) - return -EINVAL; - - s->s.port.src = in4->sin_port; - s->s.ipv4.addr.src = in4->sin_addr.s_addr; - s->s.ipv4.mask.src = INADDR_NONE; - s->s.ipv4.mask.dst = INADDR_NONE; - - } else if (s->s.type == TLE_V6) { - in6 = (const struct sockaddr_in6 *)addr; - if (memcmp(&in6->sin6_addr, &tle_ipv6_any, - sizeof(tle_ipv6_any)) == 0 || - in6->sin6_port == 0) - return -EINVAL; - - s->s.port.src = in6->sin6_port; - rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr, - sizeof(s->s.ipv6.addr.src)); - rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none, - sizeof(s->s.ipv6.mask.src)); - rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none, - sizeof(s->s.ipv6.mask.dst)); - } - - /* setup the destination device. 
*/ - rc = stream_fill_dest(s); - if (rc != 0) - return rc; - - /* setup L4 dst address from device param */ - prm = &s->tx.dst.dev->prm; - if (s->s.type == TLE_V4) { - if (s->s.ipv4.addr.dst == INADDR_ANY) - s->s.ipv4.addr.dst = prm->local_addr4.s_addr; - } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any, - sizeof(tle_ipv6_any)) == 0) - memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6, - sizeof(s->s.ipv6.addr.dst)); - - return rc; -} - static inline int -tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) +tx_syn(struct tle_tcp_stream *s) { int32_t rc; - uint32_t tms, seq; + uint32_t seq; + uint64_t tms; union pkt_info pi; struct stbl *st; struct stbl_entry *se; - /* fill stream address */ - rc = stream_fill_addr(s, addr); + rc = stream_fill_dest(s); if (rc != 0) return rc; @@ -2107,7 +2245,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) /* add the stream in stream table */ st = CTX_TCP_STLB(s->s.ctx); - se = stbl_add_stream_lock(st, s); + se = stbl_add_stream(st, &s->s); if (se == NULL) return -ENOBUFS; s->ste = se; @@ -2115,6 +2253,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) /* put stream into the to-send queue */ txs_enqueue(s->s.ctx, s); + TCP_INC_STATS(TCP_MIB_ACTIVEOPENS); return 0; } @@ -2147,7 +2286,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) /* fill stream, prepare and transmit syn pkt */ s->tcb.uop |= TCP_OP_CONNECT; - rc = tx_syn(s, addr); + rc = tx_syn(s); tcp_stream_release(s); /* error happened, do a cleanup */ @@ -2160,13 +2299,29 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) uint16_t tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) { - uint32_t n; + uint32_t n, i; + uint32_t free_slots; struct tle_tcp_stream *s; s = TCP_STREAM(ts); + + free_slots = rte_ring_free_count(s->rx.q); + n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num); - if (n == 0) + if (n == 0) { + if (s->tcb.err != 0) { + rte_errno = s->tcb.err; + } else { + rte_errno = EAGAIN; + } return 0; + } + + for (i = 0; i < n; ++i) + s->tcb.rcv.cpy += rte_pktmbuf_pkt_len(pkt[i]); + + /* update receive window with left recv buffer*/ + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); /* * if we still have packets to read, @@ -2176,28 +2331,99 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); tcp_stream_release(s); + /* if we have received fin, no more data will come, raise err event. 
*/ + } else if (s->tcb.rcv.frs.on == 2) { + if (tcp_stream_try_acquire(s) > 0 && s->err.ev != NULL) + tle_event_raise(s->err.ev); + tcp_stream_release(s); + } + + /* update recv win to the remote */ + if (free_slots < RECV_WIN_NOTIFY_THRESH && + rte_ring_free_count(s->rx.q) >= RECV_WIN_NOTIFY_THRESH) { + s->tcb.snd.update_rcv = true; + txs_enqueue(s->s.ctx, s); } return n; } +uint16_t +tle_tcp_stream_inq(struct tle_stream *ts) +{ + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + return s->tcb.rcv.nxt - s->tcb.rcv.cpy; +} + +#define DECONST(type, var) ((type)(uintptr_t)(const void *)(var)) + +ssize_t +tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, int iovcnt) +{ + struct msghdr msg = {0}; + + msg.msg_iov = DECONST(struct iovec *, iov); /* Recover const later */ + msg.msg_iovlen = iovcnt; + return tle_tcp_stream_recvmsg(ts, &msg); +} + ssize_t -tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, - int iovcnt) +tle_tcp_stream_recvmsg(struct tle_stream *ts, struct msghdr *msg) { + size_t sz; int32_t i; uint32_t mn, n, tn; - size_t sz; + uint32_t free_slots; struct tle_tcp_stream *s; struct iovec iv; struct rxq_objs mo[2]; + struct sockaddr_in *addr; + struct sockaddr_in6 *addr6; + const struct iovec *iov = msg->msg_iov; + int iovcnt = msg->msg_iovlen; s = TCP_STREAM(ts); + free_slots = rte_ring_free_count(s->rx.q); + /* get group of packets */ mn = tcp_rxq_get_objs(s, mo); - if (mn == 0) - return 0; + if (mn == 0) { + if (s->tcb.err != 0) + rte_errno = s->tcb.err; + else + rte_errno = EAGAIN; + return -1; + } + + if (!ts->option.timestamp) + ts->timestamp = mo[0].mb[0]->timestamp; + + if (msg->msg_control != NULL) { + if (ts->option.timestamp) + tle_set_timestamp(msg, mo[0].mb[0]); + else + msg->msg_controllen = 0; + } + + if (msg->msg_name != NULL) { + if (s->s.type == TLE_V4) { + addr = (struct sockaddr_in*)msg->msg_name; + addr->sin_family = AF_INET; + addr->sin_addr.s_addr = s->s.ipv4.addr.src; + addr->sin_port = s->s.port.src; + msg->msg_namelen = sizeof(struct sockaddr_in); + } else { + addr6 = (struct sockaddr_in6*)msg->msg_name; + addr6->sin6_family = AF_INET6; + rte_memcpy(&addr6->sin6_addr, &s->s.ipv6.addr.src, + sizeof(struct sockaddr_in6)); + addr6->sin6_port = s->s.port.src; + msg->msg_namelen = sizeof(struct sockaddr_in6); + } + } sz = 0; n = 0; @@ -2229,6 +2455,8 @@ tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, } tcp_rxq_consume(s, tn); + /* update receive window with left recv buffer*/ + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); /* * if we still have packets to read, @@ -2238,6 +2466,20 @@ tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); tcp_stream_release(s); + /* if we have received fin, no more data will come, raise err event. */ + } else if (s->tcb.rcv.frs.on == 2) { + if (tcp_stream_try_acquire(s) > 0 && s->err.ev != NULL) + tle_event_raise(s->err.ev); + tcp_stream_release(s); + } + + s->tcb.rcv.cpy += sz; + + /* update recv win to the remote */ + if (free_slots < RECV_WIN_NOTIFY_THRESH && + rte_ring_free_count(s->rx.q) >= RECV_WIN_NOTIFY_THRESH) { + s->tcb.snd.update_rcv = true; + txs_enqueue(s->s.ctx, s); } return sz; @@ -2263,48 +2505,35 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, if (i == num) { /* queue packets for further transmission. 
*/ rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num); - if (rc != 0) + if (rc != 0) { + rc = -EAGAIN; free_mbufs(segs, num); + } } return rc; } -uint16_t -tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) +static inline uint16_t +stream_send(struct tle_tcp_stream *s, struct rte_mbuf *pkt[], + uint16_t num, uint16_t mss, uint64_t ol_flags) { - uint32_t i, j, k, mss, n, state; + uint16_t i, j, k; int32_t rc; - uint64_t ol_flags; - struct tle_tcp_stream *s; + uint32_t n, free_slots; struct rte_mbuf *segs[TCP_MAX_PKT_SEG]; - - s = TCP_STREAM(ts); - - /* mark stream as not closable. */ - if (tcp_stream_acquire(s) < 0) { - rte_errno = EAGAIN; - return 0; - } - - state = s->tcb.state; - if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { - rte_errno = ENOTCONN; - tcp_stream_release(s); - return 0; - } - - mss = s->tcb.snd.mss; - ol_flags = s->tx.dst.ol_flags; + int32_t pkt_len; k = 0; rc = 0; + pkt_len = 0; while (k != num) { /* prepare and check for TX */ for (i = k; i != num; i++) { if (pkt[i]->pkt_len > mss || pkt[i]->nb_segs > TCP_MAX_PKT_SEG) break; + pkt_len += pkt[i]->pkt_len; rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags, s->s.port, 0, TCP_FLAG_ACK, 0, 0); if (rc != 0) @@ -2328,6 +2557,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) pkt[j]->l3_len + pkt[j]->l4_len); pkt[j]->ol_flags &= ol_flags; + pkt_len -= pkt[j]->pkt_len; } break; } @@ -2339,8 +2569,10 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) /* segment large packet and enqueue for sending */ } else if (i != num) { + free_slots = rte_ring_free_count(s->tx.q); + free_slots = RTE_MIN(free_slots, RTE_DIM(segs)); /* segment the packet. */ - rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs), + rc = tcp_segmentation(pkt[i], segs, free_slots, &s->tx.dst, mss); if (rc < 0) { rte_errno = -rc; @@ -2351,19 +2583,161 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (rc == 0) { /* free the large mbuf */ rte_pktmbuf_free(pkt[i]); + pkt_len += pkt[i]->pkt_len; /* set the mbuf as consumed */ k++; - } else + } else { /* no space left in tx queue */ + RTE_VERIFY(0); + break; + } + } + } + + s->tcb.snd.waitlen += pkt_len; + return k; +} + +static inline uint16_t +stream_send_tso(struct tle_tcp_stream *s, struct rte_mbuf *pkt[], + uint16_t num, uint16_t mss, uint64_t ol_flags) +{ + uint16_t i, k, nb_segs; + int32_t rc, pkt_len; + uint64_t ol_flags1; + struct rte_mbuf *pre_tail; + + k = 0; + rc = 0; + while (k != num) { + /* Make sure there is at least one slot available */ + if (rte_ring_free_count(s->tx.q) == 0) + break; + + /* prepare and check for TX */ + nb_segs = 0; + pkt_len = 0; + pre_tail = NULL; + for (i = k; i != num; i++) { + if (pkt[i]->nb_segs != 1) + rte_panic("chained mbuf: %p\n", pkt[i]); + /* We shall consider cwnd and snd wnd when limit len */ + if (nb_segs + pkt[i]->nb_segs <= TCP_MAX_PKT_SEG && + pkt_len + pkt[i]->pkt_len <= 65535 - RESERVE_HEADER_LEN) { + nb_segs += pkt[i]->nb_segs; + pkt_len += pkt[i]->pkt_len; + if (pre_tail) + pre_tail->next = pkt[i]; + pre_tail = rte_pktmbuf_lastseg(pkt[i]); + } else { + /* enqueue this one now */ + break; + } + } + + if (unlikely(i == k)) { + /* pkt[k] is a too big packet, now we fall back to + * non-tso send; we can optimize it later by + * splitting the mbuf. 
+ */ + if (stream_send(s, &pkt[k], 1, mss, ol_flags) == 1) { + k++; + continue; + } else break; } + + pkt[k]->nb_segs = nb_segs; + pkt[k]->pkt_len = pkt_len; + + ol_flags1 = ol_flags; + if (pkt_len > mss) + ol_flags1 |= PKT_TX_TCP_SEG; + + rc = tcp_fill_mbuf(pkt[k], s, &s->tx.dst, ol_flags1, + s->s.port, 0, TCP_FLAG_ACK, 0, 0); + if (rc != 0) /* hard to recover */ + rte_panic("failed to fill mbuf: %p\n", pkt[k]); + + /* correct mss */ + pkt[k]->tso_segsz = mss; + + s->tcb.snd.waitlen += pkt_len; + /* We already make sure there is at least one slot */ + if (_rte_ring_enqueue_burst(s->tx.q, (void **)pkt + k, 1) < 1) + RTE_VERIFY(0); + + k = i; + } + + return k; +} + +uint16_t +tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) +{ + uint16_t k, mss, state; + uint64_t ol_flags; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + + if (s->tcb.err != 0) { + rte_errno = s->tcb.err; + return 0; + } + + /* mark stream as not closable. */ + if (tcp_stream_acquire(s) < 0) { + rte_errno = EAGAIN; + return 0; } + state = s->tcb.state; + switch (state) { + case TCP_ST_ESTABLISHED: + case TCP_ST_CLOSE_WAIT: + break; + case TCP_ST_FIN_WAIT_1: + case TCP_ST_FIN_WAIT_2: + case TCP_ST_CLOSING: + case TCP_ST_LAST_ACK: + rte_errno = EPIPE; + tcp_stream_release(s); + return 0; + default: + rte_errno = ENOTCONN; + tcp_stream_release(s); + return 0; + } + + mss = s->tcb.snd.mss; + + ol_flags = s->tx.dst.ol_flags; + + /* Some reference number on the case: + * " - tap - - " + * ~2Gbps with tso disabled; + * ~16Gbps with tso enabled. + */ + if (rte_ring_free_count(s->tx.q) == 0) { + /* Block send may try without waiting for tx event (raised by acked + * data), so here we will still put this stream for further process + */ + txs_enqueue(s->s.ctx, s); + rte_errno = EAGAIN; + k = 0; + } else if (s->tx.dst.dev->prm.tx_offload & DEV_TX_OFFLOAD_TCP_TSO) + k = stream_send_tso(s, pkt, num, mss, ol_flags); + else + k = stream_send(s, pkt, num, mss, ol_flags); + /* notify BE about more data to send */ if (k != 0) txs_enqueue(s->s.ctx, s); + /* if possible, re-arm stream write event. */ - if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) + if (rte_ring_free_count(s->tx.q) && s->tx.ev != NULL && k == num) tle_event_raise(s->tx.ev); tcp_stream_release(s); @@ -2382,9 +2756,15 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, struct tle_tcp_stream *s; struct iovec iv; struct rte_mbuf *mb[2 * MAX_PKT_BURST]; + uint16_t mss; s = TCP_STREAM(ts); + if (s->tcb.err != 0) { + rte_errno = s->tcb.err; + return -1; + } + /* mark stream as not closable. 
*/ if (tcp_stream_acquire(s) < 0) { rte_errno = EAGAIN; @@ -2392,7 +2772,18 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, } state = s->tcb.state; - if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { + switch (state) { + case TCP_ST_ESTABLISHED: + case TCP_ST_CLOSE_WAIT: + break; + case TCP_ST_FIN_WAIT_1: + case TCP_ST_FIN_WAIT_2: + case TCP_ST_CLOSING: + case TCP_ST_LAST_ACK: + rte_errno = EPIPE; + tcp_stream_release(s); + return -1; + default: rte_errno = ENOTCONN; tcp_stream_release(s); return -1; @@ -2403,11 +2794,24 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, for (i = 0; i != iovcnt; i++) tsz += iov[i].iov_len; + if (tsz == 0) { + tcp_stream_release(s); + return 0; + } + slen = rte_pktmbuf_data_room_size(mp); - slen = RTE_MIN(slen, s->tcb.snd.mss); + mss = s->tcb.snd.mss; + + slen = RTE_MIN(slen, mss); num = (tsz + slen - 1) / slen; n = rte_ring_free_count(s->tx.q); + + if (n == 0) { + tcp_stream_release(s); + return 0; + } + num = RTE_MIN(num, n); n = RTE_MIN(num, RTE_DIM(mb)); @@ -2451,7 +2855,6 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, k = 0; if (k != j) { - /* free pkts that were not enqueued */ free_mbufs(mb + k, j - k); @@ -2466,14 +2869,16 @@ tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, } } - if (k != 0) { - + if (k != 0) { /* notify BE about more data to send */ txs_enqueue(s->s.ctx, s); /* if possible, re-arm stream write event. */ if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) tle_event_raise(s->tx.ev); + } else { + rte_errno = EAGAIN; + sz = -1; } tcp_stream_release(s); @@ -2485,7 +2890,7 @@ static inline void tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state) { /* try to send some data */ - tx_nxt_data(s, tms); + uint32_t tn = tx_nxt_data(s, tms); /* we also have to send a FIN */ if (state != TCP_ST_ESTABLISHED && @@ -2495,6 +2900,13 @@ tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state) s->tcb.snd.fss = ++s->tcb.snd.nxt; send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK); } + + if (s->tcb.snd.update_rcv) { + if (tn == 0) + send_ack(s, tms, TCP_FLAG_ACK); /* update recv window */ + + s->tcb.snd.update_rcv = false; + } } static inline void @@ -2507,7 +2919,7 @@ tx_stream(struct tle_tcp_stream *s, uint32_t tms) if (state == TCP_ST_SYN_SENT) { /* send the SYN, start the rto timer */ send_ack(s, tms, TCP_FLAG_SYN); - timer_start(s); + timer_start(s, TIMER_RTO, s->tcb.snd.rto); } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) { @@ -2515,7 +2927,7 @@ tx_stream(struct tle_tcp_stream *s, uint32_t tms) /* start RTO timer. */ if (s->tcb.snd.nxt != s->tcb.snd.una) - timer_start(s); + timer_start(s, TIMER_RTO, s->tcb.snd.rto); } } @@ -2544,7 +2956,6 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms) if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) { if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) { - /* update SND.CWD and SND.SSTHRESH */ rto_cwnd_update(&s->tcb); @@ -2570,50 +2981,131 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms) * than one SYN or SYN/ACK retransmissions or true loss * detection has been made. 
*/ - if (s->tcb.snd.nb_retx != 0) + if (s->tcb.snd.nb_retx != 0) { s->tcb.snd.cwnd = s->tcb.snd.mss; + CWND_INFO("synsent", s->tcb.snd.cwnd); + } send_ack(s, tms, TCP_FLAG_SYN); - - } else if (state == TCP_ST_TIME_WAIT) { - stream_term(s); + TCP_INC_STATS(TCP_MIB_RETRANSSEGS); } /* RFC6298:5.5 back off the timer */ s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto); s->tcb.snd.nb_retx++; - timer_restart(s); + timer_restart(s, TIMER_RTO, s->tcb.snd.rto); } else { - send_rst(s, s->tcb.snd.nxt); + if (state == TCP_ST_SYN_SENT) { + if (stream_fill_dest(s) != 0 || + is_broadcast_ether_addr((struct ether_addr *)s->tx.dst.hdr)) + s->tcb.err = EHOSTUNREACH; + else + /* TODO: do we send rst on this */ + s->tcb.err = ENOTCONN; + } else + send_rst(s, s->tcb.snd.una); stream_term(s); } } +static inline void +set_keepalive_timer(struct tle_tcp_stream *s) +{ + if (s->s.option.keepalive) { + if (s->tcb.state == TCP_ST_ESTABLISHED) { + if (s->tcb.snd.nb_keepalive == 0) + timer_reset(s, TIMER_KEEPALIVE, + s->s.option.keepidle * MS_PER_S); + else + timer_reset(s, TIMER_KEEPALIVE, + s->s.option.keepintvl * MS_PER_S); + } + } else { + timer_stop(s, TIMER_KEEPALIVE); + s->tcb.snd.nb_keepalive = 0; + } +} + int tle_tcp_process(struct tle_ctx *ctx, uint32_t num) { - uint32_t i, k, tms; + uint8_t type; + uint32_t i, k; + uint64_t tms; struct sdr *dr; struct tle_timer_wheel *tw; struct tle_stream *p; struct tle_tcp_stream *s, *rs[num]; - /* process streams with RTO exipred */ + tms = tcp_get_tms(ctx->cycles_ms_shift); + /* process streams with RTO exipred */ tw = CTX_TCP_TMWHL(ctx); - tms = tcp_get_tms(ctx->cycles_ms_shift); tle_timer_expire(tw, tms); k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs)); for (i = 0; i != k; i++) { - - s = rs[i]; - s->timer.handle = NULL; - if (tcp_stream_try_acquire(s) > 0) - rto_stream(s, tms); - tcp_stream_release(s); + s = timer_stream(rs[i]); + type = timer_type(rs[i]); + s->timer.handle[type] = NULL; + + switch (type) { + case TIMER_RTO: + /* FE cannot change stream into below states, + * that's why we don't put it into lock + */ + if (s->tcb.state == TCP_ST_TIME_WAIT || + s->tcb.state == TCP_ST_FIN_WAIT_2) { + tcp_stream_down(s); + stream_term(s); + tcp_stream_up(s); + } else if (tcp_stream_acquire(s) > 0) { + /* + * stream may be closed in frontend concurrently. + * if stream has already been closed, it need not + * to retransmit anymore. + */ + if (s->tcb.state != TCP_ST_CLOSED) + rto_stream(s, tms); + tcp_stream_release(s); + } + /* Fail to aquire lock? FE is shutdown or close this + * stream, either FIN or RST needs to be sent, which + * means it's in tsq, will be processed later. 
+ */ + break; + case TIMER_DACK: + if (rte_atomic32_read(&s->tx.arm) == 0 && + s->tcb.rcv.nxt != s->tcb.snd.ack && + tcp_stream_acquire(s) > 0) { + s->s.option.tcpquickack = 8; + send_ack(s, tms, TCP_FLAG_ACK); + tcp_stream_release(s); + } + break; + case TIMER_KEEPALIVE: + if (s->tcb.snd.nb_keepalive < s->s.option.keepcnt) { + if (tcp_stream_try_acquire(s) > 0 && + s->tcb.state == TCP_ST_ESTABLISHED) { + send_keepalive(s); + s->tcb.snd.nb_keepalive++; + timer_start(s, TIMER_KEEPALIVE, + s->s.option.keepintvl * MS_PER_S); + } + tcp_stream_release(s); + } else { + tcp_stream_down(s); + send_rst(s, s->tcb.snd.nxt); + s->tcb.err = ETIMEDOUT; + stream_term(s); + tcp_stream_up(s); + } + break; + default: + rte_panic("Invalid timer type: %d\n", type); + } } /* process streams from to-send queue */ @@ -2621,20 +3113,63 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs)); for (i = 0; i != k; i++) { - s = rs[i]; - rte_atomic32_set(&s->tx.arm, 0); - if (tcp_stream_try_acquire(s) > 0) + if (s->tcb.uop & TCP_OP_RESET) { + /* already put into death row in close() */ + send_rst(s, s->tcb.snd.nxt); + continue; + } + + if (tcp_stream_acquire(s) > 0) { + if (s->tcb.uop & TCP_OP_KEEPALIVE) { + s->tcb.uop &= ~TCP_OP_KEEPALIVE; + set_keepalive_timer(s); + } + + if (s->tcb.state == TCP_ST_FIN_WAIT_2 && + s->tcb.uop & TCP_OP_CLOSE) { + /* This could happen after: + * 1) shutdown; + * 2) FIN sent; + * 3) ack received; + * 4) close; + */ + timer_start(s, TIMER_RTO, s->tcb.snd.rto_fw); + tcp_stream_release(s); + continue; + } + + if (s->tcb.state == TCP_ST_ESTABLISHED && + s->s.option.tcpcork) { + if (s->tcb.snd.cork_ts == 0) + s->tcb.snd.cork_ts = (uint32_t)tms; + + if (s->tcb.snd.waitlen < s->tcb.snd.mss && + (uint32_t)tms - s->tcb.snd.cork_ts < 200) { + txs_enqueue(s->s.ctx, s); + tcp_stream_release(s); + continue; + } + + s->tcb.snd.cork_ts = 0; + } + tx_stream(s, tms); - else + tcp_stream_release(s); + continue; + } + + if (s->tcb.state != TCP_ST_CLOSED) txs_enqueue(s->s.ctx, s); - tcp_stream_release(s); + + /* TCP_ST_CLOSED? See close with TCP_ST_CLOSED state */ } /* collect streams to close from the death row */ dr = CTX_TCP_SDR(ctx); + rte_spinlock_lock(&dr->lock); for (k = 0, p = STAILQ_FIRST(&dr->be); k != num && p != NULL; k++, p = STAILQ_NEXT(p, link)) @@ -2645,9 +3180,21 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) else STAILQ_FIRST(&dr->be) = p; + /* if stream still in tsq, wait one more round */ + for (i = 0; i != k; i++) { + if (rte_atomic32_read(&rs[i]->tx.arm) > 0) { + STAILQ_INSERT_TAIL(&dr->be, &rs[i]->s, link); + rs[i] = NULL; + } + } + + rte_spinlock_unlock(&dr->lock); + /* cleanup closed streams */ for (i = 0; i != k; i++) { s = rs[i]; + if (s == NULL) + continue; tcp_stream_down(s); tcp_stream_reset(ctx, s); } -- cgit 1.2.3-korg