From 7e18fa1bf263822c46d7431a911b41d6377d5f69 Mon Sep 17 00:00:00 2001
From: Konstantin Ananyev
Date: Thu, 27 Jul 2017 12:00:57 +0100
Subject: - Introduce tle_tcp_stream_readv() and tle_tcp_stream_writev().

- Introduce flags for tle_ctx_param.
- Introduce TLE_CTX_FLAG_ST - indicates that the given ctx will be used by a
  single thread only.
- Introduce new parameters for the TCP context:
  timewait - allows the user to configure the max timeout in the TCP_TIMEWAIT state.
  icw - allows the user to specify the desired initial congestion window for
  new connections.
- Few optimisations:
  cache tx.ol_flags inside the tle destination.
  calculate and cache the cycles_to_ms shift value inside the ctx.
  reorder restoring SYN opts and filling the TCB a bit.

Change-Id: Ie05087783b3b7f1e4ce99d3555bc5bd098f83fe0
Signed-off-by: Konstantin Ananyev
Signed-off-by: Mohammad Abdul Awal
---
 lib/libtle_l4p/tcp_rxtx.c | 378 +++++++++++++++++++++++++++++++++++-----------
 1 file changed, 292 insertions(+), 86 deletions(-)

diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c index a1c7d09..30ed104 100644 --- a/lib/libtle_l4p/tcp_rxtx.c +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -59,12 +59,12 @@ rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi, struct tle_tcp_stream *s; s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst]; - if (s == NULL || rwl_acquire(&s->rx.use) < 0) + if (s == NULL || tcp_stream_acquire(s) < 0) return NULL; /* check that we have a proper stream. */ if (s->tcb.state != TCP_ST_LISTEN) { - rwl_release(&s->rx.use); + tcp_stream_release(s); s = NULL; } @@ -84,11 +84,11 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st, return NULL; } - if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0) + if (tcp_stream_acquire(s) < 0) return NULL; /* check that we have a proper stream.
*/ else if (s->tcb.state == TCP_ST_CLOSED) { - rwl_release(&s->rx.use); + tcp_stream_release(s); s = NULL; } @@ -164,6 +164,24 @@ stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[], return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); } +static inline uint32_t +get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st) +{ + uint32_t pid; + rte_atomic32_t *pa; + + pa = &dev->tx.packet_id[type]; + + if (st == 0) { + pid = rte_atomic32_add_return(pa, num); + return pid - num; + } else { + pid = rte_atomic32_read(pa); + rte_atomic32_set(pa, pid + num); + return pid; + } +} + static inline void fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, uint32_t seq, uint8_t hlen, uint8_t flags) @@ -357,7 +375,7 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], type = s->s.type; dev = s->tx.dst.dev; - pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num; + pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0); k = 0; tn = 0; @@ -506,22 +524,16 @@ calc_smss(uint16_t mss, const struct tle_dest *dst) } /* - * RFC 5681 3.1 - * If SMSS > 2190 bytes: - * IW = 2 * SMSS bytes and MUST NOT be more than 2 segments - * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes): - * IW = 3 * SMSS bytes and MUST NOT be more than 3 segments - * if SMSS <= 1095 bytes: - * IW = 4 * SMSS bytes and MUST NOT be more than 4 segments + * RFC 6928 2 + * min (10*MSS, max (2*MSS, 14600)) + * + * or using user provided initial congestion window (icw) + * min (10*MSS, max (2*MSS, icw)) */ static inline uint32_t -initial_cwnd(uint16_t smss) +initial_cwnd(uint32_t smss, uint32_t icw) { - if (smss > 2190) - return 2 * smss; - else if (smss > 1095) - return 3 * smss; - return 4 * smss; + return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw)); } /* @@ -561,7 +573,7 @@ send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq, dst = &s->tx.dst; type = s->s.type; - pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1; + pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1); if (rc == 0) @@ -657,7 +669,7 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, return -EINVAL; dev = dst.dev; - pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1; + pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq, TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1); @@ -763,8 +775,8 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len, } static inline int -restore_syn_opt(struct syn_opts *so, const union pkt_info *pi, - const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb, +restore_syn_opt(union seg_info *si, union tsopt *to, + const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb, uint32_t hash_alg, rte_xmm_t *secret_key) { int32_t rc; @@ -778,12 +790,12 @@ restore_syn_opt(struct syn_opts *so, const union pkt_info *pi, if (rc < 0) return rc; - so->mss = rc; + si->mss = rc; th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, mb->l2_len + mb->l3_len); len = mb->l4_len - sizeof(*th); - sync_get_opts(so, (uintptr_t)(th + 1), len); + to[0] = get_tms_opts((uintptr_t)(th + 1), len); return 0; } @@ -814,9 +826,11 @@ static inline int stream_fill_dest(struct tle_tcp_stream *s) { int32_t rc; + uint32_t type; const void *da; - if (s->s.type == TLE_V4) + type = s->s.type; + if (type == TLE_V4) da = &s->s.ipv4.addr.src; else 
da = &s->s.ipv6.addr.src; @@ -830,7 +844,7 @@ stream_fill_dest(struct tle_tcp_stream *s) */ static inline int accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, - struct tle_tcp_stream *cs, const struct syn_opts *so, + struct tle_tcp_stream *cs, const union tsopt *to, uint32_t tms, const union pkt_info *pi, const union seg_info *si) { int32_t rc; @@ -857,7 +871,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, } /* setup TCB */ - sync_fill_tcb(&cs->tcb, si, so); + sync_fill_tcb(&cs->tcb, si, to); cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale); /* @@ -871,8 +885,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, } else cs->tcb.snd.rto = TCP_RTO_DEFAULT; - /* copy streams type. */ + /* copy streams type & flags. */ cs->s.type = ps->s.type; + cs->flags = ps->flags; /* retrive and cache destination information. */ rc = stream_fill_dest(cs); @@ -883,8 +898,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); /* setup congestion variables */ - cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); + cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd); cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; + cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw; cs->tcb.state = TCP_ST_ESTABLISHED; @@ -909,14 +925,14 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, */ static inline int rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, - const union pkt_info *pi, const union seg_info *si, + const union pkt_info *pi, union seg_info *si, uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp) { int32_t rc; struct tle_ctx *ctx; struct tle_stream *ts; struct tle_tcp_stream *cs; - struct syn_opts so; + union tsopt to; *csp = NULL; @@ -924,7 +940,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, return -EINVAL; ctx = s->s.ctx; - rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg, + rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg, &ctx->prm.secret_key); if (rc < 0) return rc; @@ -936,7 +952,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, return ENFILE; /* prepare stream to handle new connection */ - if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) { + if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) { /* put new stream in the accept queue */ if (_rte_ring_enqueue_burst(s->rx.q, @@ -947,7 +963,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, /* cleanup on failure */ tcp_stream_down(cs); - stbl_del_pkt(st, cs->ste, pi); + stbl_del_stream(st, cs->ste, cs, 0); cs->ste = NULL; } @@ -1007,6 +1023,17 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack) return n; } +static void +stream_timewait(struct tle_tcp_stream *s, uint32_t rto) +{ + if (rto != 0) { + s->tcb.state = TCP_ST_TIME_WAIT; + s->tcb.snd.rto = rto; + timer_reset(s); + } else + stream_term(s); +} + static void rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp) { @@ -1027,17 +1054,13 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp) s->err.cb.func(s->err.cb.data, &s->s); } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) { rsp->flags |= TCP_FLAG_ACK; - if (ackfin != 0) { - s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = TCP_RTO_2MSL; - timer_reset(s); - } else + if (ackfin != 0) + stream_timewait(s, s->tcb.snd.rto_tw); + else s->tcb.state = TCP_ST_CLOSING; } else if (state == TCP_ST_FIN_WAIT_2) { rsp->flags |= TCP_FLAG_ACK; - s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = 
TCP_RTO_2MSL; - timer_reset(s); + stream_timewait(s, s->tcb.snd.rto_tw); } else if (state == TCP_ST_LAST_ACK && ackfin != 0) { stream_term(s); } @@ -1144,7 +1167,9 @@ rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp) static inline void dack_info_init(struct dack_info *tack, const struct tcb *tcb) { - memset(tack, 0, sizeof(*tack)); + static const struct dack_info zero_dack; + + tack[0] = zero_dack; tack->ack = tcb->snd.una; tack->segs.dup = tcb->rcv.dupack; tack->wu.raw = tcb->snd.wu.raw; @@ -1488,9 +1513,7 @@ rx_ackfin(struct tle_tcp_stream *s) timer_stop(s); s->tcb.state = TCP_ST_FIN_WAIT_2; } else if (state == TCP_ST_CLOSING) { - s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = TCP_RTO_2MSL; - timer_reset(s); + stream_timewait(s, s->tcb.snd.rto_tw); } } @@ -1554,7 +1577,7 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, s->tcb.snd.wscale = so.wscale; /* setup congestion variables */ - s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss); + s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd); s->tcb.snd.ssthresh = s->tcb.snd.wnd; s->tcb.rcv.ts = so.ts.val; @@ -1720,9 +1743,9 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts, { uint32_t i; - if (rwl_acquire(&s->rx.use) > 0) { + if (tcp_stream_acquire(s) > 0) { i = rx_stream(s, ts, pi, si, mb, rp, rc, num); - rwl_release(&s->rx.use); + tcp_stream_release(s); return i; } @@ -1735,7 +1758,7 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts, static inline uint32_t rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, - const union pkt_info pi[], const union seg_info si[], + const union pkt_info pi[], union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], uint32_t num) { @@ -1809,7 +1832,7 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, k = num - i; } - rwl_release(&s->rx.use); + tcp_stream_release(s); return num - k; } @@ -1850,7 +1873,7 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts, } } - rwl_release(&s->rx.use); + tcp_stream_release(s); return num - k; } @@ -1859,7 +1882,8 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], struct rte_mbuf *rp[], int32_t rc[], uint16_t num) { struct stbl *st; - uint32_t i, j, k, n, t, ts; + struct tle_ctx *ctx; + uint32_t i, j, k, mt, n, t, ts; uint64_t csf; union pkt_info pi[num]; union seg_info si[num]; @@ -1868,8 +1892,10 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint32_t raw; } stu; - ts = tcp_get_tms(); - st = CTX_TCP_STLB(dev->ctx); + ctx = dev->ctx; + ts = tcp_get_tms(ctx->cycles_ms_shift); + st = CTX_TCP_STLB(ctx); + mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); stu.raw = 0; @@ -1887,7 +1913,7 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], pi[i].tf.type, IPPROTO_TCP) != 0) pi[i].csf = csf; - stu.t[t] = 1; + stu.t[t] = mt; } if (stu.t[TLE_V4] != 0) @@ -1936,7 +1962,7 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], struct tle_tcp_stream *s; s = TCP_STREAM(ts); - n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num); + n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num); if (n == 0) return 0; @@ -1945,9 +1971,9 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], * then rearm stream RX event. 
*/ if (n == num && rte_ring_count(s->rx.q) != 0) { - if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); - rwl_release(&s->rx.use); + tcp_stream_release(s); } return n; @@ -2066,7 +2092,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) /* fill pkt info to generate seq.*/ stream_fill_pkt_info(s, &pi); - tms = tcp_get_tms(); + tms = tcp_get_tms(s->s.ctx->cycles_ms_shift); s->tcb.so.ts.val = tms; s->tcb.so.ts.ecr = 0; s->tcb.so.wscale = TCP_WSCALE_DEFAULT; @@ -2116,7 +2142,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) if (type >= TLE_VNUM) return -EINVAL; - if (rwl_try_acquire(&s->tx.use) > 0) { + if (tcp_stream_try_acquire(s) > 0) { rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED, TCP_ST_SYN_SENT); rc = (rc == 0) ? -EDEADLK : 0; @@ -2124,14 +2150,14 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) rc = -EINVAL; if (rc != 0) { - rwl_release(&s->tx.use); + tcp_stream_release(s); return rc; } /* fill stream, prepare and transmit syn pkt */ s->tcb.uop |= TCP_OP_CONNECT; rc = tx_syn(s, addr); - rwl_release(&s->tx.use); + tcp_stream_release(s); /* error happened, do a cleanup */ if (rc != 0) @@ -2147,7 +2173,7 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) struct tle_tcp_stream *s; s = TCP_STREAM(ts); - n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); + n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num); if (n == 0) return 0; @@ -2156,14 +2182,76 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) * then rearm stream RX event. */ if (n == num && rte_ring_count(s->rx.q) != 0) { - if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); - rwl_release(&s->rx.use); + tcp_stream_release(s); } return n; } +ssize_t +tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, + int iovcnt) +{ + int32_t i; + uint32_t mn, n, tn; + size_t sz; + struct tle_tcp_stream *s; + struct iovec iv; + struct rxq_objs mo[2]; + + s = TCP_STREAM(ts); + + /* get group of packets */ + mn = tcp_rxq_get_objs(s, mo); + if (mn == 0) + return 0; + + sz = 0; + n = 0; + for (i = 0; i != iovcnt; i++) { + iv = iov[i]; + sz += iv.iov_len; + n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n); + if (iv.iov_len != 0) { + sz -= iv.iov_len; + break; + } + } + + tn = n; + + if (i != iovcnt && mn != 1) { + n = 0; + do { + sz += iv.iov_len; + n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n); + if (iv.iov_len != 0) { + sz -= iv.iov_len; + break; + } + if (i + 1 != iovcnt) + iv = iov[i + 1]; + } while (++i != iovcnt); + tn += n; + } + + tcp_rxq_consume(s, tn); + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (i == iovcnt && rte_ring_count(s->rx.q) != 0) { + if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + tcp_stream_release(s); + } + + return sz; +} + static inline int32_t tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, struct rte_mbuf *segs[], uint32_t num) @@ -2176,16 +2264,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port, 0, TCP_FLAG_ACK, 0, 0); if (rc != 0) { - free_segments(segs, num); + free_mbufs(segs, num); break; } } if (i == num) { /* queue packets for further transmission. 
*/ - rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num); + rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num); if (rc != 0) - free_segments(segs, num); + free_mbufs(segs, num); } return rc; @@ -2194,17 +2282,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, uint16_t tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) { - uint32_t i, j, k, mss, n, state, type; + uint32_t i, j, k, mss, n, state; int32_t rc; uint64_t ol_flags; struct tle_tcp_stream *s; - struct tle_dev *dev; struct rte_mbuf *segs[TCP_MAX_PKT_SEG]; s = TCP_STREAM(ts); /* mark stream as not closable. */ - if (rwl_acquire(&s->tx.use) < 0) { + if (tcp_stream_acquire(s) < 0) { rte_errno = EAGAIN; return 0; } @@ -2212,14 +2299,12 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) state = s->tcb.state; if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { rte_errno = ENOTCONN; - rwl_release(&s->tx.use); + tcp_stream_release(s); return 0; } mss = s->tcb.snd.mss; - dev = s->tx.dst.dev; - type = s->s.type; - ol_flags = dev->tx.ol_flags[type]; + ol_flags = s->tx.dst.ol_flags; k = 0; rc = 0; @@ -2237,7 +2322,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (i != k) { /* queue packets for further transmission. */ - n = _rte_ring_mp_enqueue_burst(s->tx.q, + n = _rte_ring_enqueue_burst(s->tx.q, (void **)pkt + k, (i - k)); k += n; @@ -2246,7 +2331,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) * remove pkt l2/l3 headers, restore ol_flags */ if (i != k) { - ol_flags = ~dev->tx.ol_flags[type]; + ol_flags = ~s->tx.dst.ol_flags; for (j = k; j != i; j++) { rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len + pkt[j]->l3_len + @@ -2271,7 +2356,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) break; } - rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc); + rc = tx_segments(s, ol_flags, segs, rc); if (rc == 0) { /* free the large mbuf */ rte_pktmbuf_free(pkt[i]); @@ -2290,11 +2375,120 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) tle_event_raise(s->tx.ev); - rwl_release(&s->tx.use); + tcp_stream_release(s); return k; } +ssize_t +tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, + const struct iovec *iov, int iovcnt) +{ + int32_t i, rc; + uint32_t j, k, n, num, slen, state; + uint64_t ol_flags; + size_t sz, tsz; + struct tle_tcp_stream *s; + struct iovec iv; + struct rte_mbuf *mb[2 * MAX_PKT_BURST]; + + s = TCP_STREAM(ts); + + /* mark stream as not closable. 
*/ + if (tcp_stream_acquire(s) < 0) { + rte_errno = EAGAIN; + return -1; + } + + state = s->tcb.state; + if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { + rte_errno = ENOTCONN; + tcp_stream_release(s); + return -1; + } + + /* figure out how many mbufs do we need */ + tsz = 0; + for (i = 0; i != iovcnt; i++) + tsz += iov[i].iov_len; + + slen = rte_pktmbuf_data_room_size(mp); + slen = RTE_MIN(slen, s->tcb.snd.mss); + + num = (tsz + slen - 1) / slen; + n = rte_ring_free_count(s->tx.q); + num = RTE_MIN(num, n); + n = RTE_MIN(num, RTE_DIM(mb)); + + /* allocate mbufs */ + if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) { + rte_errno = ENOMEM; + tcp_stream_release(s); + return -1; + } + + /* copy data into the mbufs */ + k = 0; + sz = 0; + for (i = 0; i != iovcnt; i++) { + iv = iov[i]; + sz += iv.iov_len; + k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k); + if (iv.iov_len != 0) { + sz -= iv.iov_len; + break; + } + } + + /* partially filled segment */ + k += (k != n && mb[k]->data_len != 0); + + /* fill pkt headers */ + ol_flags = s->tx.dst.ol_flags; + + for (j = 0; j != k; j++) { + rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags, + s->s.port, 0, TCP_FLAG_ACK, 0, 0); + if (rc != 0) + break; + } + + /* if no error encountered, then enqueue pkts for transmission */ + if (k == j) + k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j); + else + k = 0; + + if (k != j) { + + /* free pkts that were not enqueued */ + free_mbufs(mb + k, j - k); + + /* our last segment can be partially filled */ + sz += slen - sz % slen; + sz -= (j - k) * slen; + + /* report an error */ + if (rc != 0) { + rte_errno = -rc; + sz = -1; + } + } + + if (k != 0) { + + /* notify BE about more data to send */ + txs_enqueue(s->s.ctx, s); + + /* if possible, re-arm stream write event. */ + if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + } + + tcp_stream_release(s); + return sz; +} + /* send data and FIN (if needed) */ static inline void tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state) @@ -2376,6 +2570,18 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms) } else if (state == TCP_ST_SYN_SENT) { /* resending SYN */ s->tcb.so.ts.val = tms; + + /* According to RFC 6928 2: + * To reduce the chance for spurious SYN or SYN/ACK + * retransmission, it is RECOMMENDED that + * implementations refrain from resetting the initial + * window to 1 segment, unless there have been more + * than one SYN or SYN/ACK retransmissions or true loss + * detection has been made. 
*/ + if (s->tcb.snd.nb_retx != 0) + s->tcb.snd.cwnd = s->tcb.snd.mss; + send_ack(s, tms, TCP_FLAG_SYN); } else if (state == TCP_ST_TIME_WAIT) { @@ -2405,7 +2611,7 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) /* process streams with RTO exipred */ tw = CTX_TCP_TMWHL(ctx); - tms = tcp_get_tms(); + tms = tcp_get_tms(ctx->cycles_ms_shift); tle_timer_expire(tw, tms); k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs)); @@ -2414,9 +2620,9 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) s = rs[i]; s->timer.handle = NULL; - if (rwl_try_acquire(&s->tx.use) > 0) + if (tcp_stream_try_acquire(s) > 0) rto_stream(s, tms); - rwl_release(&s->tx.use); + tcp_stream_release(s); } /* process streams from to-send queue */ @@ -2428,11 +2634,11 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) s = rs[i]; rte_atomic32_set(&s->tx.arm, 0); - if (rwl_try_acquire(&s->tx.use) > 0) + if (tcp_stream_try_acquire(s) > 0) tx_stream(s, tms); else txs_enqueue(s->s.ctx, s); - rwl_release(&s->tx.use); + tcp_stream_release(s); } /* collect streams to close from the death row */
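
Usage note (not part of the patch): the sketch below shows how an application might opt in to the new context knobs described in the commit message. The TLE_CTX_FLAG_ST flag and the tle_ctx_param field names (flags, icw, timewait) are taken from the message above; the header change itself is outside this file, so the exact field layout, the stream limits and the omitted lookup callbacks should be treated as assumptions.

#include <string.h>
#include <tle_ctx.h>

/*
 * Sketch only: a TCP context for a single-threaded application.
 * Field names follow the commit message; values are illustrative.
 */
static struct tle_ctx *
tcp_ctx_create_st(int32_t socket_id)
{
	struct tle_ctx_param prm;

	memset(&prm, 0, sizeof(prm));
	prm.socket_id = socket_id;
	prm.proto = TLE_PROTO_TCP;
	prm.max_streams = 1024;
	prm.max_stream_rbufs = 1024;
	prm.max_stream_sbufs = 1024;

	/* single-thread ctx: allows the library to skip atomic packet-id updates */
	prm.flags = TLE_CTX_FLAG_ST;

	/* desired initial congestion window, in bytes (RFC 6928 uses 14600) */
	prm.icw = 14600;

	/* upper bound for the TIME_WAIT state, assumed to be in milliseconds */
	prm.timewait = 1000;

	/* lookup4/lookup6 route callbacks still have to be set by the caller */
	return tle_ctx_create(&prm);
}

With TLE_CTX_FLAG_ST set, get_ip_pid() in the patch above updates the IP packet-id counter with a plain read/set pair instead of rte_atomic32_add_return(), which is the point of the flag.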
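
The new copy-mode calls keep the signatures visible in the diff: tle_tcp_stream_readv(ts, iov, iovcnt) and tle_tcp_stream_writev(ts, mp, iov, iovcnt), the latter returning -1 with rte_errno set on failure. A minimal usage sketch on an already established stream follows; the header names, buffer size and retry policy are assumptions of the example, not part of the API.

#include <sys/uio.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <tle_tcp.h>

/*
 * Sketch only: drain whatever payload is queued on the stream and echo it
 * back. tle_tcp_stream_writev() allocates mbufs from 'mp' and copies the
 * iovec contents into them.
 */
static ssize_t
stream_echo_once(struct tle_stream *ts, struct rte_mempool *mp)
{
	char buf[2048];
	struct iovec iv;
	ssize_t rd, wr;

	iv.iov_base = buf;
	iv.iov_len = sizeof(buf);

	/* number of bytes copied out of the RX queue; 0 means nothing queued */
	rd = tle_tcp_stream_readv(ts, &iv, 1);
	if (rd <= 0)
		return rd;

	iv.iov_base = buf;
	iv.iov_len = rd;

	/* bytes accepted for transmission, or -1 with rte_errno set */
	wr = tle_tcp_stream_writev(ts, mp, &iv, 1);
	if (wr < 0)
		return -(ssize_t)rte_errno;

	return wr;
}

Both calls copy data, so unlike tle_tcp_stream_send()/tle_tcp_stream_recv() the application keeps ownership of its own buffers; the cost is the extra copy through mempool mbufs.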