diff options
author | Konstantin Ananyev <konstantin.ananyev@intel.com> | 2017-07-27 12:00:57 +0100 |
---|---|---|
committer | Konstantin Ananyev <konstantin.ananyev@intel.com> | 2017-07-27 20:24:53 +0100 |
commit | 7e18fa1bf263822c46d7431a911b41d6377d5f69 (patch) | |
tree | ddf5ce05545419d6d77bb9d8b3c48fc90d221a7a /lib | |
parent | e151ee29d02d7802fab9e32b50ced54fd8d64160 (diff) |
- Introduce tle_tcp_stream_readv() and tle_tcp_stream_writev().
- Introduce flags for tle_ctx_param.
- Introduce TLE_CTX_FLAG_ST - indicates that given ctx will be used
by single thread only.
- Introduce new parameters for tcp context:
timewait - allows user to configure max timeout in TCP_TIMEWAIT state.
icw - allows user to specify desired initial congestion window
for new connections.
-Few optimisations:
cache tx.ol_flags inside tle destination.
calcualte and cache inside ctx cycles_to_ms shift value.
reorder restoring SYN opts and filling TCB a bit.
Change-Id: Ie05087783b3b7f1e4ce99d3555bc5bd098f83fe0
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Signed-off-by: Mohammad Abdul Awal <mohammad.abdul.awal@intel.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/libtle_dring/tle_dring.h | 73 | ||||
-rw-r--r-- | lib/libtle_l4p/ctx.c | 13 | ||||
-rw-r--r-- | lib/libtle_l4p/ctx.h | 3 | ||||
-rw-r--r-- | lib/libtle_l4p/misc.h | 91 | ||||
-rw-r--r-- | lib/libtle_l4p/stream.h | 4 | ||||
-rw-r--r-- | lib/libtle_l4p/stream_table.h | 56 | ||||
-rw-r--r-- | lib/libtle_l4p/syncookie.h | 51 | ||||
-rw-r--r-- | lib/libtle_l4p/tcp_ctl.h | 91 | ||||
-rw-r--r-- | lib/libtle_l4p/tcp_misc.h | 22 | ||||
-rw-r--r-- | lib/libtle_l4p/tcp_rxq.h | 42 | ||||
-rw-r--r-- | lib/libtle_l4p/tcp_rxtx.c | 378 | ||||
-rw-r--r-- | lib/libtle_l4p/tcp_stream.c | 71 | ||||
-rw-r--r-- | lib/libtle_l4p/tcp_stream.h | 9 | ||||
-rw-r--r-- | lib/libtle_l4p/tcp_tx_seg.h | 15 | ||||
-rw-r--r-- | lib/libtle_l4p/tle_ctx.h | 22 | ||||
-rw-r--r-- | lib/libtle_l4p/tle_tcp.h | 72 | ||||
-rw-r--r-- | lib/libtle_misc/tle_dpdk_wrapper.h | 113 |
17 files changed, 857 insertions, 269 deletions
diff --git a/lib/libtle_dring/tle_dring.h b/lib/libtle_dring/tle_dring.h index f589ece..9d3788a 100644 --- a/lib/libtle_dring/tle_dring.h +++ b/lib/libtle_dring/tle_dring.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -21,6 +21,7 @@ #include <rte_common.h> #include <rte_atomic.h> #include <rte_memory.h> +#include <rte_ring.h> #include <rte_debug.h> #ifdef __cplusplus @@ -68,11 +69,13 @@ struct tle_drb { struct tle_dring { uint32_t flags; struct { + uint32_t single; /**< true if single producer */ volatile uint32_t head; /**< producer head */ volatile uint32_t tail; /**< producer tail */ struct tle_drb * volatile crb; /**< block to enqueue to */ } prod __rte_cache_aligned; struct { + uint32_t single; /**< true if single consumer */ volatile uint32_t head; /**< consumer head */ volatile uint32_t tail; /**< consumer tail */ struct tle_drb * volatile crb; /**< block to dequeue from */ @@ -259,6 +262,36 @@ tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[], return nb_obj; } +/** + * Enqueue several objects on the dring. + * Note that it is a caller responsibility to provide enough drbs + * to enqueue all requested objects. + * + * @param dr + * A pointer to the ring structure. + * @param objs + * An array of pointers to objects to enqueue. + * @param nb_obj + * The number of objects to add to the dring from the objs[]. + * @param drbs + * An array of pointers to the drbs that can be used by the dring + * to perform enqueue operation. + * @param nb_drb + * at input: number of elements in the drbs[] array. + * at output: number of unused by the dring elements in the drbs[] array. + * @return + * - number of enqueued objects. + */ +static inline uint32_t +tle_dring_enqueue(struct tle_dring *dr, const void * const objs[], + uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb) +{ + if (dr->prod.single == 0) + return tle_dring_mp_enqueue(dr, objs, nb_obj, drbs, nb_drb); + else + return tle_dring_sp_enqueue(dr, objs, nb_obj, drbs, nb_drb); +} + /* * helper routine, to dequeue objects from the ring. */ @@ -429,6 +462,39 @@ tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj, } /** + * Dequeue several objects from the dring. + * Note, that it is a caller responsibility to provide drbs[] large + * enough to store pointers to all drbs that might become unused + * after that dequeue operation. It is a caller responsibility to manage + * unused drbs after the dequeue operation is completed + * (i.e mark them as free/reusable again, etc.). + * + * @param dr + * A pointer to the ring structure. + * @param objs + * An array of pointers to objects that will be dequeued. + * @param nb_obj + * The number of objects to dequeue from the dring. + * @param drbs + * An array of pointers to the drbs that will become unused after that + * dequeue operation. + * @param nb_drb + * at input: number of elements in the drbs[] array. + * at output: number of filled entries in the drbs[] array. + * @return + * - number of dequeued objects. + */ +static inline uint32_t +tle_dring_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj, + struct tle_drb *drbs[], uint32_t *nb_drb) +{ + if (dr->cons.single == 0) + return tle_dring_mc_dequeue(dr, objs, nb_obj, drbs, nb_drb); + else + return tle_dring_sc_dequeue(dr, objs, nb_obj, drbs, nb_drb); +} + +/** * Reset given dring to the initial state. * Note, that information about all queued objects will be lost. * @@ -436,11 +502,14 @@ tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj, * A pointer to the dring structure. */ static inline void -tle_dring_reset(struct tle_dring *dr) +tle_dring_reset(struct tle_dring *dr, uint32_t flags) { memset(dr, 0, sizeof(*dr)); dr->prod.crb = &dr->dummy; dr->cons.crb = &dr->dummy; + dr->prod.single = ((flags & RING_F_SP_ENQ) != 0); + dr->cons.single = ((flags & RING_F_SC_DEQ) != 0); + dr->flags = flags; } /** diff --git a/lib/libtle_l4p/ctx.c b/lib/libtle_l4p/ctx.c index 6eb33eb..910fc88 100644 --- a/lib/libtle_l4p/ctx.c +++ b/lib/libtle_l4p/ctx.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -16,6 +16,7 @@ #include <string.h> #include <rte_malloc.h> #include <rte_errno.h> +#include <rte_cycles.h> #include <rte_ethdev.h> #include <rte_ip.h> @@ -77,6 +78,7 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm) { struct tle_ctx *ctx; size_t sz; + uint64_t ms; uint32_t i; int32_t rc; @@ -95,6 +97,10 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm) return NULL; } + /* caclulate closest shift to convert from cycles to ms (approximate) */ + ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S; + ctx->cycles_ms_shift = sizeof(ms) * CHAR_BIT - __builtin_clzll(ms) - 1; + ctx->prm = *ctx_prm; rc = tle_stream_ops[ctx_prm->proto].init_streams(ctx); @@ -195,6 +201,7 @@ struct tle_dev * tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm) { int32_t rc; + uint32_t df; struct tle_dev *dev; if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) { @@ -247,7 +254,9 @@ tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm) } /* setup TX data. */ - tle_dring_reset(&dev->tx.dr); + df = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : + RING_F_SP_ENQ | RING_F_SC_DEQ; + tle_dring_reset(&dev->tx.dr, df); if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0 && ctx->prm.proto == TLE_PROTO_UDP) { diff --git a/lib/libtle_l4p/ctx.h b/lib/libtle_l4p/ctx.h index cc32081..389d646 100644 --- a/lib/libtle_l4p/ctx.h +++ b/lib/libtle_l4p/ctx.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -53,6 +53,7 @@ struct tle_dev { struct tle_ctx { struct tle_ctx_param prm; + uint32_t cycles_ms_shift; /* to convert from cycles to ms */ struct { rte_spinlock_t lock; uint32_t nb_free; /* number of free streams. */ diff --git a/lib/libtle_l4p/misc.h b/lib/libtle_l4p/misc.h index 6450b67..9bff459 100644 --- a/lib/libtle_l4p/misc.h +++ b/lib/libtle_l4p/misc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -396,20 +396,103 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero) return nb_pkt; } +static inline void +free_mbufs(struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t i; + + for (i = 0; i != num; i++) + rte_pktmbuf_free(mb[i]); +} + /* empty ring and free queued mbufs */ static inline void empty_mbuf_ring(struct rte_ring *r) { - uint32_t i, n; + uint32_t n; struct rte_mbuf *mb[MAX_PKT_BURST]; do { n = _rte_ring_dequeue_burst(r, (void **)mb, RTE_DIM(mb)); - for (i = 0; i != n; i++) - rte_pktmbuf_free(mb[i]); + free_mbufs(mb, n); } while (n != 0); } +static inline uint32_t +_mbus_to_iovec(struct iovec *iv, struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t i, ns; + uint32_t len, slen, tlen; + struct rte_mbuf *m, *next; + const void *src; + + for (i = 0; i != num; i++) { + + m = mb[i]; + tlen = 0; + ns = 0; + + do { + slen = m->data_len; + src = rte_pktmbuf_mtod(m, const void *); + len = RTE_MIN(iv->iov_len - tlen, slen); + rte_memcpy((uint8_t *)iv->iov_base + tlen, src, len); + slen -= len; + tlen += len; + if (slen != 0) + break; + ns++; + next = m->next; + rte_pktmbuf_free_seg(m); + m = next; + } while (m != NULL); + + iv->iov_base = (uint8_t *)iv->iov_base + tlen; + iv->iov_len -= tlen; + + /* partly consumed mbuf */ + if (m != NULL) { + m->pkt_len = mb[i]->pkt_len - tlen; + m->data_len = slen; + m->data_off += len; + m->nb_segs = mb[i]->nb_segs - ns; + mb[i] = m; + break; + } + } + + return i; +} + +static inline uint32_t +_iovec_to_mbsegs(struct iovec *iv, uint32_t seglen, struct rte_mbuf *mb[], + uint32_t num) +{ + uint32_t i; + uint32_t len, slen, tlen; + struct rte_mbuf *m; + void *dst; + + tlen = 0; + for (i = 0; i != num; i++) { + + m = mb[i]; + slen = rte_pktmbuf_tailroom(m); + slen = RTE_MIN(slen, seglen - m->data_len); + len = RTE_MIN(iv->iov_len - tlen, slen); + dst = rte_pktmbuf_append(m, len); + rte_memcpy(dst, (uint8_t *)iv->iov_base + tlen, len); + tlen += len; + if (len != slen) + break; + } + + iv->iov_base = (uint8_t *)iv->iov_base + tlen; + iv->iov_len -= tlen; + + return i; +} + #ifdef __cplusplus } #endif diff --git a/lib/libtle_l4p/stream.h b/lib/libtle_l4p/stream.h index f3b5828..e76f126 100644 --- a/lib/libtle_l4p/stream.h +++ b/lib/libtle_l4p/stream.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -147,6 +147,8 @@ stream_get_dest(struct tle_stream *s, const void *dst_addr, return -ENOENT; dev = dst->dev; + dst->ol_flags = dev->tx.ol_flags[s->type]; + if (s->type == TLE_V4) { struct ipv4_hdr *l3h; l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len); diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h index 29f1f63..033c306 100644 --- a/lib/libtle_l4p/stream_table.h +++ b/lib/libtle_l4p/stream_table.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -23,9 +23,6 @@ extern "C" { #endif -/* current stbl entry contains packet. */ -#define STE_PKT 1 - struct stbl_entry { void *data; }; @@ -138,18 +135,6 @@ stbl_find_entry(struct stbl *st, const union pkt_info *pi) return ht->ent + rc; } -static inline int -stbl_data_pkt(const void *p) -{ - return ((uintptr_t)p & STE_PKT); -} - -static inline void * -stbl_get_pkt(const struct stbl_entry *se) -{ - return (void *)((uintptr_t)se->data ^ STE_PKT); -} - static inline void * stbl_find_data(struct stbl *st, const union pkt_info *pi) { @@ -159,35 +144,6 @@ stbl_find_data(struct stbl *st, const union pkt_info *pi) return (ent == NULL) ? NULL : ent->data; } -static inline void -stbl_del_pkt(struct stbl *st, struct stbl_entry *se, const union pkt_info *pi) -{ - uint32_t type; - struct stbl_key k; - - se->data = NULL; - - type = pi->tf.type; - stbl_pkt_fill_key(&k, pi, type); - rte_hash_del_key(st->ht[type].t, &k); -} - -static inline void -stbl_del_pkt_lock(struct stbl *st, struct stbl_entry *se, - const union pkt_info *pi) -{ - uint32_t type; - struct stbl_key k; - - se->data = NULL; - - type = pi->tf.type; - stbl_pkt_fill_key(&k, pi, type); - stbl_lock(st, type); - rte_hash_del_key(st->ht[type].t, &k); - stbl_unlock(st, type); -} - #include "tcp_stream.h" static inline void @@ -235,8 +191,8 @@ stbl_add_stream_lock(struct stbl *st, const struct tle_tcp_stream *s) } static inline void -stbl_del_stream_lock(struct stbl *st, struct stbl_entry *se, - const struct tle_tcp_stream *s) +stbl_del_stream(struct stbl *st, struct stbl_entry *se, + const struct tle_tcp_stream *s, uint32_t lock) { uint32_t type; struct stbl_key k; @@ -248,9 +204,11 @@ stbl_del_stream_lock(struct stbl *st, struct stbl_entry *se, type = s->s.type; stbl_stream_fill_key(&k, &s->s, type); - stbl_lock(st, type); + if (lock != 0) + stbl_lock(st, type); rte_hash_del_key(st->ht[type].t, &k); - stbl_unlock(st, type); + if (lock != 0) + stbl_unlock(st, type); } #ifdef __cplusplus diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h index da2e166..61bfce4 100644 --- a/lib/libtle_l4p/syncookie.h +++ b/lib/libtle_l4p/syncookie.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -178,37 +178,40 @@ sync_check_ack(const union pkt_info *pi, uint32_t seq, uint32_t ack, } static inline void -sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len) +sync_fill_tcb(struct tcb *tcb, const union seg_info *si, const union tsopt *to) { - so->ts = get_tms_opts(p, len); - so->wscale = so->ts.ecr & SYNC_TMS_WSCALE_MASK; -} + uint32_t ack, mss, seq, wscale; -static inline void -sync_fill_tcb(struct tcb *tcb, const union seg_info *si, - const struct syn_opts *so) -{ - tcb->rcv.nxt = si->seq; - tcb->rcv.irs = si->seq - 1; + seq = si->seq; + + tcb->rcv.nxt = seq; + tcb->rcv.irs = seq - 1; + tcb->snd.wu.wl1 = seq; + + ack = si->ack; + + tcb->snd.nxt = ack; + tcb->snd.una = ack; + tcb->snd.iss = ack - 1; + tcb->snd.rcvr = ack - 1; + tcb->snd.wu.wl2 = ack; - tcb->snd.nxt = si->ack; - tcb->snd.una = si->ack; - tcb->snd.iss = si->ack - 1; - tcb->snd.rcvr = tcb->snd.iss; + mss = si->mss; - tcb->snd.wu.wl1 = si->seq; - tcb->snd.wu.wl2 = si->ack; + tcb->snd.mss = mss; + tcb->so.mss = mss; - tcb->so = *so; + tcb->snd.ts = to->ecr; + tcb->rcv.ts = to->val; + tcb->so.ts.raw = to->raw; - tcb->snd.wscale = tcb->so.wscale; - tcb->snd.mss = tcb->so.mss; - tcb->snd.wnd = si->wnd << tcb->snd.wscale; + wscale = to->ecr & SYNC_TMS_WSCALE_MASK; - tcb->snd.ts = tcb->so.ts.ecr; - tcb->rcv.ts = tcb->so.ts.val; + tcb->snd.wscale = wscale; + tcb->snd.wnd = si->wnd << wscale; + tcb->so.wscale = wscale; - tcb->rcv.wscale = (tcb->so.wscale == TCP_WSCALE_NONE) ? + tcb->rcv.wscale = (wscale == TCP_WSCALE_NONE) ? TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT; } diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h index 32faaa2..bec1e76 100644 --- a/lib/libtle_l4p/tcp_ctl.h +++ b/lib/libtle_l4p/tcp_ctl.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -30,15 +30,63 @@ extern "C" { static inline void tcp_stream_down(struct tle_tcp_stream *s) { - rwl_down(&s->rx.use); - rwl_down(&s->tx.use); + if ((s->flags & TLE_CTX_FLAG_ST) == 0) + rwl_down(&s->use); + else + rte_atomic32_set(&s->use, INT32_MIN); } static inline void tcp_stream_up(struct tle_tcp_stream *s) { - rwl_up(&s->rx.use); - rwl_up(&s->tx.use); + int32_t v; + + if ((s->flags & TLE_CTX_FLAG_ST) == 0) + rwl_up(&s->use); + else { + v = rte_atomic32_read(&s->use) - INT32_MIN; + rte_atomic32_set(&s->use, v); + } +} + +static inline int +tcp_stream_try_acquire(struct tle_tcp_stream *s) +{ + int32_t v; + + if ((s->flags & TLE_CTX_FLAG_ST) == 0) + return rwl_try_acquire(&s->use); + + v = rte_atomic32_read(&s->use) + 1; + rte_atomic32_set(&s->use, v); + return v; +} + +static inline void +tcp_stream_release(struct tle_tcp_stream *s) +{ + int32_t v; + + if ((s->flags & TLE_CTX_FLAG_ST) == 0) + rwl_release(&s->use); + else { + v = rte_atomic32_read(&s->use) - 1; + rte_atomic32_set(&s->use, v); + } +} + +static inline int +tcp_stream_acquire(struct tle_tcp_stream *s) +{ + int32_t v; + + if ((s->flags & TLE_CTX_FLAG_ST) == 0) + return rwl_acquire(&s->use); + + v = rte_atomic32_read(&s->use) + 1; + if (v > 0) + rte_atomic32_set(&s->use, v); + return v; } /* calculate RCV.WND value based on size of stream receive buffer */ @@ -67,28 +115,28 @@ empty_tq(struct tle_tcp_stream *s) static inline void empty_rq(struct tle_tcp_stream *s) { - empty_mbuf_ring(s->rx.q); + uint32_t n; + struct rte_mbuf *mb[MAX_PKT_BURST]; + + do { + n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)mb, + RTE_DIM(mb)); + free_mbufs(mb, n); + } while (n != 0); + tcp_ofo_reset(s->rx.ofo); } /* empty stream's listen queue */ static inline void -empty_lq(struct tle_tcp_stream *s, struct stbl *st) +empty_lq(struct tle_tcp_stream *s) { - uint32_t i, n; - struct rte_mbuf *mb; - union pkt_info pi; - union seg_info si; - struct stbl_entry *se[MAX_PKT_BURST]; + uint32_t n; + struct tle_stream *ts[MAX_PKT_BURST]; do { - n = _rte_ring_dequeue_burst(s->rx.q, (void **)se, RTE_DIM(se)); - for (i = 0; i != n; i++) { - mb = stbl_get_pkt(se[i]); - get_pkt_info(mb, &pi, &si); - stbl_del_pkt_lock(st, se[i], &pi); - rte_pktmbuf_free(mb); - } + n = _rte_ring_dequeue_burst(s->rx.q, (void **)ts, RTE_DIM(ts)); + tle_tcp_stream_close_bulk(ts, n); } while (n != 0); } @@ -114,12 +162,13 @@ tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s) /* free stream's destination port */ stream_clear_ctx(ctx, &s->s); if (uop == TCP_OP_LISTEN) - empty_lq(s, st); + empty_lq(s); } if (s->ste != NULL) { /* remove entry from RX streams table */ - stbl_del_stream_lock(st, s->ste, s); + stbl_del_stream(st, s->ste, s, + (s->flags & TLE_CTX_FLAG_ST) == 0); s->ste = NULL; empty_rq(s); } diff --git a/lib/libtle_l4p/tcp_misc.h b/lib/libtle_l4p/tcp_misc.h index 9f19f69..0ca5429 100644 --- a/lib/libtle_l4p/tcp_misc.h +++ b/lib/libtle_l4p/tcp_misc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -54,6 +54,10 @@ extern "C" { #define TCP6_OP_MSS (TCP6_NOP_MSS - TCP_TX_OPT_LEN_MAX) +/* Initial Window Configuration parameter, probably will be configured during + * the startup in future */ +#define TCP_INITIAL_CWND_MAX 14600 + /* * TCP flags */ @@ -93,8 +97,8 @@ union seg_info { struct { uint32_t seq; uint32_t ack; - uint16_t hole1; uint16_t wnd; + uint16_t mss; /* valid only at SYN time */ }; }; @@ -223,11 +227,10 @@ struct dack_info { /* get current timestamp in ms */ static inline uint32_t -tcp_get_tms(void) +tcp_get_tms(uint32_t mshift) { - uint64_t ts, ms; - ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S; - ts = rte_get_tsc_cycles() / ms; + uint64_t ts; + ts = rte_get_tsc_cycles() >> mshift; return ts; } @@ -248,8 +251,11 @@ static inline void get_seg_info(const struct tcp_hdr *th, union seg_info *si) { __m128i v; - const __m128i bswap_mask = _mm_set_epi8(15, 14, 13, 12, 10, 11, 9, 8, - 4, 5, 6, 7, 0, 1, 2, 3); + const __m128i bswap_mask = + _mm_set_epi8(UINT8_MAX, UINT8_MAX, UINT8_MAX, UINT8_MAX, + UINT8_MAX, UINT8_MAX, 10, 11, + 4, 5, 6, 7, + 0, 1, 2, 3); v = _mm_loadu_si128((const __m128i *)&th->sent_seq); si->raw.x = _mm_shuffle_epi8(v, bswap_mask); diff --git a/lib/libtle_l4p/tcp_rxq.h b/lib/libtle_l4p/tcp_rxq.h index bddc28e..01f34fa 100644 --- a/lib/libtle_l4p/tcp_rxq.h +++ b/lib/libtle_l4p/tcp_rxq.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -22,6 +22,11 @@ extern "C" { #endif +struct rxq_objs { + struct rte_mbuf **mb; + uint32_t num; +}; + static inline uint32_t rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mb[], uint32_t num) @@ -142,6 +147,41 @@ rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len, return t; } +static inline uint32_t +tcp_rxq_get_objs(struct tle_tcp_stream *s, struct rxq_objs obj[2]) +{ + struct rte_ring *r; + uint32_t n, head, sz; + + r = s->rx.q; + + n = _rte_ring_mcs_dequeue_start(r, UINT32_MAX); + if (n == 0) + return 0; + + sz = _rte_ring_get_size(r); + head = (r->cons.head - n) & _rte_ring_get_mask(r); + + obj[0].mb = (struct rte_mbuf **)(_rte_ring_get_data(r) + head); + obj[1].mb = (struct rte_mbuf **)_rte_ring_get_data(r); + + if (head + n <= sz) { + obj[0].num = n; + obj[1].num = 0; + return 1; + } else { + obj[0].num = sz - head; + obj[1].num = n + head - sz; + return 2; + } +} + +static inline void +tcp_rxq_consume(struct tle_tcp_stream *s, uint32_t num) +{ + _rte_ring_mcs_dequeue_finish(s->rx.q, num); +} + #ifdef __cplusplus } #endif diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c index a1c7d09..30ed104 100644 --- a/lib/libtle_l4p/tcp_rxtx.c +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -59,12 +59,12 @@ rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi, struct tle_tcp_stream *s; s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst]; - if (s == NULL || rwl_acquire(&s->rx.use) < 0) + if (s == NULL || tcp_stream_acquire(s) < 0) return NULL; /* check that we have a proper stream. */ if (s->tcb.state != TCP_ST_LISTEN) { - rwl_release(&s->rx.use); + tcp_stream_release(s); s = NULL; } @@ -84,11 +84,11 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st, return NULL; } - if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0) + if (tcp_stream_acquire(s) < 0) return NULL; /* check that we have a proper stream. */ else if (s->tcb.state == TCP_ST_CLOSED) { - rwl_release(&s->rx.use); + tcp_stream_release(s); s = NULL; } @@ -164,6 +164,24 @@ stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[], return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); } +static inline uint32_t +get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st) +{ + uint32_t pid; + rte_atomic32_t *pa; + + pa = &dev->tx.packet_id[type]; + + if (st == 0) { + pid = rte_atomic32_add_return(pa, num); + return pid - num; + } else { + pid = rte_atomic32_read(pa); + rte_atomic32_set(pa, pid + num); + return pid; + } +} + static inline void fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, uint32_t seq, uint8_t hlen, uint8_t flags) @@ -357,7 +375,7 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], type = s->s.type; dev = s->tx.dst.dev; - pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num; + pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0); k = 0; tn = 0; @@ -506,22 +524,16 @@ calc_smss(uint16_t mss, const struct tle_dest *dst) } /* - * RFC 5681 3.1 - * If SMSS > 2190 bytes: - * IW = 2 * SMSS bytes and MUST NOT be more than 2 segments - * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes): - * IW = 3 * SMSS bytes and MUST NOT be more than 3 segments - * if SMSS <= 1095 bytes: - * IW = 4 * SMSS bytes and MUST NOT be more than 4 segments + * RFC 6928 2 + * min (10*MSS, max (2*MSS, 14600)) + * + * or using user provided initial congestion window (icw) + * min (10*MSS, max (2*MSS, icw)) */ static inline uint32_t -initial_cwnd(uint16_t smss) +initial_cwnd(uint32_t smss, uint32_t icw) { - if (smss > 2190) - return 2 * smss; - else if (smss > 1095) - return 3 * smss; - return 4 * smss; + return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw)); } /* @@ -561,7 +573,7 @@ send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq, dst = &s->tx.dst; type = s->s.type; - pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1; + pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1); if (rc == 0) @@ -657,7 +669,7 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, return -EINVAL; dev = dst.dev; - pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1; + pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0); rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq, TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1); @@ -763,8 +775,8 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len, } static inline int -restore_syn_opt(struct syn_opts *so, const union pkt_info *pi, - const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb, +restore_syn_opt(union seg_info *si, union tsopt *to, + const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb, uint32_t hash_alg, rte_xmm_t *secret_key) { int32_t rc; @@ -778,12 +790,12 @@ restore_syn_opt(struct syn_opts *so, const union pkt_info *pi, if (rc < 0) return rc; - so->mss = rc; + si->mss = rc; th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, mb->l2_len + mb->l3_len); len = mb->l4_len - sizeof(*th); - sync_get_opts(so, (uintptr_t)(th + 1), len); + to[0] = get_tms_opts((uintptr_t)(th + 1), len); return 0; } @@ -814,9 +826,11 @@ static inline int stream_fill_dest(struct tle_tcp_stream *s) { int32_t rc; + uint32_t type; const void *da; - if (s->s.type == TLE_V4) + type = s->s.type; + if (type == TLE_V4) da = &s->s.ipv4.addr.src; else da = &s->s.ipv6.addr.src; @@ -830,7 +844,7 @@ stream_fill_dest(struct tle_tcp_stream *s) */ static inline int accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, - struct tle_tcp_stream *cs, const struct syn_opts *so, + struct tle_tcp_stream *cs, const union tsopt *to, uint32_t tms, const union pkt_info *pi, const union seg_info *si) { int32_t rc; @@ -857,7 +871,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, } /* setup TCB */ - sync_fill_tcb(&cs->tcb, si, so); + sync_fill_tcb(&cs->tcb, si, to); cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale); /* @@ -871,8 +885,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, } else cs->tcb.snd.rto = TCP_RTO_DEFAULT; - /* copy streams type. */ + /* copy streams type & flags. */ cs->s.type = ps->s.type; + cs->flags = ps->flags; /* retrive and cache destination information. */ rc = stream_fill_dest(cs); @@ -883,8 +898,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); /* setup congestion variables */ - cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); + cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd); cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; + cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw; cs->tcb.state = TCP_ST_ESTABLISHED; @@ -909,14 +925,14 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, */ static inline int rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, - const union pkt_info *pi, const union seg_info *si, + const union pkt_info *pi, union seg_info *si, uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp) { int32_t rc; struct tle_ctx *ctx; struct tle_stream *ts; struct tle_tcp_stream *cs; - struct syn_opts so; + union tsopt to; *csp = NULL; @@ -924,7 +940,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, return -EINVAL; ctx = s->s.ctx; - rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg, + rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg, &ctx->prm.secret_key); if (rc < 0) return rc; @@ -936,7 +952,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, return ENFILE; /* prepare stream to handle new connection */ - if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) { + if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) { /* put new stream in the accept queue */ if (_rte_ring_enqueue_burst(s->rx.q, @@ -947,7 +963,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, /* cleanup on failure */ tcp_stream_down(cs); - stbl_del_pkt(st, cs->ste, pi); + stbl_del_stream(st, cs->ste, cs, 0); cs->ste = NULL; } @@ -1008,6 +1024,17 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack) } static void +stream_timewait(struct tle_tcp_stream *s, uint32_t rto) +{ + if (rto != 0) { + s->tcb.state = TCP_ST_TIME_WAIT; + s->tcb.snd.rto = rto; + timer_reset(s); + } else + stream_term(s); +} + +static void rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp) { uint32_t state; @@ -1027,17 +1054,13 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp) s->err.cb.func(s->err.cb.data, &s->s); } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) { rsp->flags |= TCP_FLAG_ACK; - if (ackfin != 0) { - s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = TCP_RTO_2MSL; - timer_reset(s); - } else + if (ackfin != 0) + stream_timewait(s, s->tcb.snd.rto_tw); + else s->tcb.state = TCP_ST_CLOSING; } else if (state == TCP_ST_FIN_WAIT_2) { rsp->flags |= TCP_FLAG_ACK; - s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = TCP_RTO_2MSL; - timer_reset(s); + stream_timewait(s, s->tcb.snd.rto_tw); } else if (state == TCP_ST_LAST_ACK && ackfin != 0) { stream_term(s); } @@ -1144,7 +1167,9 @@ rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp) static inline void dack_info_init(struct dack_info *tack, const struct tcb *tcb) { - memset(tack, 0, sizeof(*tack)); + static const struct dack_info zero_dack; + + tack[0] = zero_dack; tack->ack = tcb->snd.una; tack->segs.dup = tcb->rcv.dupack; tack->wu.raw = tcb->snd.wu.raw; @@ -1488,9 +1513,7 @@ rx_ackfin(struct tle_tcp_stream *s) timer_stop(s); s->tcb.state = TCP_ST_FIN_WAIT_2; } else if (state == TCP_ST_CLOSING) { - s->tcb.state = TCP_ST_TIME_WAIT; - s->tcb.snd.rto = TCP_RTO_2MSL; - timer_reset(s); + stream_timewait(s, s->tcb.snd.rto_tw); } } @@ -1554,7 +1577,7 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, s->tcb.snd.wscale = so.wscale; /* setup congestion variables */ - s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss); + s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd); s->tcb.snd.ssthresh = s->tcb.snd.wnd; s->tcb.rcv.ts = so.ts.val; @@ -1720,9 +1743,9 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts, { uint32_t i; - if (rwl_acquire(&s->rx.use) > 0) { + if (tcp_stream_acquire(s) > 0) { i = rx_stream(s, ts, pi, si, mb, rp, rc, num); - rwl_release(&s->rx.use); + tcp_stream_release(s); return i; } @@ -1735,7 +1758,7 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts, static inline uint32_t rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, - const union pkt_info pi[], const union seg_info si[], + const union pkt_info pi[], union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], uint32_t num) { @@ -1809,7 +1832,7 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, k = num - i; } - rwl_release(&s->rx.use); + tcp_stream_release(s); return num - k; } @@ -1850,7 +1873,7 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts, } } - rwl_release(&s->rx.use); + tcp_stream_release(s); return num - k; } @@ -1859,7 +1882,8 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], struct rte_mbuf *rp[], int32_t rc[], uint16_t num) { struct stbl *st; - uint32_t i, j, k, n, t, ts; + struct tle_ctx *ctx; + uint32_t i, j, k, mt, n, t, ts; uint64_t csf; union pkt_info pi[num]; union seg_info si[num]; @@ -1868,8 +1892,10 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint32_t raw; } stu; - ts = tcp_get_tms(); - st = CTX_TCP_STLB(dev->ctx); + ctx = dev->ctx; + ts = tcp_get_tms(ctx->cycles_ms_shift); + st = CTX_TCP_STLB(ctx); + mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0); stu.raw = 0; @@ -1887,7 +1913,7 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], pi[i].tf.type, IPPROTO_TCP) != 0) pi[i].csf = csf; - stu.t[t] = 1; + stu.t[t] = mt; } if (stu.t[TLE_V4] != 0) @@ -1936,7 +1962,7 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], struct tle_tcp_stream *s; s = TCP_STREAM(ts); - n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num); + n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num); if (n == 0) return 0; @@ -1945,9 +1971,9 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], * then rearm stream RX event. */ if (n == num && rte_ring_count(s->rx.q) != 0) { - if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); - rwl_release(&s->rx.use); + tcp_stream_release(s); } return n; @@ -2066,7 +2092,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) /* fill pkt info to generate seq.*/ stream_fill_pkt_info(s, &pi); - tms = tcp_get_tms(); + tms = tcp_get_tms(s->s.ctx->cycles_ms_shift); s->tcb.so.ts.val = tms; s->tcb.so.ts.ecr = 0; s->tcb.so.wscale = TCP_WSCALE_DEFAULT; @@ -2116,7 +2142,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) if (type >= TLE_VNUM) return -EINVAL; - if (rwl_try_acquire(&s->tx.use) > 0) { + if (tcp_stream_try_acquire(s) > 0) { rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED, TCP_ST_SYN_SENT); rc = (rc == 0) ? -EDEADLK : 0; @@ -2124,14 +2150,14 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) rc = -EINVAL; if (rc != 0) { - rwl_release(&s->tx.use); + tcp_stream_release(s); return rc; } /* fill stream, prepare and transmit syn pkt */ s->tcb.uop |= TCP_OP_CONNECT; rc = tx_syn(s, addr); - rwl_release(&s->tx.use); + tcp_stream_release(s); /* error happened, do a cleanup */ if (rc != 0) @@ -2147,7 +2173,7 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) struct tle_tcp_stream *s; s = TCP_STREAM(ts); - n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); + n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num); if (n == 0) return 0; @@ -2156,14 +2182,76 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) * then rearm stream RX event. */ if (n == num && rte_ring_count(s->rx.q) != 0) { - if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) tle_event_raise(s->rx.ev); - rwl_release(&s->rx.use); + tcp_stream_release(s); } return n; } +ssize_t +tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, + int iovcnt) +{ + int32_t i; + uint32_t mn, n, tn; + size_t sz; + struct tle_tcp_stream *s; + struct iovec iv; + struct rxq_objs mo[2]; + + s = TCP_STREAM(ts); + + /* get group of packets */ + mn = tcp_rxq_get_objs(s, mo); + if (mn == 0) + return 0; + + sz = 0; + n = 0; + for (i = 0; i != iovcnt; i++) { + iv = iov[i]; + sz += iv.iov_len; + n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n); + if (iv.iov_len != 0) { + sz -= iv.iov_len; + break; + } + } + + tn = n; + + if (i != iovcnt && mn != 1) { + n = 0; + do { + sz += iv.iov_len; + n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n); + if (iv.iov_len != 0) { + sz -= iv.iov_len; + break; + } + if (i + 1 != iovcnt) + iv = iov[i + 1]; + } while (++i != iovcnt); + tn += n; + } + + tcp_rxq_consume(s, tn); + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (i == iovcnt && rte_ring_count(s->rx.q) != 0) { + if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + tcp_stream_release(s); + } + + return sz; +} + static inline int32_t tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, struct rte_mbuf *segs[], uint32_t num) @@ -2176,16 +2264,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port, 0, TCP_FLAG_ACK, 0, 0); if (rc != 0) { - free_segments(segs, num); + free_mbufs(segs, num); break; } } if (i == num) { /* queue packets for further transmission. */ - rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num); + rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num); if (rc != 0) - free_segments(segs, num); + free_mbufs(segs, num); } return rc; @@ -2194,17 +2282,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags, uint16_t tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) { - uint32_t i, j, k, mss, n, state, type; + uint32_t i, j, k, mss, n, state; int32_t rc; uint64_t ol_flags; struct tle_tcp_stream *s; - struct tle_dev *dev; struct rte_mbuf *segs[TCP_MAX_PKT_SEG]; s = TCP_STREAM(ts); /* mark stream as not closable. */ - if (rwl_acquire(&s->tx.use) < 0) { + if (tcp_stream_acquire(s) < 0) { rte_errno = EAGAIN; return 0; } @@ -2212,14 +2299,12 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) state = s->tcb.state; if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { rte_errno = ENOTCONN; - rwl_release(&s->tx.use); + tcp_stream_release(s); return 0; } mss = s->tcb.snd.mss; - dev = s->tx.dst.dev; - type = s->s.type; - ol_flags = dev->tx.ol_flags[type]; + ol_flags = s->tx.dst.ol_flags; k = 0; rc = 0; @@ -2237,7 +2322,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (i != k) { /* queue packets for further transmission. */ - n = _rte_ring_mp_enqueue_burst(s->tx.q, + n = _rte_ring_enqueue_burst(s->tx.q, (void **)pkt + k, (i - k)); k += n; @@ -2246,7 +2331,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) * remove pkt l2/l3 headers, restore ol_flags */ if (i != k) { - ol_flags = ~dev->tx.ol_flags[type]; + ol_flags = ~s->tx.dst.ol_flags; for (j = k; j != i; j++) { rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len + pkt[j]->l3_len + @@ -2271,7 +2356,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) break; } - rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc); + rc = tx_segments(s, ol_flags, segs, rc); if (rc == 0) { /* free the large mbuf */ rte_pktmbuf_free(pkt[i]); @@ -2290,11 +2375,120 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) tle_event_raise(s->tx.ev); - rwl_release(&s->tx.use); + tcp_stream_release(s); return k; } +ssize_t +tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, + const struct iovec *iov, int iovcnt) +{ + int32_t i, rc; + uint32_t j, k, n, num, slen, state; + uint64_t ol_flags; + size_t sz, tsz; + struct tle_tcp_stream *s; + struct iovec iv; + struct rte_mbuf *mb[2 * MAX_PKT_BURST]; + + s = TCP_STREAM(ts); + + /* mark stream as not closable. */ + if (tcp_stream_acquire(s) < 0) { + rte_errno = EAGAIN; + return -1; + } + + state = s->tcb.state; + if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { + rte_errno = ENOTCONN; + tcp_stream_release(s); + return -1; + } + + /* figure out how many mbufs do we need */ + tsz = 0; + for (i = 0; i != iovcnt; i++) + tsz += iov[i].iov_len; + + slen = rte_pktmbuf_data_room_size(mp); + slen = RTE_MIN(slen, s->tcb.snd.mss); + + num = (tsz + slen - 1) / slen; + n = rte_ring_free_count(s->tx.q); + num = RTE_MIN(num, n); + n = RTE_MIN(num, RTE_DIM(mb)); + + /* allocate mbufs */ + if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) { + rte_errno = ENOMEM; + tcp_stream_release(s); + return -1; + } + + /* copy data into the mbufs */ + k = 0; + sz = 0; + for (i = 0; i != iovcnt; i++) { + iv = iov[i]; + sz += iv.iov_len; + k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k); + if (iv.iov_len != 0) { + sz -= iv.iov_len; + break; + } + } + + /* partially filled segment */ + k += (k != n && mb[k]->data_len != 0); + + /* fill pkt headers */ + ol_flags = s->tx.dst.ol_flags; + + for (j = 0; j != k; j++) { + rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags, + s->s.port, 0, TCP_FLAG_ACK, 0, 0); + if (rc != 0) + break; + } + + /* if no error encountered, then enqueue pkts for transmission */ + if (k == j) + k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j); + else + k = 0; + + if (k != j) { + + /* free pkts that were not enqueued */ + free_mbufs(mb + k, j - k); + + /* our last segment can be partially filled */ + sz += slen - sz % slen; + sz -= (j - k) * slen; + + /* report an error */ + if (rc != 0) { + rte_errno = -rc; + sz = -1; + } + } + + if (k != 0) { + + /* notify BE about more data to send */ + txs_enqueue(s->s.ctx, s); + + /* if possible, re-arm stream write event. */ + if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + } + + tcp_stream_release(s); + return sz; +} + /* send data and FIN (if needed) */ static inline void tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state) @@ -2376,6 +2570,18 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms) } else if (state == TCP_ST_SYN_SENT) { /* resending SYN */ s->tcb.so.ts.val = tms; + + /* According to RFC 6928 2: + * To reduce the chance for spurious SYN or SYN/ACK + * retransmission, it is RECOMMENDED that + * implementations refrain from resetting the initial + * window to 1 segment, unless there have been more + * than one SYN or SYN/ACK retransmissions or true loss + * detection has been made. + */ + if (s->tcb.snd.nb_retx != 0) + s->tcb.snd.cwnd = s->tcb.snd.mss; + send_ack(s, tms, TCP_FLAG_SYN); } else if (state == TCP_ST_TIME_WAIT) { @@ -2405,7 +2611,7 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) /* process streams with RTO exipred */ tw = CTX_TCP_TMWHL(ctx); - tms = tcp_get_tms(); + tms = tcp_get_tms(ctx->cycles_ms_shift); tle_timer_expire(tw, tms); k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs)); @@ -2414,9 +2620,9 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) s = rs[i]; s->timer.handle = NULL; - if (rwl_try_acquire(&s->tx.use) > 0) + if (tcp_stream_try_acquire(s) > 0) rto_stream(s, tms); - rwl_release(&s->tx.use); + tcp_stream_release(s); } /* process streams from to-send queue */ @@ -2428,11 +2634,11 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num) s = rs[i]; rte_atomic32_set(&s->tx.arm, 0); - if (rwl_try_acquire(&s->tx.use) > 0) + if (tcp_stream_try_acquire(s) > 0) tx_stream(s, tms); else txs_enqueue(s->s.ctx, s); - rwl_release(&s->tx.use); + tcp_stream_release(s); } /* collect streams to close from the death row */ diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c index 99791d0..4e9ddb7 100644 --- a/lib/libtle_l4p/tcp_stream.c +++ b/lib/libtle_l4p/tcp_stream.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -32,8 +32,7 @@ static void unuse_stream(struct tle_tcp_stream *s) { s->s.type = TLE_VNUM; - rte_atomic32_set(&s->rx.use, INT32_MIN); - rte_atomic32_set(&s->tx.use, INT32_MIN); + rte_atomic32_set(&s->use, INT32_MIN); } static void @@ -99,14 +98,17 @@ static int init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s) { size_t bsz, rsz, sz; - uint32_t i, k, n, nb; + uint32_t f, i, k, n, nb; struct tle_drb *drb; char name[RTE_RING_NAMESIZE]; + f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : + (RING_F_SP_ENQ | RING_F_SC_DEQ); + /* init RX part. */ n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); - s->rx.q = alloc_ring(n, RING_F_SP_ENQ, ctx->prm.socket_id); + s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id); if (s->rx.q == NULL) return -ENOMEM; @@ -117,7 +119,7 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s) /* init TX part. */ n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U); - s->tx.q = alloc_ring(n, RING_F_SC_DEQ, ctx->prm.socket_id); + s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id); if (s->tx.q == NULL) return -ENOMEM; @@ -145,7 +147,7 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s) } snprintf(name, sizeof(name), "%p@%zu", s, sz); - rte_ring_init(s->tx.drb.r, name, n, 0); + rte_ring_init(s->tx.drb.r, name, n, f); for (i = 0; i != k; i++) { drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + @@ -177,24 +179,27 @@ tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb) } static struct tle_timer_wheel * -alloc_timers(uint32_t num, int32_t socket) +alloc_timers(uint32_t num, uint32_t mshift, int32_t socket) { struct tle_timer_wheel_args twprm; twprm.tick_size = TCP_RTO_GRANULARITY; twprm.max_timer = num; twprm.socket_id = socket; - return tle_timer_create(&twprm, tcp_get_tms()); + return tle_timer_create(&twprm, tcp_get_tms(mshift)); } static int tcp_init_streams(struct tle_ctx *ctx) { size_t sz; - uint32_t i; + uint32_t f, i; int32_t rc; struct tcp_streams *ts; + f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 : + (RING_F_SP_ENQ | RING_F_SC_DEQ); + sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams; ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, ctx->prm.socket_id); @@ -211,14 +216,15 @@ tcp_init_streams(struct tle_ctx *ctx) ctx->streams.buf = ts; STAILQ_INIT(&ctx->streams.free); - ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->prm.socket_id); + ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->cycles_ms_shift, + ctx->prm.socket_id); if (ts->tmr == NULL) { TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n", ctx, rte_errno); rc = -ENOMEM; } else { ts->tsq = alloc_ring(ctx->prm.max_streams, - RING_F_SC_DEQ, ctx->prm.socket_id); + f | RING_F_SC_DEQ, ctx->prm.socket_id); if (ts->tsq == NULL) rc = -ENOMEM; else @@ -329,8 +335,13 @@ tle_tcp_stream_open(struct tle_ctx *ctx, s->err.cb = prm->cfg.err_cb; /* store other params */ + s->flags = ctx->prm.flags; s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : TLE_TCP_DEFAULT_RETRIES; + s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX : + ctx->prm.icw; + s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ? + TCP_RTO_2MSL : ctx->prm.timewait; tcp_stream_up(s); return &s->s; @@ -438,15 +449,6 @@ tle_tcp_stream_close(struct tle_stream *ts) return -EINVAL; ctx = s->s.ctx; - - /* reset stream events if any. */ - if (s->rx.ev != NULL) - tle_event_idle(s->rx.ev); - if (s->tx.ev != NULL) - tle_event_idle(s->tx.ev); - if (s->err.ev != NULL) - tle_event_idle(s->err.ev); - return stream_close(ctx, s); } @@ -505,7 +507,7 @@ tle_tcp_stream_listen(struct tle_stream *ts) return -EINVAL; /* mark stream as not closable. */ - if (rwl_try_acquire(&s->rx.use) > 0) { + if (tcp_stream_try_acquire(s) > 0) { rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED, TCP_ST_LISTEN); if (rc != 0) { @@ -517,7 +519,7 @@ tle_tcp_stream_listen(struct tle_stream *ts) } else rc = -EINVAL; - rwl_release(&s->rx.use); + tcp_stream_release(s); return rc; } @@ -527,17 +529,12 @@ tle_tcp_stream_listen(struct tle_stream *ts) static inline int stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm) { - int32_t rc1, rc2; struct tle_tcp_stream *s; s = TCP_STREAM(ts); - rc1 = rwl_try_acquire(&s->rx.use); - rc2 = rwl_try_acquire(&s->tx.use); - - if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) { - rwl_release(&s->tx.use); - rwl_release(&s->rx.use); + if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) { + tcp_stream_release(s); return -EINVAL; } @@ -581,9 +578,7 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm) s->err.cb.func(s->err.cb.data, &s->s); } - rwl_release(&s->tx.use); - rwl_release(&s->rx.use); - + tcp_stream_release(s); return 0; } @@ -606,13 +601,13 @@ tle_tcp_stream_update_cfg(struct tle_stream *ts[], } int -tle_tcp_stream_get_mss(const struct tle_stream * stream) +tle_tcp_stream_get_mss(const struct tle_stream * ts) { - struct tle_tcp_stream *tcp; + struct tle_tcp_stream *s; - if (stream == NULL) + if (ts == NULL) return -EINVAL; - tcp = TCP_STREAM(stream); - return tcp->tcb.snd.mss; + s = TCP_STREAM(ts); + return s->tcb.snd.mss; } diff --git a/lib/libtle_l4p/tcp_stream.h b/lib/libtle_l4p/tcp_stream.h index 04c2f88..4629fe6 100644 --- a/lib/libtle_l4p/tcp_stream.h +++ b/lib/libtle_l4p/tcp_stream.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -82,6 +82,7 @@ struct tcb { uint32_t cwnd; /* congestion window */ uint32_t ssthresh; /* slow start threshold */ uint32_t rto; /* retransmission timeout */ + uint32_t rto_tw; /* TIME_WAIT retransmission timeout */ uint32_t iss; /* initial send sequence */ uint16_t mss; uint8_t wscale; @@ -91,11 +92,13 @@ struct tcb { struct syn_opts so; /* initial syn options. */ }; - struct tle_tcp_stream { struct tle_stream s; + uint32_t flags; + rte_atomic32_t use; + struct stbl_entry *ste; /* entry in streams table. */ struct tcb tcb; @@ -109,7 +112,6 @@ struct tle_tcp_stream { } err; struct { - rte_atomic32_t use; struct rte_ring *q; /* listen (syn) queue */ struct ofo *ofo; struct tle_event *ev; /* user provided recv event. */ @@ -117,7 +119,6 @@ struct tle_tcp_stream { } rx __rte_cache_aligned; struct { - rte_atomic32_t use; rte_atomic32_t arm; /* when > 0 stream is in to-send queue */ struct { uint32_t nb_elem; /* number of objects per drb. */ diff --git a/lib/libtle_l4p/tcp_tx_seg.h b/lib/libtle_l4p/tcp_tx_seg.h index 3a80fdd..a8d2425 100644 --- a/lib/libtle_l4p/tcp_tx_seg.h +++ b/lib/libtle_l4p/tcp_tx_seg.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -20,15 +20,6 @@ extern "C" { #endif -static inline void -free_segments(struct rte_mbuf *mb[], uint32_t num) -{ - uint32_t i; - - for (i = 0; i != num; i++) - rte_pktmbuf_free(mb[i]); -} - static inline int32_t tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, const struct tle_dest *dst, uint16_t mss) @@ -53,7 +44,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, /* Allocate direct buffer */ out_pkt = rte_pktmbuf_alloc(dst->head_mp); if (out_pkt == NULL) { - free_segments(mbout, nbseg); + free_mbufs(mbout, nbseg); return -ENOMEM; } @@ -67,7 +58,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num, out_seg = rte_pktmbuf_alloc(dst->head_mp); if (out_seg == NULL) { rte_pktmbuf_free(out_pkt); - free_segments(mbout, nbseg); + free_mbufs(mbout, nbseg); return -ENOMEM; } out_seg_prev->next = out_seg; diff --git a/lib/libtle_l4p/tle_ctx.h b/lib/libtle_l4p/tle_ctx.h index 144dbe7..0ea4668 100644 --- a/lib/libtle_l4p/tle_ctx.h +++ b/lib/libtle_l4p/tle_ctx.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -80,8 +80,9 @@ struct tle_dev_param { struct tle_dest { struct rte_mempool *head_mp; /**< MP for fragment headers and control packets. */ - struct tle_dev *dev; /**< device to send packets through. */ - uint16_t mtu; /**< MTU for given destination. */ + struct tle_dev *dev; /**< device to send packets through. */ + uint64_t ol_flags; /**< tx ofload flags. */ + uint16_t mtu; /**< MTU for given destination. */ uint8_t l2_len; /**< L2 header length. */ uint8_t l3_len; /**< L3 header length. */ uint8_t hdr[TLE_DST_MAX_HDR]; /**< L2/L3 headers. */ @@ -103,6 +104,10 @@ enum { TLE_HASH_NUM }; +enum { + TLE_CTX_FLAG_ST = 1, /**< ctx will be used by single thread */ +}; + struct tle_ctx_param { int32_t socket_id; /**< socket ID to allocate memory for. */ uint32_t proto; /**< L4 proto to handle. */ @@ -110,6 +115,7 @@ struct tle_ctx_param { uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */ uint32_t max_stream_sbufs; /**< max send mbufs per stream. */ uint32_t send_bulk_size; /**< expected # of packets per send call. */ + uint32_t flags; /**< specific flags */ int (*lookup4)(void *opaque, const struct in_addr *addr, struct tle_dest *res); @@ -127,9 +133,19 @@ struct tle_ctx_param { /**< hash algorithm to be used to generate sequence number. */ rte_xmm_t secret_key; /**< secret key to be used to calculate the hash. */ + + uint32_t icw; /**< initial congestion window, default is 2*MSS if 0. */ + uint32_t timewait; + /**< TCP TIME_WAIT state timeout duration in milliseconds, + * default 2MSL, if UINT32_MAX */ }; /** + * use default TIMEWAIT timeout value. + */ +#define TLE_TCP_TIMEWAIT_DEFAULT UINT32_MAX + +/** * create L4 processing context. * @param ctx_prm * Parameters used to create and initialise the L4 context. diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h index 9086658..b0cbda6 100644 --- a/lib/libtle_l4p/tle_tcp.h +++ b/lib/libtle_l4p/tle_tcp.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Intel Corporation. + * Copyright (c) 2016-2017 Intel Corporation. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -120,6 +120,16 @@ tle_tcp_stream_get_addr(const struct tle_stream *s, struct tle_tcp_stream_addr *addr); /** + * Get current TCP maximum segment size + * @param ts + * Stream to retrieve MSS information from. + * @return + * Maximum segment size in bytes, if successful. + * Negative on failure. + */ +int tle_tcp_stream_get_mss(const struct tle_stream *ts); + +/** * Client mode connect API. */ @@ -258,6 +268,28 @@ uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[], uint16_t num); /** + * Reads iovcnt buffers from the for given TCP stream. + * Note that the stream has to be in connected state. + * Data ordering is preserved. + * Buffers are processed in array order. + * This means that the function will comppletely fill iov[0] + * before proceeding to iov[1], and so on. + * If there is insufficient data, then not all buffers pointed to by iov + * may be filled. + * @param ts + * TCP stream to receive data from. + * @param iov + * Points to an array of iovec structures. + * @param iovcnt + * Number of elements in the *iov* array. + * @return + * On success, number of bytes read in the stream receive buffer. + * In case of error, returns -1 and error code will be set in rte_errno. + */ +ssize_t tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov, + int iovcnt); + +/** * Consume and queue up to *num* packets, that will be sent eventually * by tle_tcp_tx_bulk(). * Note that the stream has to be in connected state. @@ -281,7 +313,7 @@ uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[], * number of packets successfully queued in the stream send buffer. * In case of error, error code can be set in rte_errno. * Possible rte_errno errors include: - * - EAGAIN - operation can be perfomed right now + * - EAGAIN - operation can't be perfomed right now * (most likely close() was perfomed on that stream allready). * - ENOTCONN - the stream is not connected. */ @@ -289,6 +321,32 @@ uint16_t tle_tcp_stream_send(struct tle_stream *s, struct rte_mbuf *pkt[], uint16_t num); /** + * Writes iovcnt buffers of data described by iov to the for given TCP stream. + * Note that the stream has to be in connected state. + * Data ordering is preserved. + * Buffers are processed in array order. + * This means that the function will write out the entire contents of iov[0] + * before proceeding to iov[1], and so on. + * If there is insufficient space in stream send buffer, + * then not all buffers pointed to by iov may be written out. + * @param ts + * TCP stream to send data to. + * @param iov + * Points to an array of iovec structures. + * @param iovcnt + * Number of elements in the *iov* array. + * @return + * On success, number of bytes written to the stream send buffer. + * In case of error, returns -1 and error code will be set in rte_errno. + * - EAGAIN - operation can't be perfomed right now + * (most likely close() was perfomed on that stream allready). + * - ENOTCONN - the stream is not connected. + * - ENOMEM - not enough internal buffer (mbuf) to store user provided data. + */ +ssize_t tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp, + const struct iovec *iov, int iovcnt); + +/** * Back End (BE) API. * BE API functions are not multi-thread safe. * Supposed to be called by the L2/L3 processing layer. @@ -362,16 +420,6 @@ uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], */ int tle_tcp_process(struct tle_ctx *ctx, uint32_t num); -/** - * Get current TCP maximum segment size - * @param stream - * Stream to get MSS from. - * @return - * Maximum segment size in bytes, if successful. - * Negative on failure. - */ -int tle_tcp_stream_get_mss(const struct tle_stream * const stream); - #ifdef __cplusplus } #endif diff --git a/lib/libtle_misc/tle_dpdk_wrapper.h b/lib/libtle_misc/tle_dpdk_wrapper.h index 6757663..3736964 100644 --- a/lib/libtle_misc/tle_dpdk_wrapper.h +++ b/lib/libtle_misc/tle_dpdk_wrapper.h @@ -57,6 +57,18 @@ _rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, uint32_t n) } static inline uint32_t +_rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, uint32_t n) +{ + uint32_t rc; + + rc = rte_ring_enqueue_bulk(r, (void * const *)obj_table, n, NULL); + if (rc == n) + return 0; + else + return -ENOSPC; +} + +static inline uint32_t _rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t n) { return rte_ring_dequeue_burst(r, (void **)obj_table, n, NULL); @@ -80,6 +92,17 @@ _rte_ring_get_data(struct rte_ring *r) return (void **)(&r[1]); } +static inline void +_rte_ring_dequeue_ptrs(struct rte_ring *r, void **obj_table, uint32_t num) +{ + uint32_t tail; + void **data; + + tail = r->cons.tail; + data = _rte_ring_get_data(r); + DEQUEUE_PTRS(r, data, tail, obj_table, num, void *); +} + #else static inline uint32_t @@ -109,6 +132,13 @@ _rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, uint32_t n) } static inline uint32_t +_rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + uint32_t n) +{ + return rte_ring_enqueue_bulk(r, (void * const *)obj_table, n); +} + +static inline uint32_t _rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t n) { return rte_ring_dequeue_burst(r, (void **)obj_table, n); @@ -132,10 +162,91 @@ _rte_ring_get_data(struct rte_ring *r) return (void **)r->ring; } -#endif +static inline void +_rte_ring_dequeue_ptrs(struct rte_ring *r, void **obj_table, uint32_t num) +{ + uint32_t i, n; + uint32_t mask, cons_head; + + n = num; + cons_head = r->cons.tail; + mask = _rte_ring_get_mask(r); + + DEQUEUE_PTRS(); +} + +#endif /* RTE_VERSION >= RTE_VERSION_NUM(17, 5, 0, 0) */ + +/* + * Serialized variation of DPDK rte_ring dequeue mechanism. + * At any given moment, only one consumer is allowed to dequeue + * objects from the ring. + */ + +static inline __attribute__((always_inline)) uint32_t +_rte_ring_mcs_dequeue_start(struct rte_ring *r, uint32_t num) +{ + uint32_t n, end, head, tail; + int32_t rc; + + rc = 0; + do { + head = r->cons.head; + tail = r->cons.tail; + end = r->prod.tail; + + if (head != tail) { + rte_pause(); + continue; + } + + n = end - head; + n = RTE_MIN(num, n); + if (n == 0) + return 0; + + rc = rte_atomic32_cmpset(&r->cons.head, head, head + n); + } while (rc == 0); + + return n; +} + +static inline __attribute__((always_inline)) void +_rte_ring_mcs_dequeue_finish(struct rte_ring *r, uint32_t num) +{ + uint32_t n, head, tail; + + head = r->cons.head; + rte_smp_rmb(); + tail = r->cons.tail; + n = head - tail; + RTE_ASSERT(n >= num); + RTE_SET_USED(n); + head = tail + num; + r->cons.head = head; + r->cons.tail = head; +} + +static inline __attribute__((always_inline)) void +_rte_ring_mcs_dequeue_abort(struct rte_ring *r) +{ + r->cons.head = r->cons.tail; +} + +static inline uint32_t +_rte_ring_mcs_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t num) +{ + uint32_t n; + + n = _rte_ring_mcs_dequeue_start(r, num); + _rte_ring_dequeue_ptrs(r, obj_table, n); + _rte_ring_mcs_dequeue_finish(r, n); + return n; +} #ifdef __cplusplus } #endif + #endif /* TLE_DPDK_WRAPPER_H_ */ |