From 3395610ea65d66fb96ab98d6915a7ffbd584c34e Mon Sep 17 00:00:00 2001
From: Konstantin Ananyev
Date: Fri, 3 Jun 2016 16:43:13 +0100
Subject: Initial commit of tldk code.

Change-Id: Ib96fdd2c57bae0a51ed420137c35eb8e2ee58473
Signed-off-by: Konstantin Ananyev
Signed-off-by: Ed Warnicke
---
 lib/libtle_udp/Makefile       |  50 +++
 lib/libtle_udp/buf_cage.c     |  81 +++++
 lib/libtle_udp/buf_cage.h     | 231 +++++++++++++
 lib/libtle_udp/event.c        | 104 ++++++
 lib/libtle_udp/misc.h         | 296 ++++++++++++++++
 lib/libtle_udp/osdep.h        |  56 +++
 lib/libtle_udp/port_bitmap.h  | 112 ++++++
 lib/libtle_udp/tle_event.h    | 257 ++++++++++++++
 lib/libtle_udp/tle_udp_impl.h | 373 ++++++++++++++++++++
 lib/libtle_udp/udp_ctl.c      | 723 +++++++++++++++++++++++++++++++++++++++
 lib/libtle_udp/udp_impl.h     | 161 +++++++++
 lib/libtle_udp/udp_rxtx.c     | 767 ++++++++++++++++++++++++++++++++++++++++++
 12 files changed, 3211 insertions(+)
 create mode 100644 lib/libtle_udp/Makefile
 create mode 100644 lib/libtle_udp/buf_cage.c
 create mode 100644 lib/libtle_udp/buf_cage.h
 create mode 100644 lib/libtle_udp/event.c
 create mode 100644 lib/libtle_udp/misc.h
 create mode 100644 lib/libtle_udp/osdep.h
 create mode 100644 lib/libtle_udp/port_bitmap.h
 create mode 100644 lib/libtle_udp/tle_event.h
 create mode 100644 lib/libtle_udp/tle_udp_impl.h
 create mode 100644 lib/libtle_udp/udp_ctl.c
 create mode 100644 lib/libtle_udp/udp_impl.h
 create mode 100644 lib/libtle_udp/udp_rxtx.c

diff --git a/lib/libtle_udp/Makefile b/lib/libtle_udp/Makefile
new file mode 100644
index 0000000..100755c
--- /dev/null
+++ b/lib/libtle_udp/Makefile
@@ -0,0 +1,50 @@
+# Copyright (c) 2016 Intel Corporation.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overwritten by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = libtle_udp.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+EXPORT_MAP := tle_udp_version.map
+
+LIBABIVER := 1
+
+#source files
+SRCS-y += buf_cage.c
+SRCS-y += event.c
+SRCS-y += udp_ctl.c
+SRCS-y += udp_rxtx.c
+
+# install this header file
+SYMLINK-y-include += tle_udp_impl.h
+SYMLINK-y-include += tle_event.h
+
+# this library depends on
+DEPDIRS-y += $(RTE_SDK)/lib/librte_eal
+DEPDIRS-y += $(RTE_SDK)/lib/librte_ether
+DEPDIRS-y += $(RTE_SDK)/lib/librte_mbuf
+DEPDIRS-y += $(RTE_SDK)/lib/librte_net
+DEPDIRS-y += $(RTE_SDK)/lib/librte_ip_frag
+
+include $(RTE_SDK)/mk/rte.extlib.mk
diff --git a/lib/libtle_udp/buf_cage.c b/lib/libtle_udp/buf_cage.c
new file mode 100644
index 0000000..0ae21b0
--- /dev/null
+++ b/lib/libtle_udp/buf_cage.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "buf_cage.h" +#include "osdep.h" + +struct bcg_store * +bcg_create(const struct bcg_store_prm *prm) +{ + struct buf_cage *bc; + struct bcg_store *st; + uintptr_t end, p; + size_t sz, tsz; + uint32_t n; + + if (prm == NULL || (prm->cage_align != 0 && + rte_is_power_of_2(prm->cage_align) == 0)) { + rte_errno = EINVAL; + return NULL; + } + + /* number of cages required. */ + n = (prm->max_bufs + prm->cage_bufs - 1) / prm->cage_bufs; + n = RTE_MAX(n, prm->min_cages); + + /* size of each cage. */ + sz = prm->cage_bufs * sizeof(bc->bufs[0]) + sizeof(*bc); + sz = RTE_ALIGN_CEIL(sz, prm->cage_align); + + /* total number of bytes required. */ + tsz = n * sz + RTE_ALIGN_CEIL(sizeof(*st), prm->cage_align); + + st = rte_zmalloc_socket(NULL, tsz, RTE_CACHE_LINE_SIZE, prm->socket_id); + if (st == NULL) { + UDP_LOG(ERR, "%s: allocation of %zu bytes on " + "socket %d failed\n", + __func__, tsz, prm->socket_id); + return NULL; + } + + st->prm = prm[0]; + bcg_queue_reset(&st->free); + + p = (uintptr_t)RTE_PTR_ALIGN_CEIL((st + 1), prm->cage_align); + end = p + n * sz; + + for (; p != end; p += sz) { + bc = (struct buf_cage *)p; + bc->st = st; + bc->num = prm->cage_bufs; + STAILQ_INSERT_TAIL(&st->free.queue, bc, ql); + } + + st->free.num = n; + st->nb_cages = n; + st->cage_sz = sz; + st->total_sz = tsz; + return st; +} + +void +bcg_destroy(struct bcg_store *st) +{ + rte_free(st); +} diff --git a/lib/libtle_udp/buf_cage.h b/lib/libtle_udp/buf_cage.h new file mode 100644 index 0000000..3b3c429 --- /dev/null +++ b/lib/libtle_udp/buf_cage.h @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _BUF_CAGE_H_ +#define _BUF_CAGE_H_ + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct bcg_store; + +struct buf_cage { + struct bcg_store *st; + STAILQ_ENTRY(buf_cage) ql; + uint32_t num; + uint32_t rp; + uint32_t wp; + const void *bufs[0]; +}; + +struct bcg_queue { + rte_spinlock_t lock; + uint32_t num; + STAILQ_HEAD(, buf_cage) queue; +}; + +struct bcg_store_prm { + void *user_data; + int32_t socket_id; /* NUMA socket to allocate memory from. */ + uint32_t max_bufs; /* total number of bufs to cage. */ + uint32_t min_cages; /* min number of cages per store. */ + uint32_t cage_bufs; /* min number of bufs per cage. */ + uint32_t cage_align; /* each cage to be aligned (power of 2). 
*/
+};
+
+struct bcg_store {
+    struct bcg_queue free;
+    uint32_t nb_cages;
+    size_t cage_sz;
+    size_t total_sz;
+    struct bcg_store_prm prm;
+} __rte_cache_aligned;
+
+struct bcg_store *bcg_create(const struct bcg_store_prm *prm);
+void bcg_destroy(struct bcg_store *st);
+
+static inline int
+bcg_store_full(const struct bcg_store *st)
+{
+    return st->nb_cages == st->free.num;
+}
+
+static inline void
+bcg_queue_reset(struct bcg_queue *bq)
+{
+    STAILQ_INIT(&bq->queue);
+    bq->num = 0;
+    rte_spinlock_init(&bq->lock);
+}
+
+static inline void
+bcg_reset(struct buf_cage *bc)
+{
+    bc->rp = 0;
+    bc->wp = 0;
+}
+
+static inline void *
+bcg_get_udata(struct buf_cage *bc)
+{
+    return bc->st->prm.user_data;
+}
+
+static inline struct buf_cage *
+__bcg_dequeue_head(struct bcg_queue *bq)
+{
+    struct buf_cage *bc;
+
+    bc = STAILQ_FIRST(&bq->queue);
+    if (bc != NULL) {
+        STAILQ_REMOVE_HEAD(&bq->queue, ql);
+        bq->num--;
+    }
+    return bc;
+}
+
+static inline struct buf_cage *
+bcg_dequeue_head(struct bcg_queue *bq)
+{
+    struct buf_cage *bc;
+
+    if (bq->num == 0)
+        return NULL;
+
+    rte_compiler_barrier();
+
+    rte_spinlock_lock(&bq->lock);
+    bc = __bcg_dequeue_head(bq);
+    rte_spinlock_unlock(&bq->lock);
+    return bc;
+}
+
+static inline uint32_t
+__bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc)
+{
+    STAILQ_INSERT_HEAD(&bq->queue, bc, ql);
+    return ++bq->num;
+}
+
+static inline uint32_t
+bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc)
+{
+    uint32_t n;
+
+    rte_spinlock_lock(&bq->lock);
+    n = __bcg_enqueue_head(bq, bc);
+    rte_spinlock_unlock(&bq->lock);
+    return n;
+}
+
+static inline uint32_t
+__bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc)
+{
+    STAILQ_INSERT_TAIL(&bq->queue, bc, ql);
+    return ++bq->num;
+}
+
+static inline uint32_t
+bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc)
+{
+    uint32_t n;
+
+    rte_spinlock_lock(&bq->lock);
+    n = __bcg_enqueue_tail(bq, bc);
+    rte_spinlock_unlock(&bq->lock);
+    return n;
+}
+
+static inline uint32_t
+bcg_queue_append(struct bcg_queue *dst, struct bcg_queue *src)
+{
+    rte_spinlock_lock(&src->lock);
+    STAILQ_CONCAT(&dst->queue, &src->queue);
+    dst->num += src->num;
+    src->num = 0;
+    rte_spinlock_unlock(&src->lock);
+    return dst->num;
+}
+
+static inline uint32_t
+bcg_free_count(const struct buf_cage *bc)
+{
+    return bc->num - bc->wp;
+}
+
+static inline uint32_t
+bcg_fill_count(const struct buf_cage *bc)
+{
+    return bc->wp - bc->rp;
+}
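
A minimal usage sketch of the buf_cage API above (illustrative only, not from the patch; the store and the caller-supplied item array are assumed to exist):

    #include <errno.h>
    #include "buf_cage.h"

    /* move up to num items through a freshly allocated cage. */
    static int32_t
    cage_roundtrip(struct bcg_store *st, const void *items[], uint32_t num)
    {
        struct buf_cage *bc;
        uint32_t k;

        bc = bcg_alloc(st);           /* take a cage from the free queue */
        if (bc == NULL)
            return -ENOMEM;

        k = bcg_put(bc, items, num);  /* fill: returns entries stored */
        k = bcg_get(bc, items, k);    /* drain the same entries back */

        bcg_free(bc);                 /* reset and return cage to store */
        return k;
    }
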
+/* !!! if going to keep it - try to unroll copying stuff. !!! */
+static inline uint32_t
+bcg_get(struct buf_cage *bc, const void *bufs[], uint32_t num)
+{
+    uint32_t i, n, r;
+
+    r = bc->rp;
+    n = RTE_MIN(num, bc->wp - r);
+    for (i = 0; i != n; i++)
+        bufs[i] = bc->bufs[r + i];
+
+    bc->rp = r + n;
+    return n;
+}
+
+static inline uint32_t
+bcg_put(struct buf_cage *bc, const void *bufs[], uint32_t num)
+{
+    uint32_t i, n, w;
+
+    w = bc->wp;
+    n = RTE_MIN(num, bc->num - w);
+    for (i = 0; i != n; i++)
+        bc->bufs[w + i] = bufs[i];
+
+    bc->wp = w + n;
+    return n;
+}
+
+static inline struct buf_cage *
+bcg_alloc(struct bcg_store *st)
+{
+    return bcg_dequeue_head(&st->free);
+}
+
+static inline uint32_t
+bcg_free(struct buf_cage *bc)
+{
+    struct bcg_store *st;
+
+    st = bc->st;
+    bcg_reset(bc);
+    return bcg_enqueue_head(&st->free, bc);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _BUF_CAGE_H_ */
diff --git a/lib/libtle_udp/event.c b/lib/libtle_udp/event.c
new file mode 100644
index 0000000..7e340e8
--- /dev/null
+++ b/lib/libtle_udp/event.c
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+
+#include "osdep.h"
+
+struct tle_evq *
+tle_evq_create(const struct tle_evq_param *prm)
+{
+    struct tle_evq *evq;
+    size_t sz;
+    uint32_t i;
+
+    if (prm == NULL) {
+        rte_errno = EINVAL;
+        return NULL;
+    }
+
+    sz = sizeof(*evq) + sizeof(evq->events[0]) * prm->max_events;
+    evq = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+        prm->socket_id);
+    if (evq == NULL) {
+        UDP_LOG(ERR, "allocation of %zu bytes for "
+            "new tle_evq(%u) on socket %d failed\n",
+            sz, prm->max_events, prm->socket_id);
+        return NULL;
+    }
+
+    TAILQ_INIT(&evq->armed);
+    TAILQ_INIT(&evq->free);
+
+    for (i = 0; i != prm->max_events; i++) {
+        evq->events[i].head = evq;
+        TAILQ_INSERT_TAIL(&evq->free, evq->events + i, ql);
+    }
+
+    evq->nb_events = i;
+    evq->nb_free = i;
+
+    return evq;
+}
+
+void
+tle_evq_destroy(struct tle_evq *evq)
+{
+    rte_free(evq);
+}
+
+struct tle_event *
+tle_event_alloc(struct tle_evq *evq, const void *data)
+{
+    struct tle_event *h;
+
+    if (evq == NULL) {
+        rte_errno = EINVAL;
+        return NULL;
+    }
+
+    rte_spinlock_lock(&evq->lock);
+    h = TAILQ_FIRST(&evq->free);
+    if (h != NULL) {
+        TAILQ_REMOVE(&evq->free, h, ql);
+        evq->nb_free--;
+        h->data = data;
+    } else
+        rte_errno = ENOMEM;
+    rte_spinlock_unlock(&evq->lock);
+    return h;
+}
+
+void
+tle_event_free(struct tle_event *ev)
+{
+    struct tle_evq *q;
+
+    if (ev == NULL) {
+        rte_errno = EINVAL;
+        return;
+    }
+
+    q = ev->head;
+    rte_spinlock_lock(&q->lock);
+    ev->data = NULL;
+    ev->state = TLE_SEV_IDLE;
+    TAILQ_INSERT_HEAD(&q->free, ev, ql);
+    q->nb_free++;
+    rte_spinlock_unlock(&q->lock);
+}
diff --git a/lib/libtle_udp/misc.h b/lib/libtle_udp/misc.h
new file mode 100644
index 0000000..3874647
--- /dev/null
+++ b/lib/libtle_udp/misc.h
@@ -0,0 +1,296 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _MISC_H_
+#define _MISC_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+static inline int
+ymm_mask_cmp(const _ymm_t *da, const _ymm_t *sa, const _ymm_t *sm)
+{
+    uint64_t ret;
+
+    ret = ((sa->u64[0] & sm->u64[0]) ^ da->u64[0]) |
+        ((sa->u64[1] & sm->u64[1]) ^ da->u64[1]) |
+        ((sa->u64[2] & sm->u64[2]) ^ da->u64[2]) |
+        ((sa->u64[3] & sm->u64[3]) ^ da->u64[3]);
+
+    return (ret == 0);
+}
+
+/*
+ * Setup tx_offload field inside mbuf using raw 64-bit field.
+ * Consider moving it into DPDK librte_mbuf.
+ */
+static inline uint64_t
+_mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso,
+    uint64_t ol3, uint64_t ol2)
+{
+    return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49;
+}
+
+/*
+ * Routines to calculate L3/L4 checksums in SW.
+ * Pretty similar to the ones from DPDK librte_net/rte_ip.h,
+ * but provide better performance (at least for tested configurations),
+ * and extended functionality.
+ * Consider moving them into DPDK librte_net/rte_ip.h.
+ */
+
+/* make the compiler generate: add %r1, %r2; adc $0, %r1. */
+#define CKSUM_ADD_CARRY(s, v) do {            \
+    (s) += (v);                               \
+    (s) = ((s) < (v)) ? (s) + 1 : (s);        \
+} while (0)
+
+/**
+ * Process the non-complemented checksum of a buffer.
+ * Similar to rte_raw_cksum(), but provides better performance
+ * (at least on IA platforms).
+ * @param buf
+ *   Pointer to the buffer.
+ * @param size
+ *   Length of the buffer.
+ * @return
+ *   The non-complemented checksum.
+ */
+static inline uint16_t
+__raw_cksum(const uint8_t *buf, uint32_t size)
+{
+    uint64_t s, sum;
+    uint32_t i, n;
+    uint32_t dw1, dw2;
+    uint16_t w1, w2;
+    const uint64_t *b;
+
+    b = (const uint64_t *)buf;
+    n = size / sizeof(*b);
+    sum = 0;
+
+    /* main loop, consume 8 bytes per iteration. */
+    for (i = 0; i != n; i++) {
+        s = b[i];
+        CKSUM_ADD_CARRY(sum, s);
+    }
+
+    /* consume the remainder. */
+    n = size % sizeof(*b);
+    if (n != 0) {
+        /* position of the last 8 bytes of data. */
+        b = (const uint64_t *)((uintptr_t)(b + i) + n - sizeof(*b));
+        /* calculate shift amount. */
+        n = (sizeof(*b) - n) * CHAR_BIT;
+        s = b[0] >> n;
+        CKSUM_ADD_CARRY(sum, s);
+    }
+
+    /* reduce to 16 bits */
+    dw1 = sum;
+    dw2 = sum >> 32;
+    CKSUM_ADD_CARRY(dw1, dw2);
+    w1 = dw1;
+    w2 = dw1 >> 16;
+    CKSUM_ADD_CARRY(w1, w2);
+    return w1;
+}
+
+
+/**
+ * Process UDP or TCP checksum over possibly multi-segmented packet.
+ * @param mb
+ *   The pointer to the mbuf with the packet.
+ * @param l4_ofs
+ *   Offset to the beginning of the L4 header (should be in first segment).
+ * @param cksum
+ *   Already pre-calculated pseudo-header checksum value.
+ * @return
+ *   The complemented checksum.
+ */ +static inline uint32_t +__udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + uint32_t cksum) +{ + uint32_t dlen, i, plen; + const struct rte_mbuf *ms; + const void *data; + + plen = rte_pktmbuf_pkt_len(mb); + ms = mb; + + for (i = l4_ofs; i < plen && ms != NULL; i += dlen) { + data = rte_pktmbuf_mtod_offset(ms, const void *, l4_ofs); + dlen = rte_pktmbuf_data_len(ms) - l4_ofs; + cksum += __raw_cksum(data, dlen); + ms = ms->next; + l4_ofs = 0; + } + + cksum = ((cksum & 0xffff0000) >> 16) + (cksum & 0xffff); + cksum = (~cksum) & 0xffff; + if (cksum == 0) + cksum = 0xffff; + + return cksum; +} + +/** + * Process the pseudo-header checksum of an IPv4 header. + * + * Depending on the ol_flags, the pseudo-header checksum expected by the + * drivers is not the same. For instance, when TSO is enabled, the IP + * payload length must not be included in the packet. + * + * When ol_flags is 0, it computes the standard pseudo-header checksum. + * + * @param ipv4_hdr + * The pointer to the contiguous IPv4 header. + * @param ipv4_len + * Length of the IPv4 header. + * @param ol_flags + * The ol_flags of the associated mbuf. + * @return + * The non-complemented checksum to set in the L4 header. + */ +static inline uint16_t +_ipv4x_phdr_cksum(const struct ipv4_hdr *ipv4_hdr, size_t ipv4h_len, + uint64_t ol_flags) +{ + uint32_t s0, s1; + + s0 = ipv4_hdr->src_addr; + s1 = ipv4_hdr->dst_addr; + CKSUM_ADD_CARRY(s0, s1); + + if (ol_flags & PKT_TX_TCP_SEG) + s1 = 0; + else + s1 = rte_cpu_to_be_16( + (uint16_t)(rte_be_to_cpu_16(ipv4_hdr->total_length) - + ipv4h_len)); + + s1 += rte_cpu_to_be_16(ipv4_hdr->next_proto_id); + CKSUM_ADD_CARRY(s0, s1); + + return __rte_raw_cksum_reduce(s0); +} + +/** + * Process the IPv4 UDP or TCP checksum. + * + * @param mb + * The pointer to the IPv4 packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param ipv4_hdr + * The pointer to the contiguous IPv4 header. + * @return + * The complemented checksum to set in the IP packet. + */ +static inline int +_ipv4_udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + const struct ipv4_hdr *ipv4_hdr) +{ + uint32_t cksum; + + cksum = _ipv4x_phdr_cksum(ipv4_hdr, mb->l3_len, 0); + cksum = __udptcp_mbuf_cksum(mb, l4_ofs, cksum); + + return cksum; +} + +/** + * Process the IPv6 UDP or TCP checksum. + * + * @param mb + * The pointer to the IPv6 packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param ipv6_hdr + * The pointer to the contiguous IPv6 header. + * @return + * The complemented checksum to set in the IP packet. + */ +static inline int +_ipv6_udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + const struct ipv6_hdr *ipv6_hdr) +{ + uint32_t cksum; + + cksum = rte_ipv6_phdr_cksum(ipv6_hdr, 0); + cksum = __udptcp_mbuf_cksum(mb, l4_ofs, cksum); + + return cksum; +} + +static inline uint16_t +_ipv4x_cksum(const void *iph, size_t len) +{ + uint16_t cksum; + + cksum = __raw_cksum(iph, len); + return (cksum == 0xffff) ? cksum : ~cksum; +} + + +/* + * Analog of read-write locks, very much in favour of read side. + * Assumes, that there are no more then INT32_MAX concurrent readers. + * Consider to move into DPDK librte_eal. 
+ */ + +static inline int +rwl_try_acquire(rte_atomic32_t *p) +{ + return rte_atomic32_add_return(p, 1); +} + +static inline void +rwl_release(rte_atomic32_t *p) +{ + rte_atomic32_sub(p, 1); +} + +static inline int +rwl_acquire(rte_atomic32_t *p) +{ + int32_t rc; + + rc = rwl_try_acquire(p); + if (rc < 0) + rwl_release(p); + return rc; +} + +static inline void +rwl_down(rte_atomic32_t *p) +{ + while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0) + rte_pause(); +} + +static inline void +rwl_up(rte_atomic32_t *p) +{ + rte_atomic32_sub(p, INT32_MIN); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _MISC_H_ */ diff --git a/lib/libtle_udp/osdep.h b/lib/libtle_udp/osdep.h new file mode 100644 index 0000000..6161242 --- /dev/null +++ b/lib/libtle_udp/osdep.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _OSDEP_H_ +#define _OSDEP_H_ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define UDP_LOG(lvl, fmt, args...) RTE_LOG(lvl, USER1, fmt, ##args) + +/* + * if no AVX support, define _ymm_t here. + */ + +#ifdef __AVX__ + +#define _ymm_t rte_ymm_t + +#else + +#define YMM_SIZE (2 * sizeof(rte_xmm_t)) +#define YMM_MASK (YMM_SIZE - 1) + +typedef union _ymm { + xmm_t x[YMM_SIZE / sizeof(xmm_t)]; + uint8_t u8[YMM_SIZE / sizeof(uint8_t)]; + uint16_t u16[YMM_SIZE / sizeof(uint16_t)]; + uint32_t u32[YMM_SIZE / sizeof(uint32_t)]; + uint64_t u64[YMM_SIZE / sizeof(uint64_t)]; + double pd[YMM_SIZE / sizeof(double)]; +} _ymm_t; + +#endif /* __AVX__ */ + +#ifdef __cplusplus +} +#endif + +#endif /* _OSDEP_H_ */ diff --git a/lib/libtle_udp/port_bitmap.h b/lib/libtle_udp/port_bitmap.h new file mode 100644 index 0000000..6aff4e6 --- /dev/null +++ b/lib/libtle_udp/port_bitmap.h @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _PORT_BITMAP_H_ +#define _PORT_BITMAP_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Simple implementation of bitmap for all possible UDP ports [0-UINT16_MAX]. + */ + +#define MAX_PORT_NUM (UINT16_MAX + 1) + +#define PORT_BLK(p) ((p) / (sizeof(uint32_t) * CHAR_BIT)) +#define PORT_IDX(p) ((p) % (sizeof(uint32_t) * CHAR_BIT)) + +#define MAX_PORT_BLK PORT_BLK(MAX_PORT_NUM) + +struct udp_pbm { + uint32_t nb_set; /* number of bits set. */ + uint32_t blk; /* last block with free entry. 
*/ + uint32_t bm[MAX_PORT_BLK]; +}; + +static inline void +udp_pbm_init(struct udp_pbm *pbm, uint32_t blk) +{ + pbm->bm[0] = 1; + pbm->nb_set = 1; + pbm->blk = blk; +} + +static inline void +udp_pbm_set(struct udp_pbm *pbm, uint16_t port) +{ + uint32_t i, b, v; + + i = PORT_BLK(port); + b = 1 << PORT_IDX(port); + v = pbm->bm[i]; + pbm->bm[i] = v | b; + pbm->nb_set += (v & b) == 0; +} + +static inline void +udp_pbm_clear(struct udp_pbm *pbm, uint16_t port) +{ + uint32_t i, b, v; + + i = PORT_BLK(port); + b = 1 << PORT_IDX(port); + v = pbm->bm[i]; + pbm->bm[i] = v & ~b; + pbm->nb_set -= (v & b) != 0; +} + + +static inline uint32_t +udp_pbm_check(const struct udp_pbm *pbm, uint16_t port) +{ + uint32_t i, v; + + i = PORT_BLK(port); + v = pbm->bm[i] >> PORT_IDX(port); + return v & 1; +} + +static inline uint16_t +udp_pbm_find_range(struct udp_pbm *pbm, uint32_t start_blk, uint32_t end_blk) +{ + uint32_t i, v; + uint16_t p; + + if (pbm->nb_set == MAX_PORT_NUM) + return 0; + + p = 0; + for (i = start_blk; i != end_blk; i++) { + i %= RTE_DIM(pbm->bm); + v = pbm->bm[i]; + if (v != UINT32_MAX) { + for (p = i * (sizeof(pbm->bm[0]) * CHAR_BIT); + (v & 1) != 0; v >>= 1, p++) + ; + + pbm->blk = i; + break; + } + } + return p; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _PORT_BITMAP_H_ */ diff --git a/lib/libtle_udp/tle_event.h b/lib/libtle_udp/tle_event.h new file mode 100644 index 0000000..1a5c436 --- /dev/null +++ b/lib/libtle_udp/tle_event.h @@ -0,0 +1,257 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _SEV_IMPL_H_ +#define _SEV_IMPL_H_ + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct tle_evq; + +/** + * Possible states of the event. + */ +enum tle_ev_state { + TLE_SEV_IDLE, + TLE_SEV_DOWN, + TLE_SEV_UP, + TLE_SEV_NUM +}; + +struct tle_event { + TAILQ_ENTRY(tle_event) ql; + struct tle_evq *head; + const void *data; + enum tle_ev_state state; +} __rte_cache_aligned; + +struct tle_evq { + rte_spinlock_t lock; + uint32_t nb_events; + uint32_t nb_armed; + uint32_t nb_free; + TAILQ_HEAD(, tle_event) armed; + TAILQ_HEAD(, tle_event) free; + struct tle_event events[0]; +}; + +/** + * event queue creation parameters. + */ +struct tle_evq_param { + int32_t socket_id; /**< socket ID to allocate memory from. */ + uint32_t max_events; /**< max number of events in queue. */ +}; + +/** + * create event queue. + * @param prm + * Parameters used to create and initialise the queue. + * @return + * Pointer to new event queue structure, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - out of memory + */ +struct tle_evq *tle_evq_create(const struct tle_evq_param *prm); + +/** + * Destroy given event queue. + * + * @param evq + * event queue to destroy + */ +void tle_evq_destroy(struct tle_evq *evq); + +/** + * allocate a new event within given event queue. 
+ * @param evq
+ *   event queue to allocate the new event from.
+ * @param data
+ *   User data to be associated with that event.
+ * @return
+ *   Pointer to event structure that can be used in future tle_event API calls,
+ *   or NULL on error, with error code set in rte_errno.
+ *   Possible rte_errno errors include:
+ *   - EINVAL - invalid parameter passed to function
+ *   - ENOMEM - max limit of allocated events reached for that context
+ */
+struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data);
+
+/**
+ * free an allocated event.
+ * @param ev
+ *   Pointer to the event to free.
+ */
+void tle_event_free(struct tle_event *ev);
+
+
+/**
+ * move event from DOWN to UP state.
+ * @param ev
+ *   Pointer to the event.
+ */
+static inline void
+tle_event_raise(struct tle_event *ev)
+{
+    struct tle_evq *q;
+
+    if (ev->state != TLE_SEV_DOWN)
+        return;
+
+    q = ev->head;
+    rte_compiler_barrier();
+
+    rte_spinlock_lock(&q->lock);
+    if (ev->state == TLE_SEV_DOWN) {
+        ev->state = TLE_SEV_UP;
+        TAILQ_INSERT_TAIL(&q->armed, ev, ql);
+        q->nb_armed++;
+    }
+    rte_spinlock_unlock(&q->lock);
+}
+
+/**
+ * move event from UP to DOWN state.
+ * @param ev
+ *   Pointer to the event.
+ */
+static inline void
+tle_event_down(struct tle_event *ev)
+{
+    struct tle_evq *q;
+
+    if (ev->state != TLE_SEV_UP)
+        return;
+
+    q = ev->head;
+    rte_compiler_barrier();
+
+    rte_spinlock_lock(&q->lock);
+    if (ev->state == TLE_SEV_UP) {
+        ev->state = TLE_SEV_DOWN;
+        TAILQ_REMOVE(&q->armed, ev, ql);
+        q->nb_armed--;
+    }
+    rte_spinlock_unlock(&q->lock);
+}
+
+/**
+ * move from IDLE to DOWN/UP state.
+ * @param ev
+ *   Pointer to the event.
+ * @param st
+ *   new state for the event.
+ */
+static inline void
+tle_event_active(struct tle_event *ev, enum tle_ev_state st)
+{
+    struct tle_evq *q;
+
+    if (st == ev->state)
+        return;
+
+    q = ev->head;
+    rte_compiler_barrier();
+
+    rte_spinlock_lock(&q->lock);
+    if (st > ev->state) {
+        if (st == TLE_SEV_UP) {
+            TAILQ_INSERT_TAIL(&q->armed, ev, ql);
+            q->nb_armed++;
+        }
+        ev->state = st;
+    }
+    rte_spinlock_unlock(&q->lock);
+}
+
+/**
+ * move event to IDLE state.
+ * @param ev
+ *   Pointer to the event.
+ */
+static inline void
+tle_event_idle(struct tle_event *ev)
+{
+    struct tle_evq *q;
+
+    if (ev->state == TLE_SEV_IDLE)
+        return;
+
+    q = ev->head;
+    rte_compiler_barrier();
+
+    rte_spinlock_lock(&q->lock);
+    if (ev->state == TLE_SEV_UP) {
+        TAILQ_REMOVE(&q->armed, ev, ql);
+        q->nb_armed--;
+    }
+    ev->state = TLE_SEV_IDLE;
+    rte_spinlock_unlock(&q->lock);
+}
+
+
+/*
+ * return up to *num* user data pointers associated with
+ * the events that were in the UP state.
+ * Each retrieved event is automatically moved into the DOWN state.
+ * @param evq
+ *   event queue to retrieve events from.
+ * @param evd
+ *   An array of user data pointers associated with the events retrieved.
+ *   It must be large enough to store up to *num* pointers in it.
+ * @param num
+ *   Number of elements in the *evd* array.
+ * @return
+ *   number of entries filled inside *evd* array.
+ */
+static inline int32_t
+tle_evq_get(struct tle_evq *evq, const void *evd[], uint32_t num)
+{
+    uint32_t i, n;
+    struct tle_event *ev;
+
+    if (evq->nb_armed == 0)
+        return 0;
+
+    rte_compiler_barrier();
+
+    rte_spinlock_lock(&evq->lock);
+    n = RTE_MIN(num, evq->nb_armed);
+    for (i = 0; i != n; i++) {
+        ev = TAILQ_FIRST(&evq->armed);
+        ev->state = TLE_SEV_DOWN;
+        TAILQ_REMOVE(&evq->armed, ev, ql);
+        evd[i] = ev->data;
+    }
+    evq->nb_armed -= n;
+    rte_spinlock_unlock(&evq->lock);
+    return n;
+}
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _SEV_IMPL_H_ */
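
To make the intended polling pattern concrete, a minimal sketch (illustrative, not from the patch; handle_stream() is a hypothetical application callback):

    /* app-specific, hypothetical handler for one stream's user data. */
    void handle_stream(const void *stream_udata);

    static void
    drain_events(struct tle_evq *evq)
    {
        const void *udata[32];
        int32_t i, n;

        /* collect user data for events in UP state; tle_evq_get()
         * atomically moves each returned event back to DOWN. */
        n = tle_evq_get(evq, udata, RTE_DIM(udata));
        for (i = 0; i != n; i++)
            handle_stream(udata[i]);
    }
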
+ */ +static inline int32_t +tle_evq_get(struct tle_evq *evq, const void *evd[], uint32_t num) +{ + uint32_t i, n; + struct tle_event *ev; + + if (evq->nb_armed == 0) + return 0; + + rte_compiler_barrier(); + + rte_spinlock_lock(&evq->lock); + n = RTE_MIN(num, evq->nb_armed); + for (i = 0; i != n; i++) { + ev = TAILQ_FIRST(&evq->armed); + ev->state = TLE_SEV_DOWN; + TAILQ_REMOVE(&evq->armed, ev, ql); + evd[i] = ev->data; + } + evq->nb_armed -= n; + rte_spinlock_unlock(&evq->lock); + return n; +} + + +#ifdef __cplusplus +} +#endif + +#endif /* _SEV_IMPL_H_ */ diff --git a/lib/libtle_udp/tle_udp_impl.h b/lib/libtle_udp/tle_udp_impl.h new file mode 100644 index 0000000..a5d17e1 --- /dev/null +++ b/lib/libtle_udp/tle_udp_impl.h @@ -0,0 +1,373 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_UDP_IMPL_H_ +#define _TLE_UDP_IMPL_H_ + +#include +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * - each such ctx represents an 'independent copy of the stack'. + * It owns set of s and s entities and provides + * (de)multiplexing input/output packets from/into UDP devices into/from + * UDP streams. + * is an abstraction for the underlying device, that is able + * to RX/TX packets and may provide some HW offload capabilities. + * It is a user responsibility to add to the all s, + * that context has to manage, before starting to do stream operations + * (open/send/recv,close) over that context. + * Right now adding/deleting s to the context with open + * streams is not supported. + * represents an UDP endpoint and is an analogy to + * socket entity. + * As with a socket, there are ability to do recv/send over it. + * belongs to particular but is visible globally across + * the process, i.e. any thread within the process can do recv/send over it + * without any further synchronisation. + * While 'upper' layer API is thread safe, lower layer API (rx_bulk/tx_bulk) + * is not thread safe and is not supposed to be run on multiple threads + * in parallel. + * So single thread can drive multiple s and do IO for them, + * but multiple threads can't drive same without some + * explicit synchronization. + */ + +struct tle_udp_ctx; +struct tle_udp_dev; + +/** + * UDP device parameters. + */ +struct tle_udp_dev_param { + uint32_t rx_offload; /**< DEV_RX_OFFLOAD_* supported. */ + uint32_t tx_offload; /**< DEV_TX_OFFLOAD_* supported. */ + struct in_addr local_addr4; /**< local IPv4 address assigned. */ + struct in6_addr local_addr6; /**< local IPv6 address assigned. */ +}; + +#define TLE_UDP_MAX_HDR 0x60 + +struct tle_udp_dest { + struct rte_mempool *head_mp; /**< MP for fragment feaders. */ + struct tle_udp_dev *dev; /**< device to send packets through. */ + uint16_t mtu; /**< MTU for given destination. */ + uint8_t l2_len; /**< L2 header lenght. */ + uint8_t l3_len; /**< L3 header lenght. */ + uint8_t hdr[TLE_UDP_MAX_HDR]; /**< L2/L3 headers. */ +}; + +/** + * UDP context creation parameters. 
+ */ +struct tle_udp_ctx_param { + int32_t socket_id; /**< socket ID to allocate memory for. */ + uint32_t max_streams; /**< max number of streams in context. */ + uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */ + uint32_t max_stream_sbufs; /**< max send mbufs per stream. */ + + int (*lookup4)(void *opaque, const struct in_addr *addr, + struct tle_udp_dest *res); + /**< will be called by send() to get IPv4 packet destination info. */ + void *lookup4_data; + /**< opaque data pointer for lookup4() callback. */ + + int (*lookup6)(void *opaque, const struct in6_addr *addr, + struct tle_udp_dest *res); + /**< will be called by send() to get IPv6 packet destination info. */ + void *lookup6_data; + /**< opaque data pointer for lookup6() callback. */ +}; + +/** + * create UDP context. + * @param ctx_prm + * Parameters used to create and initialise the UDP context. + * @return + * Pointer to UDP context structure that can be used in future UDP + * operations, or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - out of memory + */ +struct tle_udp_ctx * +tle_udp_create(const struct tle_udp_ctx_param *ctx_prm); + +/** + * Destroy given UDP context. + * + * @param ctx + * UDP context to destroy + */ +void tle_udp_destroy(struct tle_udp_ctx *ctx); + +/** + * Add new device into the given UDP context. + * This function is not multi-thread safe. + * + * @param ctx + * UDP context to add new device into. + * @param dev_prm + * Parameters used to create and initialise new device inside the + * UDP context. + * @return + * Pointer to UDP device structure that can be used in future UDP + * operations, or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENODEV - max possible value of open devices is reached + * - ENOMEM - out of memory + */ +struct tle_udp_dev * +tle_udp_add_dev(struct tle_udp_ctx *ctx, + const struct tle_udp_dev_param *dev_prm); + +/** + * Remove and destroy previously added device from the given UDP context. + * This function is not multi-thread safe. + * + * @param dev + * UDP device to remove and destroy. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + */ +int tle_udp_del_dev(struct tle_udp_dev *dev); + +/** + * Flags to the UDP context that destinations info might be changed, + * so if it has any destinations data cached, then + * it has to be invalidated. + * @param ctx + * UDP context to invalidate. + */ +void tle_udp_ctx_invalidate(struct tle_udp_ctx *ctx); + +struct tle_udp_stream; + +/** + * Stream asynchronous notification mechanisms: + * a) recv/send callback. + * Stream recv/send notification callbacks behaviour is edge-triggered (ET). + * recv callback will be invoked if stream receive buffer was empty and + * new packet(s) have arrived. + * send callback will be invoked when stream send buffer was full, + * and some packets belonging to that stream were sent + * (part of send buffer became free again). + * Note that both recv and send callbacks are called with sort of read lock + * held on that stream. So it is not permitted to call stream_close() + * within the callback function. Doing that would cause a deadlock. 
+/**
+ * Stream asynchronous notification mechanisms:
+ * a) recv/send callback.
+ * Stream recv/send notification callbacks behaviour is edge-triggered (ET).
+ * The recv callback will be invoked if the stream receive buffer was empty
+ * and new packet(s) have arrived.
+ * The send callback will be invoked when the stream send buffer was full,
+ * and some packets belonging to that stream were sent
+ * (part of the send buffer became free again).
+ * Note that both recv and send callbacks are called with a sort of read lock
+ * held on that stream. So it is not permitted to call stream_close()
+ * within the callback function. Doing that would cause a deadlock.
+ * While it is allowed to call stream send/recv functions within the
+ * callback, it is not recommended: the callback function will be invoked
+ * within tle_udp_rx_bulk/tle_udp_tx_bulk context and some heavy processing
+ * within the callback functions might cause performance degradation
+ * or even loss of packets for further streams.
+ * b) recv/send event.
+ * Stream recv/send events behaviour is level-triggered (LT).
+ * The receive event will be raised by either
+ * tle_udp_rx_bulk() or tle_udp_stream_recv() as long as there are any
+ * remaining packets inside the stream receive buffer.
+ * The send event will be raised by either
+ * tle_udp_tx_bulk() or tle_udp_stream_send() as long as there is any
+ * free space inside the stream send buffer.
+ * Note that callback and event are mutually exclusive on a per-stream basis.
+ * It is not possible to open a stream with both recv event and callback
+ * specified.
+ * Though it is possible to open a stream with recv callback and send event,
+ * or vice versa.
+ * If the user doesn't need any notification mechanism for that stream,
+ * both event and callback could be set to zero.
+ */
+
+/**
+ * Stream recv/send callback function and data.
+ */
+struct tle_udp_stream_cb {
+    void (*func)(void *, struct tle_udp_stream *);
+    void *data;
+};
+
+struct tle_event;
+
+/**
+ * UDP stream creation parameters.
+ */
+struct tle_udp_stream_param {
+    struct sockaddr_storage local_addr;  /**< stream local address. */
+    struct sockaddr_storage remote_addr; /**< stream remote address. */
+
+    /* _cb and _ev are mutually exclusive */
+    struct tle_event *recv_ev;        /**< recv event to use. */
+    struct tle_udp_stream_cb recv_cb; /**< recv callback to use. */
+
+    struct tle_event *send_ev;        /**< send event to use. */
+    struct tle_udp_stream_cb send_cb; /**< send callback to use. */
+};
+
+/**
+ * create a new stream within given UDP context.
+ * @param ctx
+ *   UDP context to create new stream within.
+ * @param prm
+ *   Parameters used to create and initialise the new stream.
+ * @return
+ *   Pointer to UDP stream structure that can be used in future UDP API calls,
+ *   or NULL on error, with error code set in rte_errno.
+ *   Possible rte_errno errors include:
+ *   - EINVAL - invalid parameter passed to function
+ *   - ENOFILE - max limit of open streams reached for that context
+ */
+struct tle_udp_stream *
+tle_udp_stream_open(struct tle_udp_ctx *ctx,
+    const struct tle_udp_stream_param *prm);
+
+/**
+ * close an open stream.
+ * All packets still remaining in stream receive buffer will be freed.
+ * All packets still remaining in stream transmit buffer will be kept
+ * for further transmission.
+ * @param s
+ *   Pointer to the stream to close.
+ * @return
+ *   zero on successful completion.
+ *   - -EINVAL - invalid parameter passed to function
+ */
+int tle_udp_stream_close(struct tle_udp_stream *s);
+
+/**
+ * get open stream parameters.
+ * @param s
+ *   Pointer to the stream.
+ * @return
+ *   zero on successful completion.
+ *   - EINVAL - invalid parameter passed to function
+ */
+int
+tle_udp_stream_get_param(const struct tle_udp_stream *s,
+    struct tle_udp_stream_param *prm);
+
+/**
+ * Take input mbufs and distribute them to open UDP streams.
+ * expects that for each input packet:
+ * - l2_len, l3_len, l4_len are setup correctly
+ * - (packet_type & (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6)) != 0,
+ * - (packet_type & RTE_PTYPE_L4_UDP) != 0,
+ * During delivery L3/L4 checksums will be verified
+ * (either relies on HW offload or in SW).
+ * This function is not multi-thread safe.
+ * @param dev
+ *   UDP device the packets were received from.
+ * @param pkt
+ *   The burst of input packets that need to be processed.
+ * @param rp
+ *   The array that will contain pointers of unprocessed packets at return.
+ *   Should contain at least *num* elements.
+ * @param rc
+ *   The array that will contain error code for corresponding rp[] entry:
+ *   - ENOENT - no open stream matching this packet.
+ *   - ENOBUFS - receive buffer of the destination stream is full.
+ *   Should contain at least *num* elements.
+ * @param num
+ *   Number of elements in the *pkt* input array.
+ * @return
+ *   number of packets delivered to the UDP streams.
+ */
+uint16_t tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
+    struct rte_mbuf *rp[], int32_t rc[], uint16_t num);
+
+/**
+ * Fill *pkt* with pointers to the packets that have to be transmitted
+ * over given UDP device.
+ * Output packets have to be ready to be passed straight to rte_eth_tx_burst()
+ * without any extra processing.
+ * UDP/IPv4 checksum either already calculated or appropriate mbuf fields set
+ * properly for HW offload.
+ * This function is not multi-thread safe.
+ * @param dev
+ *   UDP device the output packets will be transmitted over.
+ * @param pkt
+ *   An array of pointers to *rte_mbuf* structures that
+ *   must be large enough to store up to *num* pointers in it.
+ * @param num
+ *   Number of elements in the *pkt* array.
+ * @return
+ *   number of entries filled inside *pkt* array.
+ */
+uint16_t tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
+    uint16_t num);
+
+/*
+ * return up to *num* mbufs that were received for the given UDP stream.
+ * For each returned mbuf:
+ * data_off set to the start of the packet's UDP data
+ * l2_len, l3_len, l4_len are setup properly
+ * (so user can still extract L2/L3 address info if needed)
+ * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly.
+ * L3/L4 checksum is verified.
+ * Packets with invalid L3/L4 checksum will be silently dropped.
+ * @param s
+ *   UDP stream to receive packets from.
+ * @param pkt
+ *   An array of pointers to *rte_mbuf* structures that
+ *   must be large enough to store up to *num* pointers in it.
+ * @param num
+ *   Number of elements in the *pkt* array.
+ * @return
+ *   number of entries filled inside *pkt* array.
+ */
+uint16_t tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+    uint16_t num);
+
+/**
+ * Consume and queue up to *num* packets, that will be sent eventually
+ * by tle_udp_tx_bulk().
+ * If *dst_addr* is NULL, then default remote address associated with that
+ * stream (if any) will be used.
+ * The main purpose of that function is to determine over which UDP dev
+ * given packets have to be sent out and do necessary preparations for that.
+ * Based on the *dst_addr* it does route lookup, fills L2/L3/L4 headers,
+ * and, if necessary, fragments packets.
+ * Depending on the underlying device information, it either does
+ * IP/UDP checksum calculations in SW or sets mbuf TX checksum
+ * offload fields properly.
+ * For each input mbuf the following conditions have to be met:
+ * - data_off point to the start of packet's UDP data.
+ * - there is enough header space to prepend L2/L3/L4 headers.
+ * @param s
+ *   UDP stream to send packets over.
+ * @param pkt
+ *   The burst of output packets that need to be sent.
+ * @param num
+ *   Number of elements in the *pkt* array.
+ * @param dst_addr
+ *   Destination address to send packets to.
+ * @return
+ *   number of packets successfully queued in the stream send buffer.
+ */
+uint16_t tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+    uint16_t num, const struct sockaddr *dst_addr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _TLE_UDP_IMPL_H_ */
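
A sketch of the per-device datapath glue implied by the two bulk calls above (illustrative, not from the patch; the port/queue ids, burst size and drop policy are assumptions):

    static void
    be_process_dev(struct tle_udp_dev *udev, uint8_t port, uint16_t qid)
    {
        struct rte_mbuf *pkt[32], *rp[32];
        int32_t rc[32];
        uint16_t i, k, n;

        /* RX: demultiplex received packets into UDP streams; packets
         * rejected by rx_bulk come back in rp[] with the reason in rc[]
         * (ENOENT/ENOBUFS) and are simply dropped here. */
        n = rte_eth_rx_burst(port, qid, pkt, RTE_DIM(pkt));
        k = tle_udp_rx_bulk(udev, pkt, rp, rc, n);
        for (i = 0; i != n - k; i++)
            rte_pktmbuf_free(rp[i]);

        /* TX: fetch fully formed packets queued by the streams and
         * hand them to the NIC; free whatever did not fit. */
        n = tle_udp_tx_bulk(udev, pkt, RTE_DIM(pkt));
        k = rte_eth_tx_burst(port, qid, pkt, n);
        for (i = k; i != n; i++)
            rte_pktmbuf_free(pkt[i]);
    }
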
diff --git a/lib/libtle_udp/udp_ctl.c b/lib/libtle_udp/udp_ctl.c
new file mode 100644
index 0000000..36ec8a6
--- /dev/null
+++ b/lib/libtle_udp/udp_ctl.c
@@ -0,0 +1,723 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "udp_impl.h"
+#include "misc.h"
+
+#define LPORT_START 0x8000
+#define LPORT_END   MAX_PORT_NUM
+
+#define LPORT_START_BLK PORT_BLK(LPORT_START)
+#define LPORT_END_BLK   PORT_BLK(LPORT_END)
+
+#define MAX_BURST 0x20
+
+static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT;
+static const struct in6_addr tle_udp6_none = {
+    {
+        .__u6_addr32 = {
+            UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX
+        },
+    },
+};
+
+static int
+check_dev_prm(const struct tle_udp_dev_param *dev_prm)
+{
+    /* no valid IPv4/IPv6 addresses provided. */
+    if (dev_prm->local_addr4.s_addr == INADDR_ANY &&
+            memcmp(&dev_prm->local_addr6, &tle_udp6_any,
+            sizeof(tle_udp6_any)) == 0)
+        return -EINVAL;
+
+    return 0;
+}
+
+static void
+unuse_stream(struct tle_udp_stream *s)
+{
+    s->type = TLE_UDP_VNUM;
+    rte_atomic32_set(&s->rx.use, INT32_MIN);
+    rte_atomic32_set(&s->tx.use, INT32_MIN);
+}
+
+static int
+init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
+{
+    size_t sz;
+    uint32_t n;
+    struct bcg_store_prm sp;
+    char name[RTE_RING_NAMESIZE];
+
+    /* init RX part. */
+
+    n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
+    n = rte_align32pow2(n);
+    sz = sizeof(*s->rx.q) + n * sizeof(s->rx.q->ring[0]);
+
+    s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+        ctx->prm.socket_id);
+    if (s->rx.q == NULL) {
+        UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+            "failed with error code: %d\n",
+            __func__, s, sz, ctx->prm.socket_id, rte_errno);
+        return ENOMEM;
+    }
+
+    snprintf(name, sizeof(name), "%p@%zu", s, sz);
+    rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
+
+    /* init TX part. */
+
+    sp.socket_id = ctx->prm.socket_id;
+    sp.max_bufs = ctx->prm.max_stream_sbufs;
+    sp.min_cages = RTE_DIM(ctx->dev) + 1;
+    sp.cage_bufs = MAX_BURST;
+    sp.cage_align = RTE_CACHE_LINE_SIZE;
+    sp.user_data = s;
+
+    s->tx.st = bcg_create(&sp);
+    if (s->tx.st == NULL) {
+        UDP_LOG(ERR,
+            "%s(%p): bcg_create() failed with error code: %d\n",
+            __func__, s, rte_errno);
+        return ENOMEM;
+    }
+
+    s->ctx = ctx;
+    unuse_stream(s);
+    STAILQ_INSERT_TAIL(&ctx->streams.free, s, link);
+
+    return 0;
+}
+
+static void
+fini_stream(struct tle_udp_stream *s)
+{
+    bcg_destroy(s->tx.st);
+    rte_free(s->rx.q);
+}
+
+struct tle_udp_ctx *
+tle_udp_create(const struct tle_udp_ctx_param *ctx_prm)
+{
+    struct tle_udp_ctx *ctx;
+    size_t sz;
+    uint32_t i;
+
+    if (ctx_prm == NULL) {
+        rte_errno = EINVAL;
+        return NULL;
+    }
+
+    sz = sizeof(*ctx);
+    ctx = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+        ctx_prm->socket_id);
+    if (ctx == NULL) {
+        UDP_LOG(ERR, "allocation of %zu bytes for new udp_ctx "
+            "on socket %d failed\n",
+            sz, ctx_prm->socket_id);
+        return NULL;
+    }
+
+    ctx->prm = *ctx_prm;
+
+    sz = sizeof(*ctx->streams.buf) * ctx_prm->max_streams;
+    ctx->streams.buf = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+        ctx_prm->socket_id);
+    if (ctx->streams.buf == NULL) {
+        UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
+            "for %u udp_streams failed\n",
+            sz, ctx_prm->socket_id, ctx_prm->max_streams);
+        tle_udp_destroy(ctx);
+        return NULL;
+    }
+
+    STAILQ_INIT(&ctx->streams.free);
+    for (i = 0; i != ctx_prm->max_streams &&
+            init_stream(ctx, &ctx->streams.buf[i]) == 0;
+            i++)
+        ;
+
+    if (i != ctx_prm->max_streams) {
+        UDP_LOG(ERR, "initialisation of %u-th stream failed", i);
+        tle_udp_destroy(ctx);
+        return NULL;
+    }
+
+    for (i = 0; i != RTE_DIM(ctx->use); i++)
+        udp_pbm_init(ctx->use + i, LPORT_START_BLK);
+
+    ctx->streams.nb_free = ctx->prm.max_streams;
+    return ctx;
+}
+
+void
+tle_udp_destroy(struct tle_udp_ctx *ctx)
+{
+    uint32_t i;
+
+    if (ctx == NULL) {
+        rte_errno = EINVAL;
+        return;
+    }
+
+    if (ctx->streams.buf != 0) {
+        for (i = 0; i != ctx->prm.max_streams; i++)
+            fini_stream(&ctx->streams.buf[i]);
+        rte_free(ctx->streams.buf);
+    }
+
+    for (i = 0; i != RTE_DIM(ctx->dev); i++)
+        tle_udp_del_dev(ctx->dev + i);
+
+    rte_free(ctx);
+}
+
+void
+tle_udp_ctx_invalidate(struct tle_udp_ctx *ctx)
+{
+    RTE_SET_USED(ctx);
+}
+
+static int
+init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id)
+{
+    size_t sz;
+
+    sz = sizeof(*dev->dp[idx]);
+    dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+        socket_id);
+
+    if (dev->dp[idx] == NULL) {
+        UDP_LOG(ERR, "allocation of %zu bytes on "
+            "socket %d for %u-th device failed\n",
+            sz, socket_id, idx);
+        return ENOMEM;
+    }
+
+    udp_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK);
+    return 0;
+}
+
+static struct tle_udp_dev *
+find_free_dev(struct tle_udp_ctx *ctx)
+{
+    uint32_t i;
+
+    if (ctx->nb_dev < RTE_DIM(ctx->dev)) {
+        for (i = 0; i != RTE_DIM(ctx->dev); i++) {
+            if (ctx->dev[i].ctx != ctx)
+                return ctx->dev + i;
+        }
+    }
+
+    rte_errno = ENODEV;
+    return NULL;
+}
+
+struct tle_udp_dev *
+tle_udp_add_dev(struct tle_udp_ctx *ctx,
+    const struct tle_udp_dev_param *dev_prm)
+{
+    int32_t rc;
+    struct tle_udp_dev *dev;
+
+    if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) {
+        rte_errno = EINVAL;
+        return NULL;
+    }
+
+    dev = find_free_dev(ctx);
+    if (dev == NULL)
+        return NULL;
+    rc = 0;
+
+    /* device can handle IPv4 traffic */
+    if (dev_prm->local_addr4.s_addr != INADDR_ANY)
+        rc = init_dev_proto(dev, TLE_UDP_V4,
ctx->prm.socket_id);
+
+    /* device can handle IPv6 traffic */
+    if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_udp6_any,
+            sizeof(tle_udp6_any)) != 0)
+        rc = init_dev_proto(dev, TLE_UDP_V6, ctx->prm.socket_id);
+
+    if (rc != 0) {
+        /* cleanup and return an error. */
+        rte_free(dev->dp[TLE_UDP_V4]);
+        rte_free(dev->dp[TLE_UDP_V6]);
+        rte_errno = rc;
+        return NULL;
+    }
+
+    /* setup RX data. */
+    if (dev_prm->local_addr4.s_addr != INADDR_ANY &&
+            (dev_prm->rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM) == 0)
+        dev->rx.ol_flags[TLE_UDP_V4] |= PKT_RX_IP_CKSUM_BAD;
+    if ((dev_prm->rx_offload & DEV_RX_OFFLOAD_UDP_CKSUM) == 0) {
+        dev->rx.ol_flags[TLE_UDP_V4] |= PKT_RX_L4_CKSUM_BAD;
+        dev->rx.ol_flags[TLE_UDP_V6] |= PKT_RX_L4_CKSUM_BAD;
+    }
+
+    /* setup TX data. */
+    bcg_queue_reset(&dev->tx.beq);
+    bcg_queue_reset(&dev->tx.feq);
+
+    if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) {
+        dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM;
+        dev->tx.ol_flags[TLE_UDP_V6] |= PKT_TX_IPV6 | PKT_TX_UDP_CKSUM;
+    }
+    if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_IPV4_CKSUM) != 0)
+        dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM;
+
+    dev->prm = *dev_prm;
+    dev->ctx = ctx;
+    ctx->nb_dev++;
+
+    return dev;
+}
+
+static void
+empty_cage(struct buf_cage *bc)
+{
+    uint32_t i, n;
+    struct rte_mbuf *pkt[MAX_BURST];
+
+    do {
+        n = bcg_get(bc, (const void **)(uintptr_t)pkt, RTE_DIM(pkt));
+        for (i = 0; i != n; i++)
+            rte_pktmbuf_free(pkt[i]);
+    } while (n != 0);
+
+    bcg_free(bc);
+}
+
+int
+tle_udp_del_dev(struct tle_udp_dev *dev)
+{
+    uint32_t p;
+    struct buf_cage *bc;
+    struct tle_udp_ctx *ctx;
+
+    if (dev == NULL || dev->ctx == NULL)
+        return -EINVAL;
+
+    ctx = dev->ctx;
+    p = dev - ctx->dev;
+
+    if (p >= RTE_DIM(ctx->dev) ||
+            (dev->dp[TLE_UDP_V4] == NULL &&
+            dev->dp[TLE_UDP_V6] == NULL))
+        return -EINVAL;
+
+    /* empty TX queues. 
*/ + if (dev->tx.bc != NULL) + empty_cage(dev->tx.bc); + + bcg_queue_append(&dev->tx.beq, &dev->tx.feq); + + while ((bc = __bcg_dequeue_head(&dev->tx.beq)) != NULL) + empty_cage(bc); + + rte_free(dev->dp[TLE_UDP_V4]); + rte_free(dev->dp[TLE_UDP_V6]); + memset(dev, 0, sizeof(*dev)); + ctx->nb_dev--; + return 0; +} + +static inline void +stream_down(struct tle_udp_stream *s) +{ + rwl_down(&s->rx.use); + rwl_down(&s->tx.use); +} + +static inline void +stream_up(struct tle_udp_stream *s) +{ + rwl_up(&s->rx.use); + rwl_up(&s->tx.use); +} + +static struct tle_udp_dev * +find_ipv4_dev(struct tle_udp_ctx *ctx, const struct in_addr *addr) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].prm.local_addr4.s_addr == addr->s_addr && + ctx->dev[i].dp[TLE_UDP_V4] != NULL) + return ctx->dev + i; + } + + return NULL; +} + +static struct tle_udp_dev * +find_ipv6_dev(struct tle_udp_ctx *ctx, const struct in6_addr *addr) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (memcmp(&ctx->dev[i].prm.local_addr6, addr, + sizeof(*addr)) == 0 && + ctx->dev[i].dp[TLE_UDP_V6] != NULL) + return ctx->dev + i; + } + + return NULL; +} + +static int +stream_fill_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s) +{ + struct tle_udp_dev *dev; + struct udp_pbm *pbm; + struct sockaddr_in *lin4; + struct sockaddr_in6 *lin6; + uint32_t i, p, sp, t; + + if (s->prm.local_addr.ss_family == AF_INET) { + lin4 = (struct sockaddr_in *)&s->prm.local_addr; + t = TLE_UDP_V4; + p = lin4->sin_port; + } else if (s->prm.local_addr.ss_family == AF_INET6) { + lin6 = (struct sockaddr_in6 *)&s->prm.local_addr; + t = TLE_UDP_V6; + p = lin6->sin6_port; + } else + return EINVAL; + + p = ntohs(p); + + /* if local address is not wildcard, find device it belongs to. */ + if (t == TLE_UDP_V4 && lin4->sin_addr.s_addr != INADDR_ANY) { + dev = find_ipv4_dev(ctx, &lin4->sin_addr); + if (dev == NULL) + return ENODEV; + } else if (t == TLE_UDP_V6 && memcmp(&tle_udp6_any, &lin6->sin6_addr, + sizeof(tle_udp6_any)) != 0) { + dev = find_ipv6_dev(ctx, &lin6->sin6_addr); + if (dev == NULL) + return ENODEV; + } else + dev = NULL; + + if (dev != NULL) + pbm = &dev->dp[t]->use; + else + pbm = &ctx->use[t]; + + /* try to acquire local port number. */ + if (p == 0) { + p = udp_pbm_find_range(pbm, pbm->blk, LPORT_END_BLK); + if (p == 0 && pbm->blk > LPORT_START_BLK) + p = udp_pbm_find_range(pbm, LPORT_START_BLK, pbm->blk); + } else if (udp_pbm_check(pbm, p) != 0) + return EEXIST; + + if (p == 0) + return ENFILE; + + /* fill socket's dst port and type */ + + sp = htons(p); + s->type = t; + s->port.dst = sp; + + /* mark port as in-use */ + + udp_pbm_set(&ctx->use[t], p); + if (dev != NULL) { + udp_pbm_set(pbm, p); + dev->dp[t]->streams[sp] = s; + } else { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].dp[t] != NULL) { + udp_pbm_set(&ctx->dev[i].dp[t]->use, p); + ctx->dev[i].dp[t]->streams[sp] = s; + } + } + } + + return 0; +} + +static int +stream_clear_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s) +{ + struct tle_udp_dev *dev; + uint32_t i, p, sp, t; + + t = s->type; + sp = s->port.dst; + p = ntohs(sp); + + /* if local address is not wildcard, find device it belongs to. 
*/ + if (t == TLE_UDP_V4 && s->ipv4.addr.dst != INADDR_ANY) { + dev = find_ipv4_dev(ctx, (struct in_addr *)&s->ipv4.addr.dst); + if (dev == NULL) + return ENODEV; + } else if (t == TLE_UDP_V6 && memcmp(&tle_udp6_any, &s->ipv6.addr.dst, + sizeof(tle_udp6_any)) != 0) { + dev = find_ipv6_dev(ctx, (struct in6_addr *)&s->ipv6.addr.dst); + if (dev == NULL) + return ENODEV; + } else + dev = NULL; + + udp_pbm_clear(&ctx->use[t], p); + if (dev != NULL) { + udp_pbm_clear(&dev->dp[t]->use, p); + dev->dp[t]->streams[sp] = NULL; + } else { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].dp[t] != NULL) { + udp_pbm_clear(&ctx->dev[i].dp[t]->use, p); + ctx->dev[i].dp[t]->streams[sp] = NULL; + } + } + } + + return 0; +} + +static struct tle_udp_stream * +get_stream(struct tle_udp_ctx *ctx) +{ + struct tle_udp_stream *s; + + s = NULL; + if (ctx->streams.nb_free == 0) + return s; + + rte_spinlock_lock(&ctx->streams.lock); + if (ctx->streams.nb_free != 0) { + s = STAILQ_FIRST(&ctx->streams.free); + STAILQ_REMOVE_HEAD(&ctx->streams.free, link); + ctx->streams.nb_free--; + } + rte_spinlock_unlock(&ctx->streams.lock); + return s; +} + +static void +put_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s, int32_t head) +{ + s->type = TLE_UDP_VNUM; + rte_spinlock_lock(&ctx->streams.lock); + if (head != 0) + STAILQ_INSERT_HEAD(&ctx->streams.free, s, link); + else + STAILQ_INSERT_TAIL(&ctx->streams.free, s, link); + ctx->streams.nb_free++; + rte_spinlock_unlock(&ctx->streams.lock); +} + +static void +fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask) +{ + *addr = in->sin_addr.s_addr; + *mask = (*addr == INADDR_ANY) ? INADDR_ANY : INADDR_NONE; +} + +static void +fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask) +{ + const struct in6_addr *pm; + + memcpy(addr, &in->sin6_addr, sizeof(*addr)); + if (memcmp(&tle_udp6_any, addr, sizeof(*addr)) == 0) + pm = &tle_udp6_any; + else + pm = &tle_udp6_none; + + memcpy(mask, pm, sizeof(*mask)); +} + +static int +check_stream_prm(const struct tle_udp_stream_param *prm) +{ + if ((prm->local_addr.ss_family != AF_INET && + prm->local_addr.ss_family != AF_INET6) || + prm->local_addr.ss_family != prm->remote_addr.ss_family) + return EINVAL; + + /* callback and event notifications mechanisms are mutually exclusive */ + if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) || + (prm->send_ev != NULL && prm->send_cb.func != NULL)) + return EINVAL; + + return 0; +} + +struct tle_udp_stream * +tle_udp_stream_open(struct tle_udp_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + const struct sockaddr_in *rin; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + s = get_stream(ctx); + if (s == NULL) { + rte_errno = ENFILE; + return NULL; + + /* some TX still pending for that stream. */ + } else if (bcg_store_full(s->tx.st) == 0) { + put_stream(ctx, s, 0); + rte_errno = EAGAIN; + return NULL; + } + + /* copy input parameters. */ + s->prm = *prm; + + /* setup ports and port mask fields (except dst port). */ + rin = (const struct sockaddr_in *)&prm->remote_addr; + s->port.src = rin->sin_port; + s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX; + s->pmsk.dst = UINT16_MAX; + + /* setup src and dst addresses. 
+
+struct tle_udp_stream *
+tle_udp_stream_open(struct tle_udp_ctx *ctx,
+	const struct tle_udp_stream_param *prm)
+{
+	struct tle_udp_stream *s;
+	const struct sockaddr_in *rin;
+	int32_t rc;
+
+	if (ctx == NULL || prm == NULL || check_stream_prm(prm) != 0) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	s = get_stream(ctx);
+	if (s == NULL) {
+		rte_errno = ENFILE;
+		return NULL;
+
+	/* some TX still pending for that stream. */
+	} else if (bcg_store_full(s->tx.st) == 0) {
+		put_stream(ctx, s, 0);
+		rte_errno = EAGAIN;
+		return NULL;
+	}
+
+	/* copy input parameters. */
+	s->prm = *prm;
+
+	/* setup ports and port mask fields (except dst port). */
+	rin = (const struct sockaddr_in *)&prm->remote_addr;
+	s->port.src = rin->sin_port;
+	s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX;
+	s->pmsk.dst = UINT16_MAX;
+
+	/* setup src and dst addresses. */
+	if (prm->local_addr.ss_family == AF_INET) {
+		fill_ipv4_am((const struct sockaddr_in *)&prm->local_addr,
+			&s->ipv4.addr.dst, &s->ipv4.mask.dst);
+		fill_ipv4_am((const struct sockaddr_in *)&prm->remote_addr,
+			&s->ipv4.addr.src, &s->ipv4.mask.src);
+	} else if (prm->local_addr.ss_family == AF_INET6) {
+		fill_ipv6_am((const struct sockaddr_in6 *)&prm->local_addr,
+			&s->ipv6.addr.dst, &s->ipv6.mask.dst);
+		fill_ipv6_am((const struct sockaddr_in6 *)&prm->remote_addr,
+			&s->ipv6.addr.src, &s->ipv6.mask.src);
+	}
+
+	rte_spinlock_lock(&ctx->dev_lock);
+	rc = stream_fill_dev(ctx, s);
+	rte_spinlock_unlock(&ctx->dev_lock);
+
+	if (rc != 0) {
+		put_stream(ctx, s, 1);
+		s = NULL;
+		rte_errno = rc;
+	} else {
+		/* setup stream notification mechanism */
+		s->rx.ev = prm->recv_ev;
+		s->rx.cb = prm->recv_cb;
+		s->tx.ev = prm->send_ev;
+		s->tx.cb = prm->send_cb;
+
+		/* mark stream as available for RX/TX */
+		if (s->tx.ev != NULL)
+			tle_event_raise(s->tx.ev);
+		stream_up(s);
+	}
+
+	return s;
+}
+
+int
+tle_udp_stream_close(struct tle_udp_stream *s)
+{
+	uint32_t i, n;
+	int32_t rc;
+	struct tle_udp_ctx *ctx;
+	struct rte_mbuf *m[MAX_BURST];
+
+	static const struct tle_udp_stream_cb zcb;
+
+	if (s == NULL || s->type >= TLE_UDP_VNUM)
+		return EINVAL;
+
+	ctx = s->ctx;
+
+	/* mark stream as unavailable for RX/TX. */
+	stream_down(s);
+
+	/* reset TX cages. */
+	rte_spinlock_lock(&s->tx.lock);
+	memset(s->tx.cg, 0, sizeof(s->tx.cg));
+	rte_spinlock_unlock(&s->tx.lock);
+
+	/* reset stream events if any. */
+	if (s->rx.ev != NULL) {
+		tle_event_idle(s->rx.ev);
+		s->rx.ev = NULL;
+	}
+	if (s->tx.ev != NULL) {
+		tle_event_idle(s->tx.ev);
+		s->tx.ev = NULL;
+	}
+
+	s->rx.cb = zcb;
+	s->tx.cb = zcb;
+
+	/* free stream's destination port */
+	rte_spinlock_lock(&ctx->dev_lock);
+	rc = stream_clear_dev(ctx, s);
+	rte_spinlock_unlock(&ctx->dev_lock);
+
+	/* empty stream's RX queue */
+	do {
+		n = rte_ring_dequeue_burst(s->rx.q, (void **)m, RTE_DIM(m));
+		for (i = 0; i != n; i++)
+			rte_pktmbuf_free(m[i]);
+	} while (n != 0);
+
+	/*
+	 * mark the stream as free again.
+	 * if there still are pkts queued for TX,
+	 * then put this stream to the tail of free list.
+	 */
+	put_stream(ctx, s, bcg_store_full(s->tx.st));
+	return rc;
+}
+
+int
+tle_udp_stream_get_param(const struct tle_udp_stream *s,
+	struct tle_udp_stream_param *prm)
+{
+	struct sockaddr_in *lin4;
+	struct sockaddr_in6 *lin6;
+
+	if (prm == NULL || s == NULL || s->type >= TLE_UDP_VNUM)
+		return EINVAL;
+
+	prm[0] = s->prm;
+	if (prm->local_addr.ss_family == AF_INET) {
+		lin4 = (struct sockaddr_in *)&prm->local_addr;
+		lin4->sin_port = s->port.dst;
+	} else if (prm->local_addr.ss_family == AF_INET6) {
+		lin6 = (struct sockaddr_in6 *)&prm->local_addr;
+		lin6->sin6_port = s->port.dst;
+	}
+
+	return 0;
+}
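+
+/*
+ * Usage sketch (illustrative only, not part of the original sources):
+ * a typical open/close sequence. "ctx" is assumed to come from the
+ * context creation API declared in tle_udp_impl.h; error handling and
+ * event setup are trimmed:
+ *
+ *	struct tle_udp_stream_param sp;
+ *	struct sockaddr_in *la;
+ *
+ *	memset(&sp, 0, sizeof(sp));
+ *	la = (struct sockaddr_in *)&sp.local_addr;
+ *	la->sin_family = AF_INET;
+ *	la->sin_addr.s_addr = INADDR_ANY;
+ *	la->sin_port = 0;
+ *	sp.remote_addr.ss_family = AF_INET;
+ *
+ *	s = tle_udp_stream_open(ctx, &sp);
+ *	...
+ *	tle_udp_stream_close(s);
+ *
+ * With sin_port == 0, stream_fill_dev() picks a free ephemeral port;
+ * rte_errno is set to EAGAIN when the only free streams still have TX
+ * packets pending in their buf cages.
+ */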
diff --git a/lib/libtle_udp/udp_impl.h b/lib/libtle_udp/udp_impl.h
new file mode 100644
index 0000000..fbdb743
--- /dev/null
+++ b/lib/libtle_udp/udp_impl.h
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _UDP_IMPL_H_
+#define _UDP_IMPL_H_
+
+#include <rte_vect.h>
+#include <rte_spinlock.h>
+#include <tle_udp_impl.h>
+#include <tle_event.h>
+
+#include "buf_cage.h"
+#include "port_bitmap.h"
+#include "osdep.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum {
+	TLE_UDP_V4,
+	TLE_UDP_V6,
+	TLE_UDP_VNUM
+};
+
+union udp_ports {
+	uint32_t raw;
+	struct {
+		uint16_t src;
+		uint16_t dst;
+	};
+};
+
+union udph {
+	uint64_t raw;
+	struct {
+		union udp_ports ports;
+		uint16_t len;
+		uint16_t cksum;
+	};
+};
+
+union ipv4_addrs {
+	uint64_t raw;
+	struct {
+		uint32_t src;
+		uint32_t dst;
+	};
+};
+
+union ipv6_addrs {
+	_ymm_t raw;
+	struct {
+		rte_xmm_t src;
+		rte_xmm_t dst;
+	};
+};
+
+union ip_addrs {
+	union ipv4_addrs v4;
+	union ipv6_addrs v6;
+};
+
+
+struct tle_udp_stream {
+
+	STAILQ_ENTRY(tle_udp_stream) link;
+	struct tle_udp_ctx *ctx;
+
+	uint8_t type;	 /* TLE_UDP_V4 or TLE_UDP_V6 */
+
+	struct {
+		struct rte_ring *q;
+		struct tle_event *ev;
+		struct tle_udp_stream_cb cb;
+		rte_atomic32_t use;
+	} rx;
+
+	union udp_ports port;
+	union udp_ports pmsk;
+
+	union {
+		struct {
+			union ipv4_addrs addr;
+			union ipv4_addrs mask;
+		} ipv4;
+		struct {
+			union ipv6_addrs addr;
+			union ipv6_addrs mask;
+		} ipv6;
+	};
+
+	struct {
+		rte_atomic32_t use;
+		rte_spinlock_t lock;
+		struct tle_event *ev;
+		struct tle_udp_stream_cb cb;
+		struct bcg_store *st;
+		struct buf_cage *cg[RTE_MAX_ETHPORTS];
+	} tx __rte_cache_aligned;
+
+	struct tle_udp_stream_param prm;
+} __rte_cache_aligned;
+
+struct tle_udp_dport {
+	struct udp_pbm use; /* ports in use. */
+	struct tle_udp_stream *streams[MAX_PORT_NUM]; /* port to stream. */
+};
+
+struct tle_udp_dev {
+	struct tle_udp_ctx *ctx;
+	struct {
+		uint64_t ol_flags[TLE_UDP_VNUM];
+	} rx;
+	struct {
+		/* used by FE. */
+		uint64_t ol_flags[TLE_UDP_VNUM];
+		rte_atomic32_t packet_id[TLE_UDP_VNUM];
+		struct bcg_queue feq;
+
+		/* used by BE only. */
+		struct bcg_queue beq __rte_cache_min_aligned;
+		struct buf_cage *bc;
+	} tx;
+	struct tle_udp_dev_param prm; /* copy of device parameters. */
+	struct tle_udp_dport *dp[TLE_UDP_VNUM]; /* device udp ports */
+};
+
+struct tle_udp_ctx {
+	struct tle_udp_ctx_param prm;
+
+	struct {
+		rte_spinlock_t lock;
+		uint32_t nb_free; /* number of free streams. */
+		STAILQ_HEAD(, tle_udp_stream) free;
+		struct tle_udp_stream *buf; /* array of streams */
+	} streams;
+
+	rte_spinlock_t dev_lock;
+	uint32_t nb_dev;
+	struct udp_pbm use[TLE_UDP_VNUM]; /* all ports in use. */
+	struct tle_udp_dev dev[RTE_MAX_ETHPORTS];
+};
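+
+/*
+ * Informative note (not part of the original sources): one tle_udp_ctx
+ * owns up to RTE_MAX_ETHPORTS devices, and each device keeps per-type
+ * tle_udp_dport tables that map a local UDP port (network byte order)
+ * directly to its stream, so BE RX is a single array lookup:
+ *
+ *	ctx -> dev[i] -> dp[type] -> streams[port]
+ *
+ * The FE/BE split inside tle_udp_dev.tx reflects the threading model:
+ * front-end (stream) calls append cages to "feq", while the back-end
+ * TX path drains "beq", keeping the two sides on separate cache lines.
+ */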
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _UDP_IMPL_H_ */
diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c
new file mode 100644
index 0000000..d5d248e
--- /dev/null
+++ b/lib/libtle_udp/udp_rxtx.c
@@ -0,0 +1,767 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <rte_malloc.h>
+#include <rte_errno.h>
+#include <rte_ethdev.h>
+#include <rte_ip.h>
+#include <rte_ip_frag.h>
+#include <rte_udp.h>
+
+#include "udp_impl.h"
+#include "misc.h"
+
+static inline struct tle_udp_stream *
+rx_stream_obtain(struct tle_udp_dev *dev, uint32_t type, uint32_t port)
+{
+	struct tle_udp_stream *s;
+
+	if (type >= TLE_UDP_VNUM || dev->dp[type] == NULL)
+		return NULL;
+
+	s = dev->dp[type]->streams[port];
+	if (s == NULL)
+		return NULL;
+
+	if (rwl_acquire(&s->rx.use) < 0)
+		return NULL;
+
+	return s;
+}
+
+static inline uint16_t
+get_pkt_type(const struct rte_mbuf *m)
+{
+	uint32_t v;
+
+	v = m->packet_type &
+		(RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
+	if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
+		return TLE_UDP_V4;
+	else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
+		return TLE_UDP_V6;
+	else
+		return TLE_UDP_VNUM;
+}
+
+static inline union udp_ports
+pkt_info(const struct tle_udp_dev *dev, struct rte_mbuf *m,
+	union udp_ports *ports, union ipv4_addrs *addr4,
+	union ipv6_addrs **addr6)
+{
+	uint32_t len;
+	union udp_ports ret, *up;
+	union ipv4_addrs *pa4;
+
+	ret.src = get_pkt_type(m);
+
+	len = m->l2_len;
+	if (ret.src == TLE_UDP_V4) {
+		pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
+			len + offsetof(struct ipv4_hdr, src_addr));
+		addr4->raw = pa4->raw;
+		m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V4];
+	} else if (ret.src == TLE_UDP_V6) {
+		*addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
+			len + offsetof(struct ipv6_hdr, src_addr));
+		m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V6];
+	}
+
+	len += m->l3_len;
+	up = rte_pktmbuf_mtod_offset(m, union udp_ports *,
+		len + offsetof(struct udp_hdr, src_port));
+	ports->raw = up->raw;
+	ret.dst = ports->dst;
+	return ret;
+}
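+
+/*
+ * Informative note (not part of the original sources): pkt_info() packs
+ * the packet class into a single union udp_ports key: .src carries the
+ * type (TLE_UDP_V4/V6/VNUM) and .dst the destination UDP port. This lets
+ * tle_udp_rx_bulk() below group consecutive packets destined for the
+ * same stream by comparing one 32-bit .raw value per packet.
+ */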
+
+/*
+ * Helper routine, enqueues packets to the stream and calls RX
+ * notification callback, if needed.
+ */
+static inline uint16_t
+rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
+	int32_t rc[], uint32_t num)
+{
+	uint32_t i, k, r;
+
+	r = rte_ring_enqueue_burst(s->rx.q, mb, num);
+
+	/* if RX queue was empty invoke user RX notification callback. */
+	if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
+		s->rx.cb.func(s->rx.cb.data, s);
+
+	for (i = r, k = 0; i != num; i++, k++) {
+		rc[k] = ENOBUFS;
+		rp[k] = mb[i];
+	}
+
+	return r;
+}
+
+static inline uint16_t
+rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	union ipv6_addrs *addr[], union udp_ports port[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+	uint32_t i, k, n;
+	void *mb[num];
+
+	k = 0;
+	n = 0;
+
+	for (i = 0; i != num; i++) {
+
+		if ((port[i].raw & s->pmsk.raw) != s->port.raw ||
+				ymm_mask_cmp(&addr[i]->raw, &s->ipv6.addr.raw,
+				&s->ipv6.mask.raw) != 0) {
+			rc[k] = ENOENT;
+			rp[k] = pkt[i];
+			k++;
+		} else {
+			mb[n] = pkt[i];
+			n++;
+		}
+	}
+
+	return rx_stream(s, mb, rp + k, rc + k, n);
+}
+
+static inline uint16_t
+rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	union ipv4_addrs addr[], union udp_ports port[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+	uint32_t i, k, n;
+	void *mb[num];
+
+	k = 0;
+	n = 0;
+
+	for (i = 0; i != num; i++) {
+
+		if ((addr[i].raw & s->ipv4.mask.raw) != s->ipv4.addr.raw ||
+				(port[i].raw & s->pmsk.raw) != s->port.raw) {
+			rc[k] = ENOENT;
+			rp[k] = pkt[i];
+			k++;
+		} else {
+			mb[n] = pkt[i];
+			n++;
+		}
+	}
+
+	return rx_stream(s, mb, rp + k, rc + k, n);
+}
+
+uint16_t
+tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
+{
+	struct tle_udp_stream *s;
+	uint32_t i, j, k, n, p, t;
+	union udp_ports tp[num], port[num];
+	union ipv4_addrs a4[num];
+	union ipv6_addrs *pa6[num];
+
+	for (i = 0; i != num; i++)
+		tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]);
+
+	k = 0;
+	for (i = 0; i != num; i = j) {
+
+		for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
+			;
+
+		t = tp[i].src;
+		p = tp[i].dst;
+		s = rx_stream_obtain(dev, t, p);
+		if (s != NULL) {
+
+			if (t == TLE_UDP_V4)
+				n = rx_stream4(s, pkt + i, a4 + i,
+					port + i, rp + k, rc + k, j - i);
+			else
+				n = rx_stream6(s, pkt + i, pa6 + i, port + i,
+					rp + k, rc + k, j - i);
+
+			k += j - i - n;
+
+			if (s->rx.ev != NULL)
+				tle_event_raise(s->rx.ev);
+			rwl_release(&s->rx.use);
+
+		} else {
+			for (; i != j; i++) {
+				rc[k] = ENOENT;
+				rp[k] = pkt[i];
+				k++;
+			}
+		}
+	}
+
+	return num - k;
+}
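+
+/*
+ * Usage sketch (illustrative only, not part of the original sources):
+ * a back-end RX loop feeding tle_udp_rx_bulk(). It assumes the
+ * application has already parsed packet types and filled the mbuf
+ * l2_len/l3_len fields (e.g. from rte_eth_rx_burst() output):
+ *
+ *	struct rte_mbuf *pkt[MAX_BURST], *rp[MAX_BURST];
+ *	int32_t rc[MAX_BURST];
+ *	uint16_t n, k, i;
+ *
+ *	n = rte_eth_rx_burst(port_id, queue_id, pkt, RTE_DIM(pkt));
+ *	k = tle_udp_rx_bulk(udp_dev, pkt, rp, rc, n);
+ *	for (i = 0; i != n - k; i++)
+ *		rte_pktmbuf_free(rp[i]);
+ *
+ * rp[]/rc[] report the packets that were rejected: ENOENT when no
+ * stream matches, ENOBUFS when a stream RX ring is full.
+ */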
+
+static inline void
+tx_cage_release(struct buf_cage *bc)
+{
+	struct tle_udp_stream *s;
+	uint32_t n;
+
+	s = bcg_get_udata(bc);
+	n = bcg_free(bc);
+
+	/* If stream is still open, then mark it as available for writing. */
+	if (rwl_try_acquire(&s->tx.use) > 0) {
+
+		if (s->tx.ev != NULL)
+			tle_event_raise(s->tx.ev);
+
+		/* if stream send buffer was full invoke TX callback */
+		else if (s->tx.cb.func != NULL && n == 1)
+			s->tx.cb.func(s->tx.cb.data, s);
+
+	}
+
+	rwl_release(&s->tx.use);
+}
+
+static inline void
+tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc)
+{
+	struct tle_udp_stream *s;
+	struct tle_udp_ctx *ctx;
+	uint32_t idx;
+
+	ctx = dev->ctx;
+	s = bcg_get_udata(bc);
+	idx = dev - ctx->dev;
+
+	/* mark cage as closed for the stream. */
+	rte_spinlock_lock(&s->tx.lock);
+	if (bc == s->tx.cg[idx])
+		s->tx.cg[idx] = NULL;
+	rte_spinlock_unlock(&s->tx.lock);
+}
+
+uint16_t
+tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
+{
+	struct buf_cage *bc;
+	uint32_t i, n;
+
+	for (i = 0; i != num; i += n) {
+
+		bc = dev->tx.bc;
+		if (bc == NULL) {
+			if (dev->tx.beq.num == 0)
+				bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
+			bc = __bcg_dequeue_head(&dev->tx.beq);
+			if (bc == NULL)
+				break;
+			tx_cage_update(dev, bc);
+			dev->tx.bc = bc;
+		}
+
+		n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i);
+
+		/* cage is drained, free it and notify the related stream. */
+		if (bcg_fill_count(bc) == 0) {
+			tx_cage_release(bc);
+			dev->tx.bc = NULL;
+		}
+	}
+
+	return i;
+}
+
+static int
+check_pkt_csum(const struct rte_mbuf *m, uint32_t type)
+{
+	const struct ipv4_hdr *l3h4;
+	const struct ipv6_hdr *l3h6;
+	const struct udp_hdr *l4h;
+	int32_t ret;
+	uint16_t csum;
+
+	ret = 0;
+	l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len);
+	l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len);
+
+	if ((m->ol_flags & PKT_RX_IP_CKSUM_BAD) != 0) {
+		csum = _ipv4x_cksum(l3h4, m->l3_len);
+		ret = (csum != UINT16_MAX);
+	}
+
+	if (ret == 0 && (m->ol_flags & PKT_RX_L4_CKSUM_BAD) != 0) {
+
+		/*
+		 * for IPv4 it is allowed to have zero UDP cksum,
+		 * for IPv6 valid UDP cksum is mandatory.
+		 */
+		if (type == TLE_UDP_V4) {
+			l4h = (const struct udp_hdr *)((uintptr_t)l3h4 +
+				m->l3_len);
+			csum = (l4h->dgram_cksum == 0) ? UINT16_MAX :
+				_ipv4_udptcp_mbuf_cksum(m,
+					m->l2_len + m->l3_len, l3h4);
+		} else
+			csum = _ipv6_udptcp_mbuf_cksum(m,
+				m->l2_len + m->l3_len, l3h6);
+
+		ret = (csum != UINT16_MAX);
+	}
+
+	return ret;
+}
+
+/* exclude NULLs from the final list of packets. */
+static inline uint32_t
+compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
+{
+	uint32_t i, j, k, l;
+
+	for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) {
+
+		/* found a hole. */
+		if (pkt[j] == NULL) {
+
+			/* find how big is it. */
+			for (i = j; i-- != 0 && pkt[i] == NULL; )
+				;
+			/* fill the hole. */
+			for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++)
+				pkt[l] = pkt[k];
+
+			nb_pkt -= j - i;
+			nb_zero -= j - i;
+		}
+	}
+
+	return nb_pkt;
+}
+
+/*
+ * helper function, does the necessary pre-processing for the received
+ * packets before handing them to the stream_recv caller.
+ */
+static inline struct rte_mbuf *
+recv_pkt_process(struct rte_mbuf *m, uint32_t type)
+{
+	uint64_t f;
+
+	f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
+	if (f != 0) {
+		if (check_pkt_csum(m, type) == 0)
+			m->ol_flags ^= f;
+		else {
+			rte_pktmbuf_free(m);
+			return NULL;
+		}
+	}
+
+	rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len);
+	return m;
+}
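+
+/*
+ * Worked example (informative, not part of the original sources) for
+ * compress_pkt_list() above, with nb_pkt = 5 and nb_zero = 2 after
+ * recv_pkt_process() dropped two packets with bad checksums:
+ *
+ *	before:	{ m0, NULL, NULL, m3, m4 }
+ *	after:	{ m0, m3, m4 }		(returns 3)
+ *
+ * Holes are located from the tail, so each contiguous run of NULLs is
+ * closed with a single copy of the elements that follow it.
+ */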
+
+uint16_t
+tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	uint16_t num)
+{
+	uint32_t i, k, n;
+
+	n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
+	if (n == 0)
+		return 0;
+
+	/*
+	 * if we still have packets to read,
+	 * then rearm stream RX event.
+	 */
+	if (n == num && rte_ring_count(s->rx.q) != 0) {
+		if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+			tle_event_raise(s->rx.ev);
+		rwl_release(&s->rx.use);
+	}
+
+	k = 0;
+	for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) {
+		pkt[i] = recv_pkt_process(pkt[i], s->type);
+		pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
+		pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
+		pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type);
+		k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) +
+			(pkt[i + 2] == NULL) + (pkt[i + 3] == NULL);
+	}
+
+	switch (n % 4) {
+	case 3:
+		pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
+		k += (pkt[i + 2] == NULL);
+		/* fallthrough */
+	case 2:
+		pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
+		k += (pkt[i + 1] == NULL);
+		/* fallthrough */
+	case 1:
+		pkt[i] = recv_pkt_process(pkt[i], s->type);
+		k += (pkt[i] == NULL);
+	}
+
+	return compress_pkt_list(pkt, n, k);
+}
+
+static int32_t
+udp_get_dest(struct tle_udp_stream *s, const void *dst_addr,
+	struct tle_udp_dest *dst)
+{
+	int32_t rc;
+	const struct in_addr *d4;
+	const struct in6_addr *d6;
+	struct tle_udp_ctx *ctx;
+	struct tle_udp_dev *dev;
+
+	ctx = s->ctx;
+
+	/* it is here just to keep gcc happy. */
+	d4 = NULL;
+
+	if (s->type == TLE_UDP_V4) {
+		d4 = dst_addr;
+		rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst);
+	} else if (s->type == TLE_UDP_V6) {
+		d6 = dst_addr;
+		rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst);
+	} else
+		rc = -ENOENT;
+
+	if (rc < 0 || dst->dev == NULL || dst->dev->ctx != ctx)
+		return -ENOENT;
+
+	dev = dst->dev;
+	if (s->type == TLE_UDP_V4) {
+		struct ipv4_hdr *l3h;
+		l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
+		l3h->src_addr = dev->prm.local_addr4.s_addr;
+		l3h->dst_addr = d4->s_addr;
+	} else {
+		struct ipv6_hdr *l3h;
+		l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len);
+		rte_memcpy(l3h->src_addr, &dev->prm.local_addr6,
+			sizeof(l3h->src_addr));
+		rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr));
+	}
+
+	return dev - ctx->dev;
+}
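+
+/*
+ * Informative note (not part of the original sources): udp_get_dest()
+ * depends on the application-provided lookup4()/lookup6() callbacks from
+ * tle_udp_ctx_param. On success the callback is expected to fill the
+ * tle_udp_dest with an egress device owned by the same context, an L2/L3
+ * header template (hdr, l2_len, l3_len) and the path MTU; the source and
+ * destination addresses of that template are then patched in place here.
+ * The value returned is the device index, used by tle_udp_stream_send()
+ * to select the per-device buf cage.
+ */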
+
+static inline int
+udp_fill_mbuf(struct rte_mbuf *m,
+	uint32_t type, uint64_t ol_flags, uint32_t pid,
+	union udph udph, const struct tle_udp_dest *dst)
+{
+	uint32_t len, plen;
+	char *l2h;
+	union udph *l4h;
+
+	len = dst->l2_len + dst->l3_len;
+	plen = m->pkt_len;
+
+	/* make space and copy the L2/L3 header template into the mbuf. */
+	l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
+	if (l2h == NULL)
+		return -ENOBUFS;
+
+	/* copy L2/L3 header */
+	rte_memcpy(l2h, dst->hdr, len);
+
+	/* copy UDP header */
+	l4h = (union udph *)(l2h + len);
+	l4h->raw = udph.raw;
+
+	/* setup mbuf TX offload related fields. */
+	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
+		sizeof(*l4h), 0, 0, 0);
+	m->ol_flags |= ol_flags;
+
+	l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));
+
+	/* update proto specific fields. */
+	if (type == TLE_UDP_V4) {
+		struct ipv4_hdr *l3h;
+		l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
+		l3h->packet_id = rte_cpu_to_be_16(pid);
+		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
+			sizeof(*l4h));
+
+		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
+			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
+				ol_flags);
+		else
+			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
+
+		if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
+			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
+	} else {
+		struct ipv6_hdr *l3h;
+		l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
+		l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
+		if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
+			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
+		else
+			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
+	}
+
+	return 0;
+}
+
+/*
+ * ??? this function probably shouldn't be needed at all -
+ * rte_ipv[4,6]_fragment_packet() should do that itself.
+ */
+static inline void
+frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
+{
+	struct ipv4_hdr *l3h;
+
+	mf->ol_flags = ms->ol_flags;
+	mf->tx_offload = ms->tx_offload;
+
+	if (type == TLE_UDP_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
+		l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
+		l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
+	}
+}
+
+/*
+ * Returns negative for failure to fragment or actual number of fragments.
+ */
+static inline int
+fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
+	uint32_t type, const struct tle_udp_dest *dst)
+{
+	int32_t frag_num, i;
+	uint16_t mtu;
+	void *eth_hdr;
+
+	/* Remove the Ethernet header from the input packet */
+	rte_pktmbuf_adj(pkt, dst->l2_len);
+	mtu = dst->mtu - dst->l2_len;
+
+	/* fragment packet */
+	if (type == TLE_UDP_V4)
+		frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
+			dst->head_mp, dst->head_mp);
+	else
+		frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
+			dst->head_mp, dst->head_mp);
+
+	if (frag_num > 0) {
+		for (i = 0; i != frag_num; i++) {
+
+			frag_fixup(pkt, frag[i], type);
+
+			/* Move data_off to include l2 header first */
+			eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);
+
+			/* copy l2 header into fragment */
+			rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
+		}
+	}
+
+	return frag_num;
+}
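+
+/*
+ * Informative note (not part of the original sources): for oversized
+ * datagrams tle_udp_stream_send() below first clears PKT_TX_UDP_CKSUM
+ * (the UDP checksum must be complete in software before the datagram is
+ * split) and then calls fragment(). E.g. with dst->mtu = 1500 and a
+ * 14 byte L2 header, a roughly 4 KB IPv4 datagram produces three
+ * fragments, each re-prefixed with the L2 template and checksum-fixed
+ * by frag_fixup().
+ */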
+
+/* enqueue up to num packets to the destination device queue. */
+static inline uint16_t
+queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
+	const void *pkt[], uint16_t num)
+{
+	struct buf_cage *bc;
+	uint32_t i, n;
+
+	rte_spinlock_lock(&s->tx.lock);
+	bc = s->tx.cg[di];
+
+	for (i = 0; i != num; i += n) {
+		if (bc == NULL) {
+			bc = bcg_alloc(s->tx.st);
+			if (bc == NULL)
+				break;
+			n = bcg_put(bc, pkt + i, num - i);
+			bcg_enqueue_tail(bq, bc);
+		} else
+			n = bcg_put(bc, pkt + i, num - i);
+
+		if (n != num - i)
+			bc = NULL;
+	}
+
+	s->tx.cg[di] = bc;
+	rte_spinlock_unlock(&s->tx.lock);
+	return i;
+}
+
+/*
+ * either enqueue all num packets or none.
+ * assumes that the number of input packets does not exceed
+ * the size of a buf_cage.
+ */
+static inline uint16_t
+queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
+	const void *pkt[], uint16_t num)
+{
+	struct buf_cage *bc, *bcp;
+	uint32_t n;
+
+	rte_spinlock_lock(&s->tx.lock);
+	bc = s->tx.cg[di];
+
+	n = 0;
+	if (bc == NULL || bcg_free_count(bc) < num) {
+		bcp = bc;
+		bc = bcg_alloc(s->tx.st);
+		if (bc != NULL) {
+			if (bcp != NULL)
+				n = bcg_put(bcp, pkt, num);
+			n += bcg_put(bc, pkt + n, num - n);
+			bcg_enqueue_tail(bq, bc);
+		}
+	} else
+		n = bcg_put(bc, pkt, num);
+
+	s->tx.cg[di] = bc;
+	rte_spinlock_unlock(&s->tx.lock);
+	return n;
+}
+
+uint16_t
+tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
+	uint16_t num, const struct sockaddr *dst_addr)
+{
+	int32_t di, frg, rc;
+	uint64_t ol_flags;
+	uint32_t i, k, n;
+	uint32_t mtu, pid, type;
+	const struct sockaddr_in *d4;
+	const struct sockaddr_in6 *d6;
+	const void *da;
+	union udph udph;
+	struct tle_udp_dest dst;
+
+	type = s->type;
+
+	/* start filling UDP header. */
+	udph.raw = 0;
+	udph.ports.src = s->port.dst;
+
+	/* figure out what destination addr/port to use. */
+	if (dst_addr != NULL) {
+		if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
+			rte_errno = EINVAL;
+			return 0;
+		}
+		if (type == TLE_UDP_V4) {
+			d4 = (const struct sockaddr_in *)dst_addr;
+			da = &d4->sin_addr;
+			udph.ports.dst = d4->sin_port;
+		} else {
+			d6 = (const struct sockaddr_in6 *)dst_addr;
+			da = &d6->sin6_addr;
+			udph.ports.dst = d6->sin6_port;
+		}
+	} else {
+		udph.ports.dst = s->port.src;
+		if (type == TLE_UDP_V4)
+			da = &s->ipv4.addr.src;
+		else
+			da = &s->ipv6.addr.src;
+	}
+
+	di = udp_get_dest(s, da, &dst);
+	if (di < 0) {
+		rte_errno = -di;
+		return 0;
+	}
+
+	pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
+	mtu = dst.mtu - dst.l2_len - dst.l3_len;
+
+	/* mark stream as not closable. */
+	if (rwl_acquire(&s->tx.use) < 0)
+		return 0;
+
+	for (i = 0, k = 0; k != num; k = i) {
+
+		/* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
+		frg = 0;
+		ol_flags = dst.dev->tx.ol_flags[type];
+
+		while (i != num && frg == 0) {
+			frg = pkt[i]->pkt_len > mtu;
+			if (frg != 0)
+				ol_flags &= ~PKT_TX_UDP_CKSUM;
+			rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
+				udph, &dst);
+			if (rc != 0) {
+				rte_errno = -rc;
+				goto out;
+			}
+			i += (frg == 0);
+		}
+
+		/* enqueue non-fragment packets to the destination device. */
+		if (k != i) {
+			k += queue_pkt_out(s, &dst.dev->tx.feq, di,
+				(const void **)(uintptr_t)&pkt[k], i - k);
+
+			/* stream TX queue is full. */
+			if (k != i)
+				break;
+		}
+
+		/* enqueue packet that needs to be fragmented */
+		if (i != num) {
+
+			struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];
+
+			/* fragment the packet. */
+			rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
+			if (rc < 0) {
+				rte_errno = -rc;
+				break;
+			}
+
+			n = queue_frg_out(s, &dst.dev->tx.feq, di,
+				(const void **)(uintptr_t)frag, rc);
+			if (n == 0) {
+				while (rc-- != 0)
+					rte_pktmbuf_free(frag[rc]);
+				break;
+			}
+
+			/* all fragments enqueued, free the original packet. */
+			rte_pktmbuf_free(pkt[i]);
+			i++;
+		}
+	}
+
+	/* if possible, rearm stream write event. */
+	if (k == num && s->tx.ev != NULL)
+		tle_event_raise(s->tx.ev);
+
+out:
+	rwl_release(&s->tx.use);
+
+	/*
+	 * remove pkt l2/l3 headers, restore ol_flags for unsent, but
+	 * already modified packets.
+	 */
+	ol_flags = ~dst.dev->tx.ol_flags[type];
+	for (n = k; n != i; n++) {
+		rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
+		pkt[n]->ol_flags &= ol_flags;
+	}
+
+	return k;
+}