Diffstat (limited to 'lib/libtle_l4p/tcp_rxtx.c')
-rw-r--r--  lib/libtle_l4p/tcp_rxtx.c | 378
1 file changed, 292 insertions(+), 86 deletions(-)
diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c
index a1c7d09..30ed104 100644
--- a/lib/libtle_l4p/tcp_rxtx.c
+++ b/lib/libtle_l4p/tcp_rxtx.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Intel Corporation.
+ * Copyright (c) 2016-2017 Intel Corporation.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -59,12 +59,12 @@ rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
struct tle_tcp_stream *s;
s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
- if (s == NULL || rwl_acquire(&s->rx.use) < 0)
+ if (s == NULL || tcp_stream_acquire(s) < 0)
return NULL;
/* check that we have a proper stream. */
if (s->tcb.state != TCP_ST_LISTEN) {
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
s = NULL;
}
@@ -84,11 +84,11 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
return NULL;
}
- if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
+ if (tcp_stream_acquire(s) < 0)
return NULL;
/* check that we have a proper stream. */
else if (s->tcb.state == TCP_ST_CLOSED) {
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
s = NULL;
}
@@ -164,6 +164,24 @@ stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}
+static inline uint32_t
+get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
+{
+ uint32_t pid;
+ rte_atomic32_t *pa;
+
+ pa = &dev->tx.packet_id[type];
+
+ if (st == 0) {
+ pid = rte_atomic32_add_return(pa, num);
+ return pid - num;
+ } else {
+ pid = rte_atomic32_read(pa);
+ rte_atomic32_set(pa, pid + num);
+ return pid;
+ }
+}
+
static inline void
fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
uint32_t seq, uint8_t hlen, uint8_t flags)
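Note on get_ip_pid() above: it centralizes IPv4 packet-id allocation and lets a single-threaded context skip the locked add. A minimal standalone sketch of the same allocation scheme, using C11 atomics instead of the DPDK rte_atomic32 API purely for illustration:

	#include <stdatomic.h>
	#include <stdint.h>

	/* Illustration only: reserve 'num' consecutive IP IDs from a shared counter.
	 * 'st' mirrors TLE_CTX_FLAG_ST; when set, only one thread ever touches the
	 * counter, so a plain load/store pair is sufficient. */
	static uint32_t
	alloc_ip_pid(atomic_uint_fast32_t *ctr, uint32_t num, int st)
	{
		uint_fast32_t pid;

		if (st == 0) {
			/* multi-threaded: atomic fetch-and-add reserves the block */
			pid = atomic_fetch_add(ctr, num);
		} else {
			/* single-threaded: avoid the bus-locked operation */
			pid = atomic_load_explicit(ctr, memory_order_relaxed);
			atomic_store_explicit(ctr, pid + num, memory_order_relaxed);
		}
		return (uint32_t)pid;
	}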
@@ -357,7 +375,7 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
type = s->s.type;
dev = s->tx.dst.dev;
- pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
+ pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
k = 0;
tn = 0;
@@ -506,22 +524,16 @@ calc_smss(uint16_t mss, const struct tle_dest *dst)
}
/*
- * RFC 5681 3.1
- * If SMSS > 2190 bytes:
- * IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
- * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
- * IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
- * if SMSS <= 1095 bytes:
- * IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
+ * RFC 6928 2
+ * min (10*MSS, max (2*MSS, 14600))
+ *
+ * or using user provided initial congestion window (icw)
+ * min (10*MSS, max (2*MSS, icw))
*/
static inline uint32_t
-initial_cwnd(uint16_t smss)
+initial_cwnd(uint32_t smss, uint32_t icw)
{
- if (smss > 2190)
- return 2 * smss;
- else if (smss > 1095)
- return 3 * smss;
- return 4 * smss;
+ return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
}
/*
@@ -561,7 +573,7 @@ send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
dst = &s->tx.dst;
type = s->s.type;
- pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
+ pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
if (rc == 0)
@@ -657,7 +669,7 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
return -EINVAL;
dev = dst.dev;
- pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
+ pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
@@ -763,8 +775,8 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
}
static inline int
-restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
- const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb,
+restore_syn_opt(union seg_info *si, union tsopt *to,
+ const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
uint32_t hash_alg, rte_xmm_t *secret_key)
{
int32_t rc;
@@ -778,12 +790,12 @@ restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
if (rc < 0)
return rc;
- so->mss = rc;
+ si->mss = rc;
th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
mb->l2_len + mb->l3_len);
len = mb->l4_len - sizeof(*th);
- sync_get_opts(so, (uintptr_t)(th + 1), len);
+ to[0] = get_tms_opts((uintptr_t)(th + 1), len);
return 0;
}
@@ -814,9 +826,11 @@ static inline int
stream_fill_dest(struct tle_tcp_stream *s)
{
int32_t rc;
+ uint32_t type;
const void *da;
- if (s->s.type == TLE_V4)
+ type = s->s.type;
+ if (type == TLE_V4)
da = &s->s.ipv4.addr.src;
else
da = &s->s.ipv6.addr.src;
@@ -830,7 +844,7 @@ stream_fill_dest(struct tle_tcp_stream *s)
*/
static inline int
accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
- struct tle_tcp_stream *cs, const struct syn_opts *so,
+ struct tle_tcp_stream *cs, const union tsopt *to,
uint32_t tms, const union pkt_info *pi, const union seg_info *si)
{
int32_t rc;
@@ -857,7 +871,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
}
/* setup TCB */
- sync_fill_tcb(&cs->tcb, si, so);
+ sync_fill_tcb(&cs->tcb, si, to);
cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
/*
@@ -871,8 +885,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
} else
cs->tcb.snd.rto = TCP_RTO_DEFAULT;
- /* copy streams type. */
+ /* copy streams type & flags. */
cs->s.type = ps->s.type;
+ cs->flags = ps->flags;
/* retrieve and cache destination information. */
rc = stream_fill_dest(cs);
@@ -883,8 +898,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
/* setup congestion variables */
- cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
+ cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
+ cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
cs->tcb.state = TCP_ST_ESTABLISHED;
@@ -909,14 +925,14 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
*/
static inline int
rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
- const union pkt_info *pi, const union seg_info *si,
+ const union pkt_info *pi, union seg_info *si,
uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
{
int32_t rc;
struct tle_ctx *ctx;
struct tle_stream *ts;
struct tle_tcp_stream *cs;
- struct syn_opts so;
+ union tsopt to;
*csp = NULL;
@@ -924,7 +940,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
return -EINVAL;
ctx = s->s.ctx;
- rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg,
+ rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
&ctx->prm.secret_key);
if (rc < 0)
return rc;
@@ -936,7 +952,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
return ENFILE;
/* prepare stream to handle new connection */
- if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
+ if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
/* put new stream in the accept queue */
if (_rte_ring_enqueue_burst(s->rx.q,
@@ -947,7 +963,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
/* cleanup on failure */
tcp_stream_down(cs);
- stbl_del_pkt(st, cs->ste, pi);
+ stbl_del_stream(st, cs->ste, cs, 0);
cs->ste = NULL;
}
@@ -1008,6 +1024,17 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
}
static void
+stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
+{
+ if (rto != 0) {
+ s->tcb.state = TCP_ST_TIME_WAIT;
+ s->tcb.snd.rto = rto;
+ timer_reset(s);
+ } else
+ stream_term(s);
+}
+
+static void
rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
{
uint32_t state;
@@ -1027,17 +1054,13 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
s->err.cb.func(s->err.cb.data, &s->s);
} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
rsp->flags |= TCP_FLAG_ACK;
- if (ackfin != 0) {
- s->tcb.state = TCP_ST_TIME_WAIT;
- s->tcb.snd.rto = TCP_RTO_2MSL;
- timer_reset(s);
- } else
+ if (ackfin != 0)
+ stream_timewait(s, s->tcb.snd.rto_tw);
+ else
s->tcb.state = TCP_ST_CLOSING;
} else if (state == TCP_ST_FIN_WAIT_2) {
rsp->flags |= TCP_FLAG_ACK;
- s->tcb.state = TCP_ST_TIME_WAIT;
- s->tcb.snd.rto = TCP_RTO_2MSL;
- timer_reset(s);
+ stream_timewait(s, s->tcb.snd.rto_tw);
} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
stream_term(s);
}
@@ -1144,7 +1167,9 @@ rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
static inline void
dack_info_init(struct dack_info *tack, const struct tcb *tcb)
{
- memset(tack, 0, sizeof(*tack));
+ static const struct dack_info zero_dack;
+
+ tack[0] = zero_dack;
tack->ack = tcb->snd.una;
tack->segs.dup = tcb->rcv.dupack;
tack->wu.raw = tcb->snd.wu.raw;
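The memset() in dack_info_init() is replaced by an assignment from a static zero-initialized object, which zeroes the structure in a type-safe way. A standalone illustration of the pattern (struct and field names hypothetical):

	struct dack_example { uint32_t ack, dup; };

	static void
	dack_example_init(struct dack_example *di, uint32_t una)
	{
		static const struct dack_example zero;	/* all members zero */

		di[0] = zero;		/* equivalent to memset(di, 0, sizeof(*di)) */
		di->ack = una;		/* then seed the fields that need values */
	}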
@@ -1488,9 +1513,7 @@ rx_ackfin(struct tle_tcp_stream *s)
timer_stop(s);
s->tcb.state = TCP_ST_FIN_WAIT_2;
} else if (state == TCP_ST_CLOSING) {
- s->tcb.state = TCP_ST_TIME_WAIT;
- s->tcb.snd.rto = TCP_RTO_2MSL;
- timer_reset(s);
+ stream_timewait(s, s->tcb.snd.rto_tw);
}
}
@@ -1554,7 +1577,7 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
s->tcb.snd.wscale = so.wscale;
/* setup congestion variables */
- s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
+ s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
s->tcb.snd.ssthresh = s->tcb.snd.wnd;
s->tcb.rcv.ts = so.ts.val;
@@ -1720,9 +1743,9 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
{
uint32_t i;
- if (rwl_acquire(&s->rx.use) > 0) {
+ if (tcp_stream_acquire(s) > 0) {
i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
return i;
}
@@ -1735,7 +1758,7 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
static inline uint32_t
rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
- const union pkt_info pi[], const union seg_info si[],
+ const union pkt_info pi[], union seg_info si[],
struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
uint32_t num)
{
@@ -1809,7 +1832,7 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
k = num - i;
}
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
return num - k;
}
@@ -1850,7 +1873,7 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
}
}
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
return num - k;
}
@@ -1859,7 +1882,8 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
struct stbl *st;
- uint32_t i, j, k, n, t, ts;
+ struct tle_ctx *ctx;
+ uint32_t i, j, k, mt, n, t, ts;
uint64_t csf;
union pkt_info pi[num];
union seg_info si[num];
@@ -1868,8 +1892,10 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
uint32_t raw;
} stu;
- ts = tcp_get_tms();
- st = CTX_TCP_STLB(dev->ctx);
+ ctx = dev->ctx;
+ ts = tcp_get_tms(ctx->cycles_ms_shift);
+ st = CTX_TCP_STLB(ctx);
+ mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
stu.raw = 0;
@@ -1887,7 +1913,7 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
pi[i].tf.type, IPPROTO_TCP) != 0)
pi[i].csf = csf;
- stu.t[t] = 1;
+ stu.t[t] = mt;
}
if (stu.t[TLE_V4] != 0)
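The new mt flag ties stream-table synchronization to the same TLE_CTX_FLAG_ST flag that get_ip_pid() consults: a context created for single-threaded use skips both the locked packet-id updates and the per-address-family synchronization gated by stu.t[] below. A hedged sketch of requesting that mode at context creation (parameter fields as in tle_ctx.h; treat the exact values and field selection as illustrative):

	struct tle_ctx *ctx;
	struct tle_ctx_param cprm;

	memset(&cprm, 0, sizeof(cprm));
	cprm.socket_id = SOCKET_ID_ANY;
	cprm.proto = TLE_PROTO_TCP;
	cprm.max_streams = 1024;
	cprm.flags = TLE_CTX_FLAG_ST;	/* FE and BE run in one thread: no locks/atomics */

	ctx = tle_ctx_create(&cprm);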
@@ -1936,7 +1962,7 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
struct tle_tcp_stream *s;
s = TCP_STREAM(ts);
- n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
+ n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
if (n == 0)
return 0;
@@ -1945,9 +1971,9 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
* then rearm stream RX event.
*/
if (n == num && rte_ring_count(s->rx.q) != 0) {
- if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+ if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
tle_event_raise(s->rx.ev);
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
}
return n;
@@ -2066,7 +2092,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
/* fill pkt info to generate seq.*/
stream_fill_pkt_info(s, &pi);
- tms = tcp_get_tms();
+ tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
s->tcb.so.ts.val = tms;
s->tcb.so.ts.ecr = 0;
s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
@@ -2116,7 +2142,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
if (type >= TLE_VNUM)
return -EINVAL;
- if (rwl_try_acquire(&s->tx.use) > 0) {
+ if (tcp_stream_try_acquire(s) > 0) {
rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
TCP_ST_SYN_SENT);
rc = (rc == 0) ? -EDEADLK : 0;
@@ -2124,14 +2150,14 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
rc = -EINVAL;
if (rc != 0) {
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
return rc;
}
/* fill stream, prepare and transmit syn pkt */
s->tcb.uop |= TCP_OP_CONNECT;
rc = tx_syn(s, addr);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
/* error happened, do a cleanup */
if (rc != 0)
@@ -2147,7 +2173,7 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
struct tle_tcp_stream *s;
s = TCP_STREAM(ts);
- n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
+ n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
if (n == 0)
return 0;
@@ -2156,14 +2182,76 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
* then rearm stream RX event.
*/
if (n == num && rte_ring_count(s->rx.q) != 0) {
- if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+ if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
tle_event_raise(s->rx.ev);
- rwl_release(&s->rx.use);
+ tcp_stream_release(s);
}
return n;
}
+ssize_t
+tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
+ int iovcnt)
+{
+ int32_t i;
+ uint32_t mn, n, tn;
+ size_t sz;
+ struct tle_tcp_stream *s;
+ struct iovec iv;
+ struct rxq_objs mo[2];
+
+ s = TCP_STREAM(ts);
+
+ /* get group of packets */
+ mn = tcp_rxq_get_objs(s, mo);
+ if (mn == 0)
+ return 0;
+
+ sz = 0;
+ n = 0;
+ for (i = 0; i != iovcnt; i++) {
+ iv = iov[i];
+ sz += iv.iov_len;
+ n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
+ if (iv.iov_len != 0) {
+ sz -= iv.iov_len;
+ break;
+ }
+ }
+
+ tn = n;
+
+ if (i != iovcnt && mn != 1) {
+ n = 0;
+ do {
+ sz += iv.iov_len;
+ n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
+ if (iv.iov_len != 0) {
+ sz -= iv.iov_len;
+ break;
+ }
+ if (i + 1 != iovcnt)
+ iv = iov[i + 1];
+ } while (++i != iovcnt);
+ tn += n;
+ }
+
+ tcp_rxq_consume(s, tn);
+
+ /*
+ * if we still have packets to read,
+ * then rearm stream RX event.
+ */
+ if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
+ if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
+ tle_event_raise(s->rx.ev);
+ tcp_stream_release(s);
+ }
+
+ return sz;
+}
+
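The new tle_tcp_stream_readv() above copies queued receive data straight into a caller-supplied iovec array and returns the number of bytes delivered (0 when the RX queue is empty). A minimal usage sketch, assuming an established stream ts and a hypothetical consume_payload() callback:

	char hdr[64], body[4096];
	struct iovec iov[2] = {
		{ .iov_base = hdr,  .iov_len = sizeof(hdr)  },
		{ .iov_base = body, .iov_len = sizeof(body) },
	};
	ssize_t n;

	n = tle_tcp_stream_readv(ts, iov, RTE_DIM(iov));
	if (n > 0) {
		/* the first bytes fill hdr, anything beyond sizeof(hdr) lands in body */
		consume_payload(hdr, body, (size_t)n);
	}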
static inline int32_t
tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
struct rte_mbuf *segs[], uint32_t num)
@@ -2176,16 +2264,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
0, TCP_FLAG_ACK, 0, 0);
if (rc != 0) {
- free_segments(segs, num);
+ free_mbufs(segs, num);
break;
}
}
if (i == num) {
/* queue packets for further transmission. */
- rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
+ rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
if (rc != 0)
- free_segments(segs, num);
+ free_mbufs(segs, num);
}
return rc;
@@ -2194,17 +2282,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
uint16_t
tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
{
- uint32_t i, j, k, mss, n, state, type;
+ uint32_t i, j, k, mss, n, state;
int32_t rc;
uint64_t ol_flags;
struct tle_tcp_stream *s;
- struct tle_dev *dev;
struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
s = TCP_STREAM(ts);
/* mark stream as not closable. */
- if (rwl_acquire(&s->tx.use) < 0) {
+ if (tcp_stream_acquire(s) < 0) {
rte_errno = EAGAIN;
return 0;
}
@@ -2212,14 +2299,12 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
state = s->tcb.state;
if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
rte_errno = ENOTCONN;
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
return 0;
}
mss = s->tcb.snd.mss;
- dev = s->tx.dst.dev;
- type = s->s.type;
- ol_flags = dev->tx.ol_flags[type];
+ ol_flags = s->tx.dst.ol_flags;
k = 0;
rc = 0;
@@ -2237,7 +2322,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
if (i != k) {
/* queue packets for further transmission. */
- n = _rte_ring_mp_enqueue_burst(s->tx.q,
+ n = _rte_ring_enqueue_burst(s->tx.q,
(void **)pkt + k, (i - k));
k += n;
@@ -2246,7 +2331,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
* remove pkt l2/l3 headers, restore ol_flags
*/
if (i != k) {
- ol_flags = ~dev->tx.ol_flags[type];
+ ol_flags = ~s->tx.dst.ol_flags;
for (j = k; j != i; j++) {
rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
pkt[j]->l3_len +
@@ -2271,7 +2356,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
break;
}
- rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
+ rc = tx_segments(s, ol_flags, segs, rc);
if (rc == 0) {
/* free the large mbuf */
rte_pktmbuf_free(pkt[i]);
@@ -2290,11 +2375,120 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
tle_event_raise(s->tx.ev);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
return k;
}
+ssize_t
+tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
+ const struct iovec *iov, int iovcnt)
+{
+ int32_t i, rc;
+ uint32_t j, k, n, num, slen, state;
+ uint64_t ol_flags;
+ size_t sz, tsz;
+ struct tle_tcp_stream *s;
+ struct iovec iv;
+ struct rte_mbuf *mb[2 * MAX_PKT_BURST];
+
+ s = TCP_STREAM(ts);
+
+ /* mark stream as not closable. */
+ if (tcp_stream_acquire(s) < 0) {
+ rte_errno = EAGAIN;
+ return -1;
+ }
+
+ state = s->tcb.state;
+ if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
+ rte_errno = ENOTCONN;
+ tcp_stream_release(s);
+ return -1;
+ }
+
+ /* figure out how many mbufs do we need */
+ tsz = 0;
+ for (i = 0; i != iovcnt; i++)
+ tsz += iov[i].iov_len;
+
+ slen = rte_pktmbuf_data_room_size(mp);
+ slen = RTE_MIN(slen, s->tcb.snd.mss);
+
+ num = (tsz + slen - 1) / slen;
+ n = rte_ring_free_count(s->tx.q);
+ num = RTE_MIN(num, n);
+ n = RTE_MIN(num, RTE_DIM(mb));
+
+ /* allocate mbufs */
+ if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
+ rte_errno = ENOMEM;
+ tcp_stream_release(s);
+ return -1;
+ }
+
+ /* copy data into the mbufs */
+ k = 0;
+ sz = 0;
+ for (i = 0; i != iovcnt; i++) {
+ iv = iov[i];
+ sz += iv.iov_len;
+ k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
+ if (iv.iov_len != 0) {
+ sz -= iv.iov_len;
+ break;
+ }
+ }
+
+ /* partially filled segment */
+ k += (k != n && mb[k]->data_len != 0);
+
+ /* fill pkt headers */
+ ol_flags = s->tx.dst.ol_flags;
+
+ for (j = 0; j != k; j++) {
+ rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
+ s->s.port, 0, TCP_FLAG_ACK, 0, 0);
+ if (rc != 0)
+ break;
+ }
+
+ /* if no error encountered, then enqueue pkts for transmission */
+ if (k == j)
+ k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
+ else
+ k = 0;
+
+ if (k != j) {
+
+ /* free pkts that were not enqueued */
+ free_mbufs(mb + k, j - k);
+
+ /* our last segment can be partially filled */
+ sz += slen - sz % slen;
+ sz -= (j - k) * slen;
+
+ /* report an error */
+ if (rc != 0) {
+ rte_errno = -rc;
+ sz = -1;
+ }
+ }
+
+ if (k != 0) {
+
+ /* notify BE about more data to send */
+ txs_enqueue(s->s.ctx, s);
+
+ /* if possible, re-arm stream write event. */
+ if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
+ tle_event_raise(s->tx.ev);
+ }
+
+ tcp_stream_release(s);
+ return sz;
+}
+
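tle_tcp_stream_writev() above is the gather-side counterpart: it pulls mbufs from the supplied mempool, copies the iovec payload into them (bounded by the stream MSS and the free space in the TX ring), queues them for the backend and returns the number of bytes accepted, or -1 with rte_errno set. A minimal usage sketch, assuming a connected stream ts, an initialized mbuf mempool mp and a hypothetical payload buffer:

	static const char resp[] = "HTTP/1.1 200 OK\r\n\r\n";
	struct iovec iov[2] = {
		{ .iov_base = (void *)resp, .iov_len = sizeof(resp) - 1 },
		{ .iov_base = payload,      .iov_len = payload_len },	/* payload: hypothetical */
	};
	ssize_t n;

	n = tle_tcp_stream_writev(ts, mp, iov, RTE_DIM(iov));
	if (n < 0 && rte_errno == EAGAIN) {
		/* stream could not be acquired right now: retry later */
	} else if (n == 0) {
		/* typically no room in the TX ring: wait for the TX event, then retry */
	}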
/* send data and FIN (if needed) */
static inline void
tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
@@ -2376,6 +2570,18 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms)
} else if (state == TCP_ST_SYN_SENT) {
/* resending SYN */
s->tcb.so.ts.val = tms;
+
+ /* According to RFC 6928 2:
+ * To reduce the chance for spurious SYN or SYN/ACK
+ * retransmission, it is RECOMMENDED that
+ * implementations refrain from resetting the initial
+ * window to 1 segment, unless there have been more
+ * than one SYN or SYN/ACK retransmissions or true loss
+ * detection has been made.
+ */
+ if (s->tcb.snd.nb_retx != 0)
+ s->tcb.snd.cwnd = s->tcb.snd.mss;
+
send_ack(s, tms, TCP_FLAG_SYN);
} else if (state == TCP_ST_TIME_WAIT) {
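The effect of the nb_retx check added above can be read directly from the code:

	nb_retx == 0 (first SYN timeout):   cwnd is left at the RFC 6928 initial window
	nb_retx >= 1 (later SYN timeouts):  cwnd = 1 * MSS before the SYN is resent

so a single lost SYN or SYN/ACK no longer forces the connection to start with a one-segment window.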
@@ -2405,7 +2611,7 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
/* process streams with RTO expired */
tw = CTX_TCP_TMWHL(ctx);
- tms = tcp_get_tms();
+ tms = tcp_get_tms(ctx->cycles_ms_shift);
tle_timer_expire(tw, tms);
k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
@@ -2414,9 +2620,9 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
s = rs[i];
s->timer.handle = NULL;
- if (rwl_try_acquire(&s->tx.use) > 0)
+ if (tcp_stream_try_acquire(s) > 0)
rto_stream(s, tms);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
}
/* process streams from to-send queue */
@@ -2428,11 +2634,11 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
s = rs[i];
rte_atomic32_set(&s->tx.arm, 0);
- if (rwl_try_acquire(&s->tx.use) > 0)
+ if (tcp_stream_try_acquire(s) > 0)
tx_stream(s, tms);
else
txs_enqueue(s->s.ctx, s);
- rwl_release(&s->tx.use);
+ tcp_stream_release(s);
}
/* collect streams to close from the death row */