From aa97dd1ce910b839fed46ad55d1e70e403f5a930 Mon Sep 17 00:00:00 2001
From: Konstantin Ananyev
Date: Tue, 21 Feb 2017 18:12:20 +0000
Subject: Introduce first version of TCP code.

Supported functionality:
- open/close
- listen/accept/connect
- send/recv

To achieve that, the libtle_udp library was reworked into the libtle_l4p
library, which supports both the TCP and UDP protocols.
A new libtle_timer library was introduced (thanks to the Cisco guys and
Dave Barach for sharing their timer code with us).
The sample application was also reworked significantly to support both
TCP and UDP traffic handling.
New unit tests were introduced.

Change-Id: I806b05011f521e89b58db403cfdd484a37beb775
Signed-off-by: Mohammad Abdul Awal
Signed-off-by: Karol Latecki
Signed-off-by: Daniel Mrzyglod
Signed-off-by: Konstantin Ananyev
---
 lib/libtle_l4p/Makefile | 53 +
 lib/libtle_l4p/ctx.c | 527 +++++++++
 lib/libtle_l4p/ctx.h | 86 ++
 lib/libtle_l4p/debug.h | 81 ++
 lib/libtle_l4p/event.c | 104 ++
 lib/libtle_l4p/misc.h | 415 +++++++
 lib/libtle_l4p/net_misc.h | 78 ++
 lib/libtle_l4p/osdep.h | 73 ++
 lib/libtle_l4p/port_bitmap.h | 112 ++
 lib/libtle_l4p/stream.h | 170 +++
 lib/libtle_l4p/stream_table.c | 80 ++
 lib/libtle_l4p/stream_table.h | 260 +++++
 lib/libtle_l4p/syncookie.h | 194 ++++
 lib/libtle_l4p/tcp_ctl.h | 120 ++
 lib/libtle_l4p/tcp_misc.h | 462 ++++++++
 lib/libtle_l4p/tcp_ofo.c | 85 ++
 lib/libtle_l4p/tcp_ofo.h | 249 +++++
 lib/libtle_l4p/tcp_rxq.h | 149 +++
 lib/libtle_l4p/tcp_rxtx.c | 2431 +++++++++++++++++++++++++++++++++++++++++
 lib/libtle_l4p/tcp_stream.c | 522 +++++++++
 lib/libtle_l4p/tcp_stream.h | 170 +++
 lib/libtle_l4p/tcp_timer.h | 126 +++
 lib/libtle_l4p/tcp_txq.h | 122 +++
 lib/libtle_l4p/tle_ctx.h | 233 ++++
 lib/libtle_l4p/tle_event.h | 278 +++++
 lib/libtle_l4p/tle_tcp.h | 395 +++++++
 lib/libtle_l4p/tle_udp.h | 187 ++++
 lib/libtle_l4p/udp_rxtx.c | 646 +++++++++++
 lib/libtle_l4p/udp_stream.c | 346 ++++++
 lib/libtle_l4p/udp_stream.h | 79 ++
 30 files changed, 8833 insertions(+)
 create mode 100644 lib/libtle_l4p/Makefile
 create mode 100644 lib/libtle_l4p/ctx.c
 create mode 100644 lib/libtle_l4p/ctx.h
 create mode 100644 lib/libtle_l4p/debug.h
 create mode 100644 lib/libtle_l4p/event.c
 create mode 100644 lib/libtle_l4p/misc.h
 create mode 100644 lib/libtle_l4p/net_misc.h
 create mode 100644 lib/libtle_l4p/osdep.h
 create mode 100644 lib/libtle_l4p/port_bitmap.h
 create mode 100644 lib/libtle_l4p/stream.h
 create mode 100644 lib/libtle_l4p/stream_table.c
 create mode 100644 lib/libtle_l4p/stream_table.h
 create mode 100644 lib/libtle_l4p/syncookie.h
 create mode 100644 lib/libtle_l4p/tcp_ctl.h
 create mode 100644 lib/libtle_l4p/tcp_misc.h
 create mode 100644 lib/libtle_l4p/tcp_ofo.c
 create mode 100644 lib/libtle_l4p/tcp_ofo.h
 create mode 100644 lib/libtle_l4p/tcp_rxq.h
 create mode 100644 lib/libtle_l4p/tcp_rxtx.c
 create mode 100644 lib/libtle_l4p/tcp_stream.c
 create mode 100644 lib/libtle_l4p/tcp_stream.h
 create mode 100644 lib/libtle_l4p/tcp_timer.h
 create mode 100644 lib/libtle_l4p/tcp_txq.h
 create mode 100644 lib/libtle_l4p/tle_ctx.h
 create mode 100644 lib/libtle_l4p/tle_event.h
 create mode 100644 lib/libtle_l4p/tle_tcp.h
 create mode 100644 lib/libtle_l4p/tle_udp.h
 create mode 100644 lib/libtle_l4p/udp_rxtx.c
 create mode 100644 lib/libtle_l4p/udp_stream.c
 create mode 100644 lib/libtle_l4p/udp_stream.h

diff --git a/lib/libtle_l4p/Makefile b/lib/libtle_l4p/Makefile
new file mode 100644
index 0000000..c0d3e80
--- /dev/null
+++ b/lib/libtle_l4p/Makefile
@@ -0,0 +1,53 @@
+# Copyright (c) 2016 Intel Corporation.
+# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overwritten by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# library name +LIB = libtle_l4p.a + +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) + +EXPORT_MAP := tle_l4p_version.map + +LIBABIVER := 1 + +#source files +SRCS-y += ctx.c +SRCS-y += event.c +SRCS-y += stream_table.c +SRCS-y += tcp_ofo.c +SRCS-y += tcp_stream.c +SRCS-y += tcp_rxtx.c +SRCS-y += udp_stream.c +SRCS-y += udp_rxtx.c + +# install this header file +SYMLINK-y-include += tle_ctx.h +SYMLINK-y-include += tle_event.h +SYMLINK-y-include += tle_tcp.h +SYMLINK-y-include += tle_udp.h + +# this lib dependencies +DEPDIRS-y += lib/libtle_dring +DEPDIRS-y += lib/libtle_timer + +include $(TLDK_ROOT)/mk/tle.lib.mk diff --git a/lib/libtle_l4p/ctx.c b/lib/libtle_l4p/ctx.c new file mode 100644 index 0000000..7ebef9d --- /dev/null +++ b/lib/libtle_l4p/ctx.c @@ -0,0 +1,527 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "stream.h" +#include "misc.h" + +#define LPORT_START 0x8000 +#define LPORT_END MAX_PORT_NUM + +#define LPORT_START_BLK PORT_BLK(LPORT_START) +#define LPORT_END_BLK PORT_BLK(LPORT_END) + +const struct in6_addr tle_ipv6_any = IN6ADDR_ANY_INIT; +const struct in6_addr tle_ipv6_none = { + { + .__u6_addr32 = { + UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX + }, + }, +}; + +struct stream_ops tle_stream_ops[TLE_PROTO_NUM] = {}; + +static int +check_dev_prm(const struct tle_dev_param *dev_prm) +{ + /* no valid IPv4/IPv6 addresses provided. 
*/ + if (dev_prm->local_addr4.s_addr == INADDR_ANY && + memcmp(&dev_prm->local_addr6, &tle_ipv6_any, + sizeof(tle_ipv6_any)) == 0) + return -EINVAL; + + if (dev_prm->bl4.nb_port > UINT16_MAX || + (dev_prm->bl4.nb_port != 0 && + dev_prm->bl4.port == NULL)) + return -EINVAL; + + if (dev_prm->bl6.nb_port > UINT16_MAX || + (dev_prm->bl6.nb_port != 0 && + dev_prm->bl6.port == NULL)) + return -EINVAL; + + return 0; +} + +static int +check_ctx_prm(const struct tle_ctx_param *prm) +{ + if (prm->proto >= TLE_PROTO_NUM) + return -EINVAL; + return 0; +} + +struct tle_ctx * +tle_ctx_create(const struct tle_ctx_param *ctx_prm) +{ + struct tle_ctx *ctx; + size_t sz; + uint32_t i; + int32_t rc; + + if (ctx_prm == NULL || check_ctx_prm(ctx_prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + sz = sizeof(*ctx); + ctx = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx_prm->socket_id); + if (ctx == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes for new ctx " + "on socket %d failed\n", + sz, ctx_prm->socket_id); + return NULL; + } + + ctx->prm = *ctx_prm; + + rc = tle_stream_ops[ctx_prm->proto].init_streams(ctx); + if (rc != 0) { + UDP_LOG(ERR, "init_streams(ctx=%p, proto=%u) failed " + "with error code: %d;\n", + ctx, ctx_prm->proto, rc); + tle_ctx_destroy(ctx); + rte_errno = -rc; + return NULL; + } + + for (i = 0; i != RTE_DIM(ctx->use); i++) + tle_pbm_init(ctx->use + i, LPORT_START_BLK); + + ctx->streams.nb_free = ctx->prm.max_streams; + return ctx; +} + +void +tle_ctx_destroy(struct tle_ctx *ctx) +{ + uint32_t i; + + if (ctx == NULL) { + rte_errno = EINVAL; + return; + } + + for (i = 0; i != RTE_DIM(ctx->dev); i++) + tle_del_dev(ctx->dev + i); + + tle_stream_ops[ctx->prm.proto].fini_streams(ctx); + rte_free(ctx); +} + +void +tle_ctx_invalidate(struct tle_ctx *ctx) +{ + RTE_SET_USED(ctx); +} + +static void +fill_pbm(struct tle_pbm *pbm, const struct tle_bl_port *blp) +{ + uint32_t i; + + for (i = 0; i != blp->nb_port; i++) + tle_pbm_set(pbm, blp->port[i]); +} + +static int +init_dev_proto(struct tle_dev *dev, uint32_t idx, int32_t socket_id, + const struct tle_bl_port *blp) +{ + size_t sz; + + sz = sizeof(*dev->dp[idx]); + dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + socket_id); + + if (dev->dp[idx] == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes on " + "socket %d for %u-th device failed\n", + sz, socket_id, idx); + return ENOMEM; + } + + tle_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK); + fill_pbm(&dev->dp[idx]->use, blp); + return 0; +} + +static struct tle_dev * +find_free_dev(struct tle_ctx *ctx) +{ + uint32_t i; + + if (ctx->nb_dev < RTE_DIM(ctx->dev)) { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].ctx != ctx) + return ctx->dev + i; + } + } + + rte_errno = ENODEV; + return NULL; +} + +struct tle_dev * +tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm) +{ + int32_t rc; + struct tle_dev *dev; + + if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + dev = find_free_dev(ctx); + if (dev == NULL) + return NULL; + rc = 0; + + /* device can handle IPv4 traffic */ + if (dev_prm->local_addr4.s_addr != INADDR_ANY) { + rc = init_dev_proto(dev, TLE_V4, ctx->prm.socket_id, + &dev_prm->bl4); + if (rc == 0) + fill_pbm(&ctx->use[TLE_V4], &dev_prm->bl4); + } + + /* device can handle IPv6 traffic */ + if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_ipv6_any, + sizeof(tle_ipv6_any)) != 0) { + rc = init_dev_proto(dev, TLE_V6, ctx->prm.socket_id, + &dev_prm->bl6); + if (rc == 0) + 
fill_pbm(&ctx->use[TLE_V6], &dev_prm->bl6); + } + + if (rc != 0) { + /* cleanup and return an error. */ + rte_free(dev->dp[TLE_V4]); + rte_free(dev->dp[TLE_V6]); + rte_errno = rc; + return NULL; + } + + /* setup RX data. */ + if (dev_prm->local_addr4.s_addr != INADDR_ANY && + (dev_prm->rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM) == 0) + dev->rx.ol_flags[TLE_V4] |= PKT_RX_IP_CKSUM_BAD; + + if (((dev_prm->rx_offload & DEV_RX_OFFLOAD_UDP_CKSUM) == 0 && + ctx->prm.proto == TLE_PROTO_UDP) || + ((dev_prm->rx_offload & + DEV_RX_OFFLOAD_TCP_CKSUM) == 0 && + ctx->prm.proto == TLE_PROTO_TCP)) { + dev->rx.ol_flags[TLE_V4] |= PKT_RX_L4_CKSUM_BAD; + dev->rx.ol_flags[TLE_V6] |= PKT_RX_L4_CKSUM_BAD; + } + + /* setup TX data. */ + tle_dring_reset(&dev->tx.dr); + + if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0 && + ctx->prm.proto == TLE_PROTO_UDP) { + dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM; + dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6 | PKT_TX_UDP_CKSUM; + } else if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_TCP_CKSUM) != 0 && + ctx->prm.proto == TLE_PROTO_TCP) { + dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_TCP_CKSUM; + dev->tx.ol_flags[TLE_V6] |= PKT_TX_IPV6 | PKT_TX_TCP_CKSUM; + } + + if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_IPV4_CKSUM) != 0) + dev->tx.ol_flags[TLE_V4] |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM; + + dev->prm = *dev_prm; + dev->ctx = ctx; + ctx->nb_dev++; + + return dev; +} + +static void +empty_dring(struct tle_dring *dr, uint32_t proto) +{ + uint32_t i, k, n; + struct tle_stream *s; + struct rte_mbuf *pkt[MAX_PKT_BURST]; + struct tle_drb *drb[MAX_PKT_BURST]; + + do { + k = RTE_DIM(drb); + n = tle_dring_sc_dequeue(dr, (const void **)(uintptr_t)pkt, + RTE_DIM(pkt), drb, &k); + + /* free mbufs */ + for (i = 0; i != n; i++) + rte_pktmbuf_free(pkt[i]); + /* free drbs */ + for (i = 0; i != k; i++) { + s = drb[i]->udata; + tle_stream_ops[proto].free_drbs(s, drb + i, 1); + } + } while (n != 0); +} + +int +tle_del_dev(struct tle_dev *dev) +{ + uint32_t p; + struct tle_ctx *ctx; + + if (dev == NULL || dev->ctx == NULL) + return -EINVAL; + + ctx = dev->ctx; + p = dev - ctx->dev; + + if (p >= RTE_DIM(ctx->dev) || + (dev->dp[TLE_V4] == NULL && + dev->dp[TLE_V6] == NULL)) + return -EINVAL; + + /* emtpy TX queues. 
*/ + empty_dring(&dev->tx.dr, ctx->prm.proto); + + rte_free(dev->dp[TLE_V4]); + rte_free(dev->dp[TLE_V6]); + memset(dev, 0, sizeof(*dev)); + ctx->nb_dev--; + return 0; +} + +static struct tle_dev * +find_ipv4_dev(struct tle_ctx *ctx, const struct in_addr *addr) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].prm.local_addr4.s_addr == addr->s_addr && + ctx->dev[i].dp[TLE_V4] != NULL) + return ctx->dev + i; + } + + return NULL; +} + +static struct tle_dev * +find_ipv6_dev(struct tle_ctx *ctx, const struct in6_addr *addr) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (memcmp(&ctx->dev[i].prm.local_addr6, addr, + sizeof(*addr)) == 0 && + ctx->dev[i].dp[TLE_V6] != NULL) + return ctx->dev + i; + } + + return NULL; +} + +static int +stream_fill_dev(struct tle_ctx *ctx, struct tle_stream *s, + const struct sockaddr *addr) +{ + struct tle_dev *dev; + struct tle_pbm *pbm; + const struct sockaddr_in *lin4; + const struct sockaddr_in6 *lin6; + uint32_t i, p, sp, t; + + if (addr->sa_family == AF_INET) { + lin4 = (const struct sockaddr_in *)addr; + t = TLE_V4; + p = lin4->sin_port; + } else if (addr->sa_family == AF_INET6) { + lin6 = (const struct sockaddr_in6 *)addr; + t = TLE_V6; + p = lin6->sin6_port; + } else + return EINVAL; + + p = ntohs(p); + + /* if local address is not wildcard, find device it belongs to. */ + if (t == TLE_V4 && lin4->sin_addr.s_addr != INADDR_ANY) { + dev = find_ipv4_dev(ctx, &lin4->sin_addr); + if (dev == NULL) + return ENODEV; + } else if (t == TLE_V6 && memcmp(&tle_ipv6_any, &lin6->sin6_addr, + sizeof(tle_ipv6_any)) != 0) { + dev = find_ipv6_dev(ctx, &lin6->sin6_addr); + if (dev == NULL) + return ENODEV; + } else + dev = NULL; + + if (dev != NULL) + pbm = &dev->dp[t]->use; + else + pbm = &ctx->use[t]; + + /* try to acquire local port number. */ + if (p == 0) { + p = tle_pbm_find_range(pbm, pbm->blk, LPORT_END_BLK); + if (p == 0 && pbm->blk > LPORT_START_BLK) + p = tle_pbm_find_range(pbm, LPORT_START_BLK, pbm->blk); + } else if (tle_pbm_check(pbm, p) != 0) + return EEXIST; + + if (p == 0) + return ENFILE; + + /* fill socket's dst port and type */ + + sp = htons(p); + s->type = t; + s->port.dst = sp; + + /* mark port as in-use */ + + tle_pbm_set(&ctx->use[t], p); + if (dev != NULL) { + tle_pbm_set(pbm, p); + dev->dp[t]->streams[sp] = s; + } else { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].dp[t] != NULL) { + tle_pbm_set(&ctx->dev[i].dp[t]->use, p); + ctx->dev[i].dp[t]->streams[sp] = s; + } + } + } + + return 0; +} + +static int +stream_clear_dev(struct tle_ctx *ctx, const struct tle_stream *s) +{ + struct tle_dev *dev; + uint32_t i, p, sp, t; + + t = s->type; + sp = s->port.dst; + p = ntohs(sp); + + /* if local address is not wildcard, find device it belongs to. 
*/ + if (t == TLE_V4 && s->ipv4.addr.dst != INADDR_ANY) { + dev = find_ipv4_dev(ctx, + (const struct in_addr *)&s->ipv4.addr.dst); + if (dev == NULL) + return ENODEV; + } else if (t == TLE_V6 && memcmp(&tle_ipv6_any, &s->ipv6.addr.dst, + sizeof(tle_ipv6_any)) != 0) { + dev = find_ipv6_dev(ctx, + (const struct in6_addr *)&s->ipv6.addr.dst); + if (dev == NULL) + return ENODEV; + } else + dev = NULL; + + tle_pbm_clear(&ctx->use[t], p); + if (dev != NULL) { + if (dev->dp[t]->streams[sp] == s) { + tle_pbm_clear(&dev->dp[t]->use, p); + dev->dp[t]->streams[sp] = NULL; + } + } else { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].dp[t] != NULL && + ctx->dev[i].dp[t]->streams[sp] == s) { + tle_pbm_clear(&ctx->dev[i].dp[t]->use, p); + ctx->dev[i].dp[t]->streams[sp] = NULL; + } + } + } + + return 0; +} + +static void +fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask) +{ + *addr = in->sin_addr.s_addr; + *mask = (*addr == INADDR_ANY) ? INADDR_ANY : INADDR_NONE; +} + +static void +fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask) +{ + const struct in6_addr *pm; + + memcpy(addr, &in->sin6_addr, sizeof(*addr)); + if (memcmp(&tle_ipv6_any, addr, sizeof(*addr)) == 0) + pm = &tle_ipv6_any; + else + pm = &tle_ipv6_none; + + memcpy(mask, pm, sizeof(*mask)); +} + +int +stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s, + const struct sockaddr *laddr, const struct sockaddr *raddr) +{ + const struct sockaddr_in *rin; + int32_t rc; + + /* setup ports and port mask fields (except dst port). */ + rin = (const struct sockaddr_in *)raddr; + s->port.src = rin->sin_port; + s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX; + s->pmsk.dst = UINT16_MAX; + + /* setup src and dst addresses. */ + if (laddr->sa_family == AF_INET) { + fill_ipv4_am((const struct sockaddr_in *)laddr, + &s->ipv4.addr.dst, &s->ipv4.mask.dst); + fill_ipv4_am((const struct sockaddr_in *)raddr, + &s->ipv4.addr.src, &s->ipv4.mask.src); + } else if (laddr->sa_family == AF_INET6) { + fill_ipv6_am((const struct sockaddr_in6 *)laddr, + &s->ipv6.addr.dst, &s->ipv6.mask.dst); + fill_ipv6_am((const struct sockaddr_in6 *)raddr, + &s->ipv6.addr.src, &s->ipv6.mask.src); + } + + rte_spinlock_lock(&ctx->dev_lock); + rc = stream_fill_dev(ctx, s, laddr); + rte_spinlock_unlock(&ctx->dev_lock); + + return rc; +} + +/* free stream's destination port */ +int +stream_clear_ctx(struct tle_ctx *ctx, struct tle_stream *s) +{ + int32_t rc; + + rte_spinlock_lock(&ctx->dev_lock); + rc = stream_clear_dev(ctx, s); + rte_spinlock_unlock(&ctx->dev_lock); + + return rc; +} diff --git a/lib/libtle_l4p/ctx.h b/lib/libtle_l4p/ctx.h new file mode 100644 index 0000000..cc32081 --- /dev/null +++ b/lib/libtle_l4p/ctx.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _CTX_H_ +#define _CTX_H_ + +#include +#include +#include +#include + +#include "port_bitmap.h" +#include "osdep.h" +#include "net_misc.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct tle_dport { + struct tle_pbm use; /* ports in use. */ + struct tle_stream *streams[MAX_PORT_NUM]; /* port to stream. */ +}; + +struct tle_dev { + struct tle_ctx *ctx; + struct { + uint64_t ol_flags[TLE_VNUM]; + } rx; + struct { + /* used by FE. */ + uint64_t ol_flags[TLE_VNUM]; + rte_atomic32_t packet_id[TLE_VNUM]; + + /* used by FE & BE. */ + struct tle_dring dr; + } tx; + struct tle_dev_param prm; /* copy of device parameters. */ + struct tle_dport *dp[TLE_VNUM]; /* device L4 ports */ +}; + +struct tle_ctx { + struct tle_ctx_param prm; + struct { + rte_spinlock_t lock; + uint32_t nb_free; /* number of free streams. */ + STAILQ_HEAD(, tle_stream) free; + void *buf; /* space allocated for streams */ + } streams; + + rte_spinlock_t dev_lock; + uint32_t nb_dev; + struct tle_pbm use[TLE_VNUM]; /* all ports in use. */ + struct tle_dev dev[RTE_MAX_ETHPORTS]; +}; + +struct stream_ops { + int (*init_streams)(struct tle_ctx *); + void (*fini_streams)(struct tle_ctx *); + void (*free_drbs)(struct tle_stream *, struct tle_drb *[], uint32_t); +}; + +extern struct stream_ops tle_stream_ops[TLE_PROTO_NUM]; + +int stream_fill_ctx(struct tle_ctx *ctx, struct tle_stream *s, + const struct sockaddr *laddr, const struct sockaddr *raddr); + +int stream_clear_ctx(struct tle_ctx *ctx, struct tle_stream *s); + +#ifdef __cplusplus +} +#endif + +#endif /* _UDP_IMPL_H_ */ diff --git a/lib/libtle_l4p/debug.h b/lib/libtle_l4p/debug.h new file mode 100644 index 0000000..b2a8b52 --- /dev/null +++ b/lib/libtle_l4p/debug.h @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _DEBUG_H_ +#define _DEBUG_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define FUNC_SEQ_VERIFY(v) do { \ + static uint64_t nb_call; \ + static typeof(v) x; \ + if (nb_call++ != 0) \ + RTE_VERIFY(tcp_seq_leq(x, v)); \ + x = (v); \ +} while (0) + +#define FUNC_VERIFY(e, c) do { \ + static uint64_t nb_call; \ + if ((e) == 0) \ + nb_call++; \ + else \ + nb_call = 0; \ + RTE_VERIFY(nb_call != (c)); \ +} while (0) + +#define FUNC_STAT(v, c) do { \ + static uint64_t nb_call, nb_data; \ + nb_call++; \ + nb_data += (v); \ + if ((nb_call & ((c) - 1)) == 0) { \ + printf("%s#%d@%u: nb_call=%lu, avg(" #v ")=%#Lf\n", \ + __func__, __LINE__, rte_lcore_id(), nb_call, \ + (long double)nb_data / nb_call); \ + nb_call = 0; \ + nb_data = 0; \ + } \ +} while (0) + +#define FUNC_TM_STAT(v, c) do { \ + static uint64_t nb_call, nb_data; \ + static uint64_t cts, pts, sts; \ + cts = rte_rdtsc(); \ + if (pts != 0) \ + sts += cts - pts; \ + pts = cts; \ + nb_call++; \ + nb_data += (v); \ + if ((nb_call & ((c) - 1)) == 0) { \ + printf("%s#%d@%u: nb_call=%lu, " \ + "avg(" #v ")=%#Lf, " \ + "avg(cycles)=%#Lf, " \ + "avg(cycles/" #v ")=%#Lf\n", \ + __func__, __LINE__, rte_lcore_id(), nb_call, \ + (long double)nb_data / nb_call, \ + (long double)sts / nb_call, \ + (long double)sts / nb_data); \ + nb_call = 0; \ + nb_data = 0; \ + sts = 0; \ + } \ +} while (0) + +#ifdef __cplusplus +} +#endif + +#endif /* _DEBUG_H_ */ diff --git a/lib/libtle_l4p/event.c b/lib/libtle_l4p/event.c new file mode 100644 index 0000000..66c5a3b --- /dev/null +++ b/lib/libtle_l4p/event.c @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include + +#include "osdep.h" + +struct tle_evq * +tle_evq_create(const struct tle_evq_param *prm) +{ + struct tle_evq *evq; + size_t sz; + uint32_t i; + + if (prm == NULL) { + rte_errno = EINVAL; + return NULL; + } + + sz = sizeof(*evq) + sizeof(evq->events[0]) * prm->max_events; + evq = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + prm->socket_id); + if (evq == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes for " + "new tle_evq(%u) on socket %d failed\n", + sz, prm->max_events, prm->socket_id); + return NULL; + } + + TAILQ_INIT(&evq->armed); + TAILQ_INIT(&evq->free); + + for (i = 0; i != prm->max_events; i++) { + evq->events[i].head = evq; + TAILQ_INSERT_TAIL(&evq->free, evq->events + i, ql); + } + + evq->nb_events = i; + evq->nb_free = i; + + return evq; +} + +void +tle_evq_destroy(struct tle_evq *evq) +{ + rte_free(evq); +} + +struct tle_event * +tle_event_alloc(struct tle_evq *evq, const void *data) +{ + struct tle_event *h; + + if (evq == NULL) { + rte_errno = EINVAL; + return NULL; + } + + rte_spinlock_lock(&evq->lock); + h = TAILQ_FIRST(&evq->free); + if (h != NULL) { + TAILQ_REMOVE(&evq->free, h, ql); + evq->nb_free--; + h->data = data; + } else + rte_errno = ENOMEM; + rte_spinlock_unlock(&evq->lock); + return h; +} + +void +tle_event_free(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev == NULL) { + rte_errno = EINVAL; + return; + } + + q = ev->head; + rte_spinlock_lock(&q->lock); + ev->data = NULL; + ev->state = TLE_SEV_IDLE; + TAILQ_INSERT_HEAD(&q->free, ev, ql); + q->nb_free++; + rte_spinlock_unlock(&q->lock); +} diff --git a/lib/libtle_l4p/misc.h b/lib/libtle_l4p/misc.h new file mode 100644 index 0000000..55dca10 --- /dev/null +++ b/lib/libtle_l4p/misc.h @@ -0,0 +1,415 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _MISC_H_ +#define _MISC_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +static inline int +xmm_cmp(const rte_xmm_t *da, const rte_xmm_t *sa) +{ + uint64_t ret; + + ret = (sa->u64[0] ^ da->u64[0]) | + (sa->u64[1] ^ da->u64[1]); + + return (ret != 0); +} + +static inline int +ymm_cmp(const _ymm_t *da, const _ymm_t *sa) +{ + uint64_t ret; + + ret = (sa->u64[0] ^ da->u64[0]) | + (sa->u64[1] ^ da->u64[1]) | + (sa->u64[2] ^ da->u64[2]) | + (sa->u64[3] ^ da->u64[3]); + + return (ret != 0); +} + +static inline int +ymm_mask_cmp(const _ymm_t *da, const _ymm_t *sa, const _ymm_t *sm) +{ + uint64_t ret; + + ret = ((da->u64[0] & sm->u64[0]) ^ sa->u64[0]) | + ((da->u64[1] & sm->u64[1]) ^ sa->u64[1]) | + ((da->u64[2] & sm->u64[2]) ^ sa->u64[2]) | + ((da->u64[3] & sm->u64[3]) ^ sa->u64[3]); + + return (ret != 0); +} + +/* + * Setup tx_offload field inside mbuf using raw 64-bit field. + * Consider to move it into DPDK librte_mbuf. 
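+ * For example, a plain TCP/IPv4 packet over Ethernet (14 byte L2 header,
+ * 20 byte L3 header, 20 byte L4 header, no TSO, no outer headers) is
+ * encoded as _mbuf_tx_offload(14, 20, 20, 0, 0, 0).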
+ */ +static inline uint64_t +_mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso, + uint64_t ol3, uint64_t ol2) +{ + return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49; +} + +/* + * Given the value of mbuf's tx_offload, calculate L4 payload offset. + */ +static inline uint32_t +_tx_offload_l4_offset(uint64_t ofl) +{ + uint32_t l2, l3, l4; + + l2 = ofl & 0x7f; + l3 = ofl >> 7 & 0x1ff; + l4 = ofl >> 16 & UINT8_MAX; + + return l2 + l3 + l4; +} + +/* + * Routines to calculate L3/L4 checksums in SW. + * Pretty similar to ones from DPDK librte_net/rte_ip.h, + * but provide better performance (at least for tested configurations), + * and extended functionality. + * Consider to move them into DPDK librte_net/rte_ip.h. + */ + +/* make compiler to generate: add %r1, %r2; adc $0, %r1. */ +#define CKSUM_ADD_CARRY(s, v) do { \ + (s) += (v); \ + (s) = ((s) < (v)) ? (s) + 1 : (s); \ +} while (0) + +/** + * Process the non-complemented checksum of a buffer. + * Similar to rte_raw_cksum(), but provide better performance + * (at least on IA platforms). + * @param buf + * Pointer to the buffer. + * @param size + * Length of the buffer. + * @return + * The non-complemented checksum. + */ +static inline uint16_t +__raw_cksum(const uint8_t *buf, uint32_t size) +{ + uint64_t s, sum; + uint32_t i, n; + uint32_t dw1, dw2; + uint16_t w1, w2; + const uint64_t *b; + + b = (const uint64_t *)buf; + n = size / sizeof(*b); + sum = 0; + + /* main loop, consume 8 bytes per iteration. */ + for (i = 0; i != n; i++) { + s = b[i]; + CKSUM_ADD_CARRY(sum, s); + } + + /* consume the remainder. */ + n = size % sizeof(*b); + if (n != 0) { + /* position of the of last 8 bytes of data. */ + b = (const uint64_t *)((uintptr_t)(b + i) + n - sizeof(*b)); + /* calculate shift amount. */ + n = (sizeof(*b) - n) * CHAR_BIT; + s = b[0] >> n; + CKSUM_ADD_CARRY(sum, s); + } + + /* reduce to 16 bits */ + dw1 = sum; + dw2 = sum >> 32; + CKSUM_ADD_CARRY(dw1, dw2); + w1 = dw1; + w2 = dw1 >> 16; + CKSUM_ADD_CARRY(w1, w2); + return w1; +} + +/** + * Process UDP or TCP checksum over possibly multi-segmented packet. + * @param mb + * The pointer to the mbuf with the packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param cksum + * Already pre-calculated pseudo-header checksum value. + * @return + * The complemented checksum. + */ +static inline uint32_t +__udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + uint32_t cksum) +{ + uint32_t dlen, i, plen; + const struct rte_mbuf *ms; + const void *data; + + plen = rte_pktmbuf_pkt_len(mb); + ms = mb; + + for (i = l4_ofs; i < plen && ms != NULL; i += dlen) { + data = rte_pktmbuf_mtod_offset(ms, const void *, l4_ofs); + dlen = rte_pktmbuf_data_len(ms) - l4_ofs; + cksum += __raw_cksum(data, dlen); + ms = ms->next; + l4_ofs = 0; + } + + cksum = ((cksum & 0xffff0000) >> 16) + (cksum & 0xffff); + cksum = (~cksum) & 0xffff; + if (cksum == 0) + cksum = 0xffff; + + return cksum; +} + +/** + * Process the pseudo-header checksum of an IPv4 header. + * + * Depending on the ol_flags, the pseudo-header checksum expected by the + * drivers is not the same. For instance, when TSO is enabled, the IP + * payload length must not be included in the packet. + * + * When ol_flags is 0, it computes the standard pseudo-header checksum. + * + * @param ipv4_hdr + * The pointer to the contiguous IPv4 header. + * @param ipv4_len + * Length of the IPv4 header. + * @param ol_flags + * The ol_flags of the associated mbuf. 
+ * @return + * The non-complemented checksum to set in the L4 header. + */ +static inline uint16_t +_ipv4x_phdr_cksum(const struct ipv4_hdr *ipv4_hdr, size_t ipv4h_len, + uint64_t ol_flags) +{ + uint32_t s0, s1; + + s0 = ipv4_hdr->src_addr; + s1 = ipv4_hdr->dst_addr; + CKSUM_ADD_CARRY(s0, s1); + + if (ol_flags & PKT_TX_TCP_SEG) + s1 = 0; + else + s1 = rte_cpu_to_be_16( + (uint16_t)(rte_be_to_cpu_16(ipv4_hdr->total_length) - + ipv4h_len)); + + s1 += rte_cpu_to_be_16(ipv4_hdr->next_proto_id); + CKSUM_ADD_CARRY(s0, s1); + + return __rte_raw_cksum_reduce(s0); +} + +/** + * Process the IPv4 UDP or TCP checksum. + * + * @param mb + * The pointer to the IPv4 packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param ipv4_hdr + * The pointer to the contiguous IPv4 header. + * @return + * The complemented checksum to set in the IP packet. + */ +static inline int +_ipv4_udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + const struct ipv4_hdr *ipv4_hdr) +{ + uint32_t cksum; + + cksum = _ipv4x_phdr_cksum(ipv4_hdr, mb->l3_len, 0); + cksum = __udptcp_mbuf_cksum(mb, l4_ofs, cksum); + + return cksum; +} + +/** + * Process the IPv6 UDP or TCP checksum. + * + * @param mb + * The pointer to the IPv6 packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param ipv6_hdr + * The pointer to the contiguous IPv6 header. + * @return + * The complemented checksum to set in the IP packet. + */ +static inline int +_ipv6_udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + const struct ipv6_hdr *ipv6_hdr) +{ + uint32_t cksum; + + cksum = rte_ipv6_phdr_cksum(ipv6_hdr, 0); + cksum = __udptcp_mbuf_cksum(mb, l4_ofs, cksum); + + return cksum; +} + +static inline uint16_t +_ipv4x_cksum(const void *iph, size_t len) +{ + uint16_t cksum; + + cksum = __raw_cksum(iph, len); + return (cksum == 0xffff) ? cksum : ~cksum; +} + +static inline int +check_pkt_csum(const struct rte_mbuf *m, uint64_t ol_flags, uint32_t type, + uint32_t proto) +{ + const struct ipv4_hdr *l3h4; + const struct ipv6_hdr *l3h6; + const struct udp_hdr *l4h; + int32_t ret; + uint16_t csum; + + ret = 0; + l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len); + l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len); + + if ((ol_flags & PKT_RX_IP_CKSUM_BAD) != 0) { + csum = _ipv4x_cksum(l3h4, m->l3_len); + ret = (csum != UINT16_MAX); + } + + if (ret == 0 && (ol_flags & PKT_RX_L4_CKSUM_BAD) != 0) { + + /* + * for IPv4 it is allowed to have zero UDP cksum, + * for IPv6 valid UDP cksum is mandatory. + */ + if (type == TLE_V4) { + l4h = (const struct udp_hdr *)((uintptr_t)l3h4 + + m->l3_len); + csum = (proto == IPPROTO_UDP && l4h->dgram_cksum == 0) ? + UINT16_MAX : _ipv4_udptcp_mbuf_cksum(m, + m->l2_len + m->l3_len, l3h4); + } else + csum = _ipv6_udptcp_mbuf_cksum(m, + m->l2_len + m->l3_len, l3h6); + + ret = (csum != UINT16_MAX); + } + + return ret; +} + +/* + * Analog of read-write locks, very much in favour of read side. + * Assumes, that there are no more then INT32_MAX concurrent readers. + * Consider to move into DPDK librte_eal. 
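+ * Usage: data-path readers bracket their work with rwl_acquire()/rwl_release();
+ * a negative return from rwl_acquire() means the object is being taken down.
+ * The control path calls rwl_down() to block new readers (by setting the
+ * counter to INT32_MIN) and to wait out the current ones, and rwl_up() to
+ * make the object available again.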
+ */ + +static inline int +rwl_try_acquire(rte_atomic32_t *p) +{ + return rte_atomic32_add_return(p, 1); +} + +static inline void +rwl_release(rte_atomic32_t *p) +{ + rte_atomic32_sub(p, 1); +} + +static inline int +rwl_acquire(rte_atomic32_t *p) +{ + int32_t rc; + + rc = rwl_try_acquire(p); + if (rc < 0) + rwl_release(p); + return rc; +} + +static inline void +rwl_down(rte_atomic32_t *p) +{ + while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0) + rte_pause(); +} + +static inline void +rwl_up(rte_atomic32_t *p) +{ + rte_atomic32_sub(p, INT32_MIN); +} + +/* exclude NULLs from the final list of packets. */ +static inline uint32_t +compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero) +{ + uint32_t i, j, k, l; + + for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) { + + /* found a hole. */ + if (pkt[j] == NULL) { + + /* find how big is it. */ + for (i = j; i-- != 0 && pkt[i] == NULL; ) + ; + /* fill the hole. */ + for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++) + pkt[l] = pkt[k]; + + nb_pkt -= j - i; + nb_zero -= j - i; + j = i + 1; + } + } + + return nb_pkt; +} + +/* empty ring and free queued mbufs */ +static inline void +empty_mbuf_ring(struct rte_ring *r) +{ + uint32_t i, n; + struct rte_mbuf *mb[MAX_PKT_BURST]; + + do { + n = rte_ring_dequeue_burst(r, (void **)mb, RTE_DIM(mb)); + for (i = 0; i != n; i++) + rte_pktmbuf_free(mb[i]); + } while (n != 0); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _MISC_H_ */ diff --git a/lib/libtle_l4p/net_misc.h b/lib/libtle_l4p/net_misc.h new file mode 100644 index 0000000..2d8dac2 --- /dev/null +++ b/lib/libtle_l4p/net_misc.h @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _NET_MISC_H_ +#define _NET_MISC_H_ + +#include +#include +#include "osdep.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define PKT_L234_HLEN(m) (_tx_offload_l4_offset(m->tx_offload)) +#define PKT_L4_PLEN(m) ((m)->pkt_len - PKT_L234_HLEN(m)) + +/* + * Some network protocols related structures definitions. + * Main purpose to simplify (and optimise) processing and representation + * of protocol related data. + */ + +enum { + TLE_V4, + TLE_V6, + TLE_VNUM +}; + +extern const struct in6_addr tle_ipv6_any; +extern const struct in6_addr tle_ipv6_none; + +union l4_ports { + uint32_t raw; + struct { + uint16_t src; + uint16_t dst; + }; +}; + +union ipv4_addrs { + uint64_t raw; + struct { + uint32_t src; + uint32_t dst; + }; +}; + +union ipv6_addrs { + _ymm_t raw; + struct { + rte_xmm_t src; + rte_xmm_t dst; + }; +}; + +union ip_addrs { + union ipv4_addrs v4; + union ipv6_addrs v6; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* _NET_MISC_H_ */ diff --git a/lib/libtle_l4p/osdep.h b/lib/libtle_l4p/osdep.h new file mode 100644 index 0000000..ed7e883 --- /dev/null +++ b/lib/libtle_l4p/osdep.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2016 Intel Corporation. 
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _OSDEP_H_ +#define _OSDEP_H_ + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * internal defines. + */ +#define MAX_PKT_BURST 0x20 + +#define MAX_DRB_BURST 4 + +/* + * logging related macros. + */ + +#define UDP_LOG(lvl, fmt, args...) RTE_LOG(lvl, USER1, fmt, ##args) + +#define TCP_LOG(lvl, fmt, args...) RTE_LOG(lvl, USER1, fmt, ##args) + +/* + * if no AVX support, define _ymm_t here. + */ + +#ifdef __AVX__ + +#define _ymm_t rte_ymm_t + +#else + +#define YMM_SIZE (2 * sizeof(rte_xmm_t)) +#define YMM_MASK (YMM_SIZE - 1) + +typedef union _ymm { + xmm_t x[YMM_SIZE / sizeof(xmm_t)]; + uint8_t u8[YMM_SIZE / sizeof(uint8_t)]; + uint16_t u16[YMM_SIZE / sizeof(uint16_t)]; + uint32_t u32[YMM_SIZE / sizeof(uint32_t)]; + uint64_t u64[YMM_SIZE / sizeof(uint64_t)]; + double pd[YMM_SIZE / sizeof(double)]; +} _ymm_t; + +#endif /* __AVX__ */ + +#include "debug.h" + +#ifdef __cplusplus +} +#endif + +#endif /* _OSDEP_H_ */ diff --git a/lib/libtle_l4p/port_bitmap.h b/lib/libtle_l4p/port_bitmap.h new file mode 100644 index 0000000..c0420d5 --- /dev/null +++ b/lib/libtle_l4p/port_bitmap.h @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _PORT_BITMAP_H_ +#define _PORT_BITMAP_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Simple implementation of bitmap for all possible L4 ports [0-UINT16_MAX]. + */ + +#define MAX_PORT_NUM (UINT16_MAX + 1) + +#define PORT_BLK(p) ((p) / (sizeof(uint32_t) * CHAR_BIT)) +#define PORT_IDX(p) ((p) % (sizeof(uint32_t) * CHAR_BIT)) + +#define MAX_PORT_BLK PORT_BLK(MAX_PORT_NUM) + +struct tle_pbm { + uint32_t nb_set; /* number of bits set. */ + uint32_t blk; /* last block with free entry. 
*/ + uint32_t bm[MAX_PORT_BLK]; +}; + +static inline void +tle_pbm_init(struct tle_pbm *pbm, uint32_t blk) +{ + pbm->bm[0] = 1; + pbm->nb_set = 1; + pbm->blk = blk; +} + +static inline void +tle_pbm_set(struct tle_pbm *pbm, uint16_t port) +{ + uint32_t i, b, v; + + i = PORT_BLK(port); + b = 1 << PORT_IDX(port); + v = pbm->bm[i]; + pbm->bm[i] = v | b; + pbm->nb_set += (v & b) == 0; +} + +static inline void +tle_pbm_clear(struct tle_pbm *pbm, uint16_t port) +{ + uint32_t i, b, v; + + i = PORT_BLK(port); + b = 1 << PORT_IDX(port); + v = pbm->bm[i]; + pbm->bm[i] = v & ~b; + pbm->nb_set -= (v & b) != 0; +} + + +static inline uint32_t +tle_pbm_check(const struct tle_pbm *pbm, uint16_t port) +{ + uint32_t i, v; + + i = PORT_BLK(port); + v = pbm->bm[i] >> PORT_IDX(port); + return v & 1; +} + +static inline uint16_t +tle_pbm_find_range(struct tle_pbm *pbm, uint32_t start_blk, uint32_t end_blk) +{ + uint32_t i, v; + uint16_t p; + + if (pbm->nb_set == MAX_PORT_NUM) + return 0; + + p = 0; + for (i = start_blk; i != end_blk; i++) { + i %= RTE_DIM(pbm->bm); + v = pbm->bm[i]; + if (v != UINT32_MAX) { + for (p = i * (sizeof(pbm->bm[0]) * CHAR_BIT); + (v & 1) != 0; v >>= 1, p++) + ; + + pbm->blk = i; + break; + } + } + return p; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _PORT_BITMAP_H_ */ diff --git a/lib/libtle_l4p/stream.h b/lib/libtle_l4p/stream.h new file mode 100644 index 0000000..f3b5828 --- /dev/null +++ b/lib/libtle_l4p/stream.h @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _STREAM_H_ +#define _STREAM_H_ + +#include "ctx.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Common structure that must be present as first field in all partcular + * L4 (UDP/TCP, etc.) stream implementations. + */ +struct tle_stream { + + STAILQ_ENTRY(tle_stream) link; + struct tle_ctx *ctx; + + uint8_t type; /* TLE_V4 or TLE_V6 */ + + /* Stream address information. 
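+	 * Kept in network byte order; the 'dst' halves hold the local
+	 * (receive) side and the 'src' halves the remote side, with the
+	 * mask/pmsk fields selecting wildcard matching (see stream_fill_ctx()
+	 * and stream_fill_dev() in ctx.c).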
*/ + union l4_ports port; + union l4_ports pmsk; + + union { + struct { + union ipv4_addrs addr; + union ipv4_addrs mask; + } ipv4; + struct { + union ipv6_addrs addr; + union ipv6_addrs mask; + } ipv6; + }; +}; + +static inline uint32_t +get_streams(struct tle_ctx *ctx, struct tle_stream *s[], uint32_t num) +{ + struct tle_stream *p; + uint32_t i, n; + + rte_spinlock_lock(&ctx->streams.lock); + + n = RTE_MIN(ctx->streams.nb_free, num); + for (i = 0, p = STAILQ_FIRST(&ctx->streams.free); + i != n; + i++, p = STAILQ_NEXT(p, link)) + s[i] = p; + + if (p == NULL) + /* we retrieved all free entries */ + STAILQ_INIT(&ctx->streams.free); + else + STAILQ_FIRST(&ctx->streams.free) = p; + + ctx->streams.nb_free -= n; + rte_spinlock_unlock(&ctx->streams.lock); + return n; +} + +static inline struct tle_stream * +get_stream(struct tle_ctx *ctx) +{ + struct tle_stream *s; + + s = NULL; + if (ctx->streams.nb_free == 0) + return s; + + get_streams(ctx, &s, 1); + return s; +} + +static inline void +put_stream(struct tle_ctx *ctx, struct tle_stream *s, int32_t head) +{ + s->type = TLE_VNUM; + rte_spinlock_lock(&ctx->streams.lock); + if (head != 0) + STAILQ_INSERT_HEAD(&ctx->streams.free, s, link); + else + STAILQ_INSERT_TAIL(&ctx->streams.free, s, link); + ctx->streams.nb_free++; + rte_spinlock_unlock(&ctx->streams.lock); +} + +/* calculate number of drbs per stream. */ +static inline uint32_t +calc_stream_drb_num(const struct tle_ctx *ctx, uint32_t obj_num) +{ + uint32_t num; + + num = (ctx->prm.max_stream_sbufs + obj_num - 1) / obj_num; + num = num + num / 2; + num = RTE_MAX(num, RTE_DIM(ctx->dev) + 1); + return num; +} + +static inline uint32_t +drb_nb_elem(const struct tle_ctx *ctx) +{ + return (ctx->prm.send_bulk_size != 0) ? + ctx->prm.send_bulk_size : MAX_PKT_BURST; +} + +static inline int32_t +stream_get_dest(struct tle_stream *s, const void *dst_addr, + struct tle_dest *dst) +{ + int32_t rc; + const struct in_addr *d4; + const struct in6_addr *d6; + struct tle_ctx *ctx; + struct tle_dev *dev; + + ctx = s->ctx; + + /* it is here just to keep gcc happy. */ + d4 = NULL; + + if (s->type == TLE_V4) { + d4 = dst_addr; + rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst); + } else if (s->type == TLE_V6) { + d6 = dst_addr; + rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst); + } else + rc = -ENOENT; + + if (rc < 0 || dst->dev == NULL || dst->dev->ctx != ctx) + return -ENOENT; + + dev = dst->dev; + if (s->type == TLE_V4) { + struct ipv4_hdr *l3h; + l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len); + l3h->src_addr = dev->prm.local_addr4.s_addr; + l3h->dst_addr = d4->s_addr; + } else { + struct ipv6_hdr *l3h; + l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len); + rte_memcpy(l3h->src_addr, &dev->prm.local_addr6, + sizeof(l3h->src_addr)); + rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr)); + } + + return dev - ctx->dev; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _STREAM_H_ */ diff --git a/lib/libtle_l4p/stream_table.c b/lib/libtle_l4p/stream_table.c new file mode 100644 index 0000000..5a89553 --- /dev/null +++ b/lib/libtle_l4p/stream_table.c @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +#include "stream_table.h" + +void +stbl_fini(struct stbl *st) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(st->ht); i++) { + rte_hash_free(st->ht[i].t); + rte_free(st->ht[i].ent); + } + + memset(st, 0, sizeof(*st)); +} + +int +stbl_init(struct stbl *st, uint32_t num, int32_t socket) +{ + int32_t rc; + size_t i, sz; + struct rte_hash_parameters hprm; + char buf[RTE_HASH_NAMESIZE]; + + num = RTE_MAX(5 * num / 4, 0x10U); + + memset(&hprm, 0, sizeof(hprm)); + hprm.name = buf; + hprm.entries = num; + hprm.socket_id = socket; + + rc = 0; + + snprintf(buf, sizeof(buf), "stbl4@%p", st); + hprm.key_len = sizeof(struct stbl4_key); + st->ht[TLE_V4].t = rte_hash_create(&hprm); + if (st->ht[TLE_V4].t == NULL) + rc = (rte_errno != 0) ? -rte_errno : -ENOMEM; + + if (rc == 0) { + snprintf(buf, sizeof(buf), "stbl6@%p", st); + hprm.key_len = sizeof(struct stbl6_key); + st->ht[TLE_V6].t = rte_hash_create(&hprm); + if (st->ht[TLE_V6].t == NULL) + rc = (rte_errno != 0) ? -rte_errno : -ENOMEM; + } + + for (i = 0; i != RTE_DIM(st->ht) && rc == 0; i++) { + + sz = sizeof(*st->ht[i].ent) * num; + st->ht[i].ent = rte_zmalloc_socket(NULL, sz, + RTE_CACHE_LINE_SIZE, socket); + if (st->ht[i].ent == NULL) + rc = -ENOMEM; + else + st->ht[i].nb_ent = num; + } + + if (rc != 0) + stbl_fini(st); + + return rc; +} diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h new file mode 100644 index 0000000..8ad1103 --- /dev/null +++ b/lib/libtle_l4p/stream_table.h @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _STREAM_TABLE_H_ +#define _STREAM_TABLE_H_ + +#include +#include "tcp_misc.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* current stbl entry contains packet. */ +#define STE_PKT 1 + +struct stbl_entry { + void *data; +}; + +struct shtbl { + uint32_t nb_ent; /* max number of entries in the table. 
*/ + rte_spinlock_t l; /* lock to protect the hash table */ + struct rte_hash *t; + struct stbl_entry *ent; +} __rte_cache_aligned; + +struct stbl { + struct shtbl ht[TLE_VNUM]; +}; + +struct stbl4_key { + union l4_ports port; + union ipv4_addrs addr; +} __attribute__((__packed__)); + +struct stbl6_key { + union l4_ports port; + union ipv6_addrs addr; +} __attribute__((__packed__)); + +struct stbl_key { + union l4_ports port; + union { + union ipv4_addrs addr4; + union ipv6_addrs addr6; + }; +} __attribute__((__packed__)); + +extern void stbl_fini(struct stbl *st); + +extern int stbl_init(struct stbl *st, uint32_t num, int32_t socket); + +static inline void +stbl_pkt_fill_key(struct stbl_key *k, const union pkt_info *pi, uint32_t type) +{ + static const struct stbl_key zero = { + .port.raw = 0, + }; + + k->port = pi->port; + if (type == TLE_V4) + k->addr4 = pi->addr4; + else if (type == TLE_V6) + k->addr6 = *pi->addr6; + else + *k = zero; +} + +static inline void +stbl_lock(struct stbl *st, uint32_t type) +{ + rte_spinlock_lock(&st->ht[type].l); +} + +static inline void +stbl_unlock(struct stbl *st, uint32_t type) +{ + rte_spinlock_unlock(&st->ht[type].l); +} + +static inline struct stbl_entry * +stbl_add_entry(struct stbl *st, const union pkt_info *pi) +{ + int32_t rc; + uint32_t type; + struct shtbl *ht; + struct stbl_key k; + + type = pi->tf.type; + stbl_pkt_fill_key(&k, pi, type); + ht = st->ht + type; + + rc = rte_hash_add_key(ht->t, &k); + if ((uint32_t)rc >= ht->nb_ent) + return NULL; + return ht->ent + rc; +} + +static inline struct stbl_entry * +stbl_add_pkt(struct stbl *st, const union pkt_info *pi, const void *pkt) +{ + struct stbl_entry *se; + + se = stbl_add_entry(st, pi); + if (se != NULL) + se->data = (void *)((uintptr_t)pkt | STE_PKT); + return se; +} + +static inline struct stbl_entry * +stbl_find_entry(struct stbl *st, const union pkt_info *pi) +{ + int32_t rc; + uint32_t type; + struct shtbl *ht; + struct stbl_key k; + + type = pi->tf.type; + stbl_pkt_fill_key(&k, pi, type); + ht = st->ht + type; + + rc = rte_hash_lookup(ht->t, &k); + if ((uint32_t)rc >= ht->nb_ent) + return NULL; + return ht->ent + rc; +} + +static inline int +stbl_data_pkt(const void *p) +{ + return ((uintptr_t)p & STE_PKT); +} + +static inline void * +stbl_get_pkt(const struct stbl_entry *se) +{ + return (void *)((uintptr_t)se->data ^ STE_PKT); +} + +static inline void * +stbl_find_data(struct stbl *st, const union pkt_info *pi) +{ + struct stbl_entry *ent; + + ent = stbl_find_entry(st, pi); + return (ent == NULL) ? 
NULL : ent->data; +} + +static inline void +stbl_del_pkt(struct stbl *st, struct stbl_entry *se, const union pkt_info *pi) +{ + uint32_t type; + struct stbl_key k; + + se->data = NULL; + + type = pi->tf.type; + stbl_pkt_fill_key(&k, pi, type); + rte_hash_del_key(st->ht[type].t, &k); +} + +static inline void +stbl_del_pkt_lock(struct stbl *st, struct stbl_entry *se, + const union pkt_info *pi) +{ + uint32_t type; + struct stbl_key k; + + se->data = NULL; + + type = pi->tf.type; + stbl_pkt_fill_key(&k, pi, type); + stbl_lock(st, type); + rte_hash_del_key(st->ht[type].t, &k); + stbl_unlock(st, type); +} + +#include "tcp_stream.h" + +static inline void +stbl_stream_fill_key(struct stbl_key *k, const struct tle_stream *s, + uint32_t type) +{ + static const struct stbl_key zero = { + .port.raw = 0, + }; + + k->port = s->port; + if (type == TLE_V4) + k->addr4 = s->ipv4.addr; + else if (type == TLE_V6) + k->addr6 = s->ipv6.addr; + else + *k = zero; +} + +static inline struct stbl_entry * +stbl_add_stream_lock(struct stbl *st, const struct tle_tcp_stream *s) +{ + uint32_t type; + struct stbl_key k; + struct stbl_entry *se; + struct shtbl *ht; + int32_t rc; + + type = s->s.type; + stbl_stream_fill_key(&k, &s->s, type); + ht = st->ht + type; + + stbl_lock(st, type); + rc = rte_hash_add_key(ht->t, &k); + stbl_unlock(st, type); + + if ((uint32_t)rc >= ht->nb_ent) + return NULL; + + se = ht->ent + rc; + if (se != NULL) + se->data = (void *)(uintptr_t)s; + + return se; +} + +static inline void +stbl_del_stream_lock(struct stbl *st, struct stbl_entry *se, + const struct tle_tcp_stream *s) +{ + uint32_t type; + struct stbl_key k; + + if (se == NULL) + return; + + se->data = NULL; + + type = s->s.type; + stbl_stream_fill_key(&k, &s->s, type); + stbl_lock(st, type); + rte_hash_del_key(st->ht[type].t, &k); + stbl_unlock(st, type); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _STREAM_TABLE_H_ */ diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h new file mode 100644 index 0000000..276d45a --- /dev/null +++ b/lib/libtle_l4p/syncookie.h @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _SYNCOOKIE_H_ +#define _SYNCOOKIE_H_ + +#include "tcp_misc.h" +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define SYNC_SEED0 0x736f6d65 +#define SYNC_SEED1 0x646f7261 + +struct sync_in4 { + uint32_t seq; + union l4_ports port; + union ipv4_addrs addr; +}; + +static const rte_xmm_t mss4len = { + .u32 = { + TCP4_MIN_MSS, /* 536 */ + 1300, + TCP4_OP_MSS, /* 1440 */ + TCP4_NOP_MSS, /* 1460 */ + }, +}; + +static const rte_xmm_t mss6len = { + .u32 = { + TCP6_MIN_MSS, /* 1220 */ + TCP6_OP_MSS, /* 1420 */ + TCP6_NOP_MSS, /* 1440 */ + 8940, + }, +}; + +#define SYNC_MSS_BITS 2 +#define SYNC_MSS_MASK ((1 << SYNC_MSS_BITS) - 1) + +#define SYNC_TMS_WSCALE_BITS 4 +#define SYNC_TMS_WSCALE_MASK ((1 << SYNC_TMS_WSCALE_BITS) - 1) + +#define SYNC_TMS_RESERVE_BITS 2 + +#define SYNC_TMS_OPT_BITS (SYNC_TMS_WSCALE_BITS + SYNC_TMS_RESERVE_BITS) +#define SYNC_TMS_OPT_MASK ((1 << SYNC_TMS_OPT_BITS) - 1) + +/* allow around 2 minutes for 3-way handshake. */ +#define SYNC_MAX_TMO 0x20000 + + +/* ??? use SipHash as FreeBSD does. ??? */ +static inline uint32_t +sync_hash4(const union pkt_info *pi, uint32_t seq) +{ + uint32_t v0, v1; + struct sync_in4 in4; + + in4.seq = seq; + in4.port = pi->port; + in4.addr = pi->addr4; + + v0 = SYNC_SEED0; + v1 = SYNC_SEED1; + rte_jhash_32b_2hashes(&in4.seq, sizeof(in4) / sizeof(uint32_t), + &v0, &v1); + return v0 + v1; +} + +static inline uint32_t +sync_hash6(const union pkt_info *pi, uint32_t seq) +{ + uint32_t v0, v1; + + v0 = SYNC_SEED0; + v1 = SYNC_SEED1; + rte_jhash_32b_2hashes(pi->addr6->raw.u32, + sizeof(*pi->addr6) / sizeof(uint32_t), &v0, &v1); + return rte_jhash_3words(v0, seq, pi->port.raw, v1); +} + +static inline uint32_t +sync_mss2idx(uint16_t mss, const rte_xmm_t *msl) +{ + if (mss >= msl->u32[2]) + return (mss >= msl->u32[3]) ? 3 : 2; + else + return (mss >= msl->u32[1]) ? 1 : 0; +} + +static inline uint32_t +sync_gen_seq(const union pkt_info *pi, uint32_t seq, uint32_t ts, uint16_t mss) +{ + uint32_t h, mi; + + if (pi->tf.type == TLE_V4) { + h = sync_hash4(pi, seq); + mi = sync_mss2idx(mss, &mss4len); + } else { + h = sync_hash6(pi, seq); + mi = sync_mss2idx(mss, &mss6len); + } + + h += (ts & ~SYNC_MSS_MASK) | mi; + return h; +} + +static inline uint32_t +sync_gen_ts(uint32_t ts, uint32_t wscale) +{ + ts = (ts - (SYNC_TMS_OPT_MASK + 1)) & ~SYNC_TMS_OPT_MASK; + ts |= wscale; + return ts; +} + +static inline int +sync_check_ack(const union pkt_info *pi, uint32_t seq, uint32_t ack, + uint32_t ts) +{ + uint32_t h, mi, pts; + + h = (pi->tf.type == TLE_V4) ? sync_hash4(pi, seq) : sync_hash6(pi, seq); + + h = ack - h; + pts = h & ~SYNC_MSS_MASK; + mi = h & SYNC_MSS_MASK; + + if (ts - pts > SYNC_MAX_TMO) + return -ERANGE; + + return (pi->tf.type == TLE_V4) ? 
mss4len.u32[mi] : mss6len.u32[mi]; +} + +static inline void +sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len) +{ + so->ts = get_tms_opts(p, len); + so->wscale = so->ts.ecr & SYNC_TMS_WSCALE_MASK; +} + +static inline void +sync_fill_tcb(struct tcb *tcb, const union seg_info *si, + const struct rte_mbuf *mb) +{ + const struct tcp_hdr *th; + + th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, + mb->l2_len + mb->l3_len); + + tcb->rcv.nxt = si->seq; + tcb->rcv.irs = si->seq - 1; + + tcb->snd.nxt = si->ack; + tcb->snd.una = si->ack; + tcb->snd.iss = si->ack - 1; + tcb->snd.rcvr = tcb->snd.iss; + + tcb->snd.wu.wl1 = si->seq; + tcb->snd.wu.wl2 = si->ack; + + get_syn_opts(&tcb->so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th)); + + tcb->snd.wscale = tcb->so.wscale; + tcb->snd.mss = tcb->so.mss; + tcb->snd.wnd = si->wnd << tcb->snd.wscale; + + tcb->snd.ts = tcb->so.ts.ecr; + tcb->rcv.ts = tcb->so.ts.val; + + tcb->rcv.wscale = (tcb->so.wscale == TCP_WSCALE_NONE) ? + TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _STREAM_TABLE_H_ */ diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h new file mode 100644 index 0000000..dcb9c3e --- /dev/null +++ b/lib/libtle_l4p/tcp_ctl.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Some helper stream control functions definitions. 
+ */ + +#ifndef _TCP_CTL_H_ +#define _TCP_CTL_H_ + +#include "tcp_stream.h" +#include "tcp_ofo.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static inline void +tcp_stream_down(struct tle_tcp_stream *s) +{ + rwl_down(&s->rx.use); + rwl_down(&s->tx.use); +} + +static inline void +tcp_stream_up(struct tle_tcp_stream *s) +{ + rwl_up(&s->rx.use); + rwl_up(&s->tx.use); +} + +/* empty stream's receive queue */ +static void +empty_rq(struct tle_tcp_stream *s) +{ + empty_mbuf_ring(s->rx.q); + tcp_ofo_reset(s->rx.ofo); +} + +/* empty stream's listen queue */ +static void +empty_lq(struct tle_tcp_stream *s, struct stbl *st) +{ + uint32_t i, n; + struct rte_mbuf *mb; + union pkt_info pi; + union seg_info si; + struct stbl_entry *se[MAX_PKT_BURST]; + + do { + n = rte_ring_dequeue_burst(s->rx.q, (void **)se, RTE_DIM(se)); + for (i = 0; i != n; i++) { + mb = stbl_get_pkt(se[i]); + get_pkt_info(mb, &pi, &si); + stbl_del_pkt_lock(st, se[i], &pi); + rte_pktmbuf_free(mb); + } + } while (n != 0); +} + +static inline void +tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s) +{ + struct stbl *st; + uint16_t uop; + + st = CTX_TCP_STLB(ctx); + + /* reset TX armed */ + rte_atomic32_set(&s->tx.arm, 0); + + /* reset TCB */ + uop = s->tcb.uop & (TCP_OP_LISTEN | TCP_OP_CONNECT); + memset(&s->tcb, 0, sizeof(s->tcb)); + + /* reset cached destination */ + memset(&s->tx.dst, 0, sizeof(s->tx.dst)); + + if (uop != 0) { + /* free stream's destination port */ + stream_clear_ctx(ctx, &s->s); + if (uop == TCP_OP_LISTEN) + empty_lq(s, st); + } + + if (s->ste != NULL) { + /* remove entry from RX streams table */ + stbl_del_stream_lock(st, s->ste, s); + s->ste = NULL; + empty_rq(s); + } + + /* empty TX queue */ + empty_mbuf_ring(s->tx.q); + + /* + * mark the stream as free again. + * if there still are pkts queued for TX, + * then put this stream to the tail of free list. + */ + put_stream(ctx, &s->s, TCP_STREAM_TX_FINISHED(s)); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_CTL_H_ */ diff --git a/lib/libtle_l4p/tcp_misc.h b/lib/libtle_l4p/tcp_misc.h new file mode 100644 index 0000000..beb6699 --- /dev/null +++ b/lib/libtle_l4p/tcp_misc.h @@ -0,0 +1,462 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TCP_MISC_H_ +#define _TCP_MISC_H_ + +#include "net_misc.h" +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * TCP protocols related structures/functions definitions. + * Main purpose to simplify (and optimise) processing and representation + * of protocol related data. + */ + +#define TCP_WSCALE_DEFAULT 7 +#define TCP_WSCALE_NONE 0 + +#define TCP_TX_HDR_MAX (sizeof(struct tcp_hdr) + TCP_TX_OPT_LEN_MAX) + +/* max header size for normal data+ack packet */ +#define TCP_TX_HDR_DACK (sizeof(struct tcp_hdr) + TCP_TX_OPT_LEN_TMS) + +#define TCP4_MIN_MSS 536 + +#define TCP6_MIN_MSS 1220 + +/* default MTU, no TCP options. 
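+ * For the standard 1500 byte ETHER_MTU this works out to
+ * 1500 - 20 - 20 = 1460 for IPv4 and 1500 - 40 - 20 = 1440 for IPv6.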
*/ +#define TCP4_NOP_MSS \ + (ETHER_MTU - sizeof(struct ipv4_hdr) - sizeof(struct tcp_hdr)) + +#define TCP6_NOP_MSS \ + (ETHER_MTU - sizeof(struct ipv6_hdr) - sizeof(struct tcp_hdr)) + +/* default MTU, TCP options present */ +#define TCP4_OP_MSS (TCP4_NOP_MSS - TCP_TX_OPT_LEN_MAX) + +#define TCP6_OP_MSS (TCP6_NOP_MSS - TCP_TX_OPT_LEN_MAX) + +/* + * TCP flags + */ +#define TCP_FLAG_FIN 0x01 +#define TCP_FLAG_SYN 0x02 +#define TCP_FLAG_RST 0x04 +#define TCP_FLAG_PSH 0x08 +#define TCP_FLAG_ACK 0x10 +#define TCP_FLAG_URG 0x20 + +/* TCP flags mask. */ +#define TCP_FLAG_MASK UINT8_MAX + +union typflg { + uint16_t raw; + struct { + uint8_t type; /* TLE_V4/TLE_V6 */ + uint8_t flags; /* TCP header flags */ + }; +}; + +union pkt_info { + rte_xmm_t raw; + struct { + union typflg tf; + uint16_t csf; /* checksum flags */ + union l4_ports port; + union { + union ipv4_addrs addr4; + const union ipv6_addrs *addr6; + }; + }; +}; + +union seg_info { + rte_xmm_t raw; + struct { + uint32_t seq; + uint32_t ack; + uint16_t hole1; + uint16_t wnd; + }; +}; + +union seqlen { + uint64_t raw; + struct { + uint32_t seq; + uint32_t len; + }; +}; + +#define TCP_DATA_ALIGN 4 + +#define TCP_DATA_OFFSET 4 + +/* + * recognizable options. + */ +#define TCP_OPT_KIND_EOL 0x00 +#define TCP_OPT_KIND_NOP 0x01 +#define TCP_OPT_KIND_MSS 0x02 +#define TCP_OPT_KIND_WSC 0x03 +#define TCP_OPT_KIND_TMS 0x08 + +#define TCP_OPT_LEN_EOL 0x01 +#define TCP_OPT_LEN_NOP 0x01 +#define TCP_OPT_LEN_MSS 0x04 +#define TCP_OPT_LEN_WSC 0x03 +#define TCP_OPT_LEN_TMS 0x0a + +#define TCP_TX_OPT_LEN_MAX \ + RTE_ALIGN_CEIL(TCP_OPT_LEN_MSS + TCP_OPT_LEN_WSC + TCP_OPT_LEN_TMS + \ + TCP_OPT_LEN_EOL, TCP_DATA_ALIGN) + +/* + * recomended format for TSOPT from RFC 1323, appendix A: + * +--------+--------+--------+--------+ + * | NOP | NOP | TSopt | 10 | + * +--------+--------+--------+--------+ + * | TSval timestamp | + * +--------+--------+--------+--------+ + * | TSecr timestamp | + * +--------+--------+--------+--------+ + */ +#define TCP_TX_OPT_LEN_TMS (TCP_OPT_LEN_TMS + 2 * TCP_OPT_LEN_NOP) + +#define TCP_OPT_TMS_HDR (rte_be_to_cpu_32( \ + TCP_OPT_KIND_NOP << 3 * CHAR_BIT | \ + TCP_OPT_KIND_NOP << 2 * CHAR_BIT | \ + TCP_OPT_KIND_TMS << CHAR_BIT | \ + TCP_OPT_LEN_TMS)) + +#define TCP_OPT_KL(k, l) (rte_be_to_cpu_16((k) << CHAR_BIT | (l))) + +#define TCP_OPT_KL_MSS TCP_OPT_KL(TCP_OPT_KIND_MSS, TCP_OPT_LEN_MSS) +#define TCP_OPT_KL_WSC TCP_OPT_KL(TCP_OPT_KIND_WSC, TCP_OPT_LEN_WSC) +#define TCP_OPT_KL_TMS TCP_OPT_KL(TCP_OPT_KIND_TMS, TCP_OPT_LEN_TMS) + +/* + * Timestamp option. + */ +union tsopt { + uint64_t raw; + struct { + uint32_t val; + uint32_t ecr; + }; +}; + +struct tcpopt { + union { + uint16_t raw; + struct { + uint8_t kind; + uint8_t len; + }; + } kl; + union { + uint16_t mss; + uint8_t wscale; + union tsopt ts; + }; +} __attribute__((__packed__)); + +struct syn_opts { + uint16_t mss; + uint8_t wscale; + union tsopt ts; +}; + +struct resp_info { + uint32_t flags; +}; + + +/* window update information (RFC 793 WL1, WL2) */ +union wui { + uint64_t raw; + union { + uint32_t wl1; + uint32_t wl2; + }; +}; + +/* + * helper structure: holds aggregated information about group + * of processed data+ack packets. 
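+ * It is filled per received segment by ack_info_update() while a burst is
+ * parsed in rx_data_ack() (tcp_rxtx.c) and then consumed once per burst
+ * to update the TCB: window, congestion state and duplicate-ACK count.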
+ */ +struct dack_info { + struct { /* # of received segments with: */ + uint32_t data; /* incoming data */ + uint32_t ack; /* newly acked data */ + uint32_t dup; /* duplicate acks */ + uint32_t badseq; /* bad seq/ack */ + uint32_t ofo; /* OFO incoming data */ + } segs; + uint32_t ack; /* highest received ACK */ + union tsopt ts; /* TS of highest ACK */ + union wui wu; /* window update information */ + uint32_t wnd; + struct { /* 3 duplicate ACKs were observed after */ + uint32_t seg; /* # of meaningful ACK segments */ + uint32_t ack; /* ACK sequence */ + } dup3; +}; + +/* get current timestamp in ms */ +static inline uint32_t +tcp_get_tms(void) +{ + uint64_t ts, ms; + ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S; + ts = rte_get_tsc_cycles() / ms; + return ts; +} + +static inline int +tcp_seq_lt(uint32_t l, uint32_t r) +{ + return (int32_t)(l - r) < 0; +} + +static inline int +tcp_seq_leq(uint32_t l, uint32_t r) +{ + return (int32_t)(l - r) <= 0; +} + + +static inline void +get_seg_info(const struct tcp_hdr *th, union seg_info *si) +{ + __m128i v; + const __m128i bswap_mask = _mm_set_epi8(15, 14, 13, 12, 10, 11, 9, 8, + 4, 5, 6, 7, 0, 1, 2, 3); + + v = _mm_loadu_si128((const __m128i *)&th->sent_seq); + si->raw.x = _mm_shuffle_epi8(v, bswap_mask); +} + +static inline void +get_syn_opts(struct syn_opts *so, uintptr_t p, uint32_t len) +{ + uint32_t i, kind; + const struct tcpopt *opt; + + memset(so, 0, sizeof(*so)); + + i = 0; + while (i < len) { + opt = (const struct tcpopt *)(p + i); + kind = opt->kl.kind; + if (kind == TCP_OPT_KIND_EOL) + return; + else if (kind == TCP_OPT_KIND_NOP) + i += sizeof(opt->kl.kind); + else { + i += opt->kl.len; + if (i <= len) { + if (opt->kl.raw == TCP_OPT_KL_MSS) + so->mss = rte_be_to_cpu_16(opt->mss); + else if (opt->kl.raw == TCP_OPT_KL_WSC) + so->wscale = opt->wscale; + else if (opt->kl.raw == TCP_OPT_KL_TMS) { + so->ts.val = + rte_be_to_cpu_32(opt->ts.val); + so->ts.ecr = + rte_be_to_cpu_32(opt->ts.ecr); + } + } + } + } +} + +/* + * generates SYN options, assumes that there are + * at least TCP_TX_OPT_LEN_MAX bytes available. + */ +static inline void +fill_syn_opts(void *p, const struct syn_opts *so) +{ + uint8_t *to; + struct tcpopt *opt; + + to = (uint8_t *)p; + + /* setup MSS*/ + opt = (struct tcpopt *)to; + opt->kl.raw = TCP_OPT_KL_MSS; + opt->mss = rte_cpu_to_be_16(so->mss); + + to += TCP_OPT_LEN_MSS; + opt = (struct tcpopt *)to; + + /* setup TMS*/ + if (so->ts.val != 0) { + + opt->kl.raw = TCP_OPT_KL_TMS; + opt->ts.val = rte_cpu_to_be_32(so->ts.val); + opt->ts.ecr = rte_cpu_to_be_32(so->ts.ecr); + + to += TCP_OPT_LEN_TMS; + opt = (struct tcpopt *)to; + } + + /* setup TMS*/ + if (so->wscale != 0) { + + opt->kl.raw = TCP_OPT_KL_WSC; + opt->wscale = so->wscale; + + to += TCP_OPT_LEN_WSC; + opt = (struct tcpopt *)to; + } + + to[0] = TCP_OPT_KIND_EOL; +} + +/* + * generate TMS option, for non SYN packet, make sure + * there at least TCP_TX_OPT_LEN_TMS available. 
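+ * The 12 bytes are written in the RFC 1323 recommended layout shown above:
+ *	opt[0] = NOP | NOP | TS-kind (8) | TS-len (10)
+ *	opt[1] = TSval
+ *	opt[2] = TSecr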
+ */ +static inline void +fill_tms_opts(void *p, uint32_t val, uint32_t ecr) +{ + uint32_t *opt; + + opt = (uint32_t *)p; + opt[0] = TCP_OPT_TMS_HDR; + opt[1] = rte_cpu_to_be_32(val); + opt[2] = rte_cpu_to_be_32(ecr); +} + +static inline union tsopt +get_tms_opts(uintptr_t p, uint32_t len) +{ + union tsopt ts; + uint32_t i, kind; + const uint32_t *opt; + const struct tcpopt *to; + + opt = (const uint32_t *)p; + + /* TS option is presented in recommended way */ + if (len >= TCP_TX_OPT_LEN_TMS && opt[0] == TCP_OPT_TMS_HDR) { + ts.val = rte_be_to_cpu_32(opt[1]); + ts.ecr = rte_be_to_cpu_32(opt[2]); + return ts; + } + + /* parse through whole list of options. */ + ts.raw = 0; + i = 0; + while (i < len) { + to = (const struct tcpopt *)(p + i); + kind = to->kl.kind; + if (kind == TCP_OPT_KIND_EOL) + break; + else if (kind == TCP_OPT_KIND_NOP) + i += sizeof(to->kl.kind); + else { + i += to->kl.len; + if (i <= len && to->kl.raw == TCP_OPT_KL_TMS) { + ts.val = rte_be_to_cpu_32(to->ts.val); + ts.ecr = rte_be_to_cpu_32(to->ts.ecr); + break; + } + } + } + + return ts; +} + +static inline uint8_t +get_pkt_type(const struct rte_mbuf *m) +{ + uint32_t v; + + v = m->packet_type & + (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK); + if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_TCP)) + return TLE_V4; + else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_TCP)) + return TLE_V6; + else + return TLE_VNUM; +} + +static inline void +get_pkt_info(const struct rte_mbuf *m, union pkt_info *pi, union seg_info *si) +{ + uint32_t len, type; + const struct tcp_hdr *tcph; + const union l4_ports *prt; + const union ipv4_addrs *pa4; + + type = get_pkt_type(m); + len = m->l2_len; + + /* + * this line is here just to avoid gcc warning: + * error: ...addr4.raw may be used uninitialized. + */ + pi->addr4.raw = 0; + + if (type == TLE_V4) { + pa4 = rte_pktmbuf_mtod_offset(m, const union ipv4_addrs *, + len + offsetof(struct ipv4_hdr, src_addr)); + pi->addr4.raw = pa4->raw; + } else if (type == TLE_V6) { + pi->addr6 = rte_pktmbuf_mtod_offset(m, const union ipv6_addrs *, + len + offsetof(struct ipv6_hdr, src_addr)); + } + + len += m->l3_len; + tcph = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *, len); + prt = (const union l4_ports *) + ((uintptr_t)tcph + offsetof(struct tcp_hdr, src_port)); + pi->tf.flags = tcph->tcp_flags; + pi->tf.type = type; + pi->csf = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); + pi->port.raw = prt->raw; + + get_seg_info(tcph, si); +} + +static inline uint32_t +tcp_mbuf_seq_free(struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t i, len; + + len = 0; + for (i = 0; i != num; i++) { + len += mb[i]->pkt_len; + rte_pktmbuf_free(mb[i]); + } + + return len; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_MISC_H_ */ diff --git a/lib/libtle_l4p/tcp_ofo.c b/lib/libtle_l4p/tcp_ofo.c new file mode 100644 index 0000000..1565445 --- /dev/null +++ b/lib/libtle_l4p/tcp_ofo.c @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include + +#include "tcp_stream.h" +#include "tcp_rxq.h" + +#define OFO_FRACTION 4 + +#define OFO_DB_MAX 0x20U + +#define OFODB_OBJ_MIN 8U +#define OFODB_OBJ_MAX 0x20U + +#define OFO_OBJ_MAX (OFODB_OBJ_MAX * OFO_DB_MAX) + +void +tcp_ofo_free(struct ofo *ofo) +{ + rte_free(ofo); +} + +static void +calc_ofo_elems(uint32_t nbufs, uint32_t *nobj, uint32_t *ndb) +{ + uint32_t n, nd, no; + + n = nbufs / OFO_FRACTION; + n = RTE_MAX(n, OFODB_OBJ_MIN); + n = RTE_MIN(n, OFO_OBJ_MAX); + + no = OFODB_OBJ_MIN / 2; + do { + no *= 2; + nd = n / no; + } while (nd > OFO_DB_MAX); + + *nobj = no; + *ndb = nd; +} + +struct ofo * +tcp_ofo_alloc(uint32_t nbufs, int32_t socket) +{ + uint32_t i, ndb, nobj; + size_t dsz, osz, sz; + struct ofo *ofo; + struct rte_mbuf **obj; + + calc_ofo_elems(nbufs, &nobj, &ndb); + osz = sizeof(*ofo) + sizeof(ofo->db[0]) * ndb; + dsz = sizeof(ofo->db[0].obj[0]) * nobj * ndb; + sz = osz + dsz; + + ofo = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket); + if (ofo == NULL) { + TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, sz, socket, rte_errno); + return NULL; + } + + obj = (struct rte_mbuf **)&ofo->db[ndb]; + for (i = 0; i != ndb; i++) { + ofo->db[i].nb_max = nobj; + ofo->db[i].obj = obj + i * nobj; + } + + ofo->nb_max = ndb; + return ofo; +} + diff --git a/lib/libtle_l4p/tcp_ofo.h b/lib/libtle_l4p/tcp_ofo.h new file mode 100644 index 0000000..4f3bdab --- /dev/null +++ b/lib/libtle_l4p/tcp_ofo.h @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TCP_OFO_H_ +#define _TCP_OFO_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +struct ofodb { + uint32_t nb_elem; + uint32_t nb_max; + union seqlen sl; + struct rte_mbuf **obj; +}; + +struct ofo { + uint32_t nb_elem; + uint32_t nb_max; + struct ofodb db[]; +}; + +static inline void +_ofodb_free(struct ofodb *db) +{ + uint32_t i; + + for (i = 0; i != db->nb_elem; i++) + rte_pktmbuf_free(db->obj[i]); +} + +static inline void +_ofo_remove(struct ofo *ofo, uint32_t pos, uint32_t num) +{ + uint32_t i, n; + + n = ofo->nb_elem - num - pos; + for (i = 0; i != n; i++) + ofo->db[pos + i] = ofo->db[pos + num + i]; + ofo->nb_elem -= num; +} + +static inline void +tcp_ofo_reset(struct ofo *ofo) +{ + uint32_t i; + + for (i = 0; i != ofo->nb_elem; i++) + _ofodb_free(&ofo->db[i]); + + _ofo_remove(ofo, 0, ofo->nb_elem); +} + +static inline uint32_t +_ofo_insert_new(struct ofo *ofo, uint32_t pos, union seqlen *sl, + struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t i, n, plen; + struct ofodb *db; + + n = ofo->nb_elem; + + /* out of space */ + if (n == ofo->nb_max) + return 0; + + /* allocate new one */ + db = ofo->db + n; + ofo->nb_elem = n + 1; + + /* insert into a proper position. */ + for (i = n; i != pos; i--) + ofo->db[i] = ofo->db[i - 1]; + + /* fill new block */ + n = RTE_MIN(db->nb_max, num); + for (i = 0; i != n; i++) + db->obj[i] = mb[i]; + + /* can't queue some packets. 
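+	 * The mbufs that did not fit are not freed here: sl is advanced
+	 * only by the amount actually stored, so rx_ofo_enqueue() offers
+	 * the remainder to the next _ofo_step() iteration.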
*/ + plen = 0; + for (i = n; i != num; i++) + plen += mb[i]->pkt_len; + + db->nb_elem = n; + db->sl.seq = sl->seq; + db->sl.len = sl->len - plen; + + sl->seq += db->sl.len; + sl->len -= db->sl.len; + return n; +} + +static inline uint32_t +_ofo_insert_right(struct ofo *ofo, uint32_t pos, union seqlen *sl, + struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t i, j, k, n; + uint32_t end, plen, skip; + struct ofodb *db; + + db = ofo->db + pos; + end = db->sl.seq + db->sl.len; + + skip = end - sl->seq; + + /* skip overlapping packets */ + for (i = 0, n = skip; i != num && n != 0; i++, n -= plen) { + + plen = mb[i]->pkt_len; + if (n < plen) { + /* adjust partially overlapped packet. */ + rte_pktmbuf_adj(mb[i], plen - n); + break; + } + } + + /* free totally overlapped packets. */ + for (j = 0; j != i; j++) + rte_pktmbuf_free(mb[j]); + + /* copy non-overlapping mbufs */ + k = db->nb_elem; + n = RTE_MIN(db->nb_max - k, num - i); + + plen = 0; + for (j = 0; j != n; j++) { + db->obj[k + j] = mb[i + j]; + plen += mb[i + j]->pkt_len; + } + + db->nb_elem += n; + db->sl.len += plen; + + plen += skip; + sl->len -= plen; + sl->seq += plen; + return n + i; +} + +static inline uint32_t +_ofo_step(struct ofo *ofo, union seqlen *sl, struct rte_mbuf *mb[], + uint32_t num) +{ + uint32_t i, n, end, lo, ro; + struct ofodb *db; + + db = NULL; + end = sl->seq + sl->len; + n = ofo->nb_elem; + + /* + * start from the right side, assume that after some gap, + * we keep receiving packets in order. + */ + for (i = n; i-- != 0; ) { + db = ofo->db + i; + if (tcp_seq_leq(db->sl.seq, sl->seq)) + break; + } + + /* new db required */ + if ((int32_t)i < 0 || tcp_seq_lt(db->sl.seq + db->sl.len, sl->seq)) + return _ofo_insert_new(ofo, i + 1, sl, mb, num); + + /* new one is right adjacent, or overlap */ + + ro = sl->seq - db->sl.seq; + lo = end - db->sl.seq; + + /* new one is completely overlapped by old one */ + if (lo <= db->sl.len) + return 0; + + /* either overlap OR (adjacent AND some free space remains) */ + if (ro < db->sl.len || db->nb_elem != db->nb_max) + return _ofo_insert_right(ofo, i, sl, mb, num); + + /* adjacent, no free space in current block */ + return _ofo_insert_new(ofo, i + 1, sl, mb, num); +} + +static inline void +_ofo_compact(struct ofo *ofo) +{ + uint32_t i, j, n, ro; + struct ofodb *db; + + for (i = 0; i < ofo->nb_elem; i = j) { + + for (j = i + 1; j != ofo->nb_elem; j++) { + + /* no intersection */ + ro = ofo->db[j].sl.seq - ofo->db[i].sl.seq; + if (ro > ofo->db[i].sl.len) + break; + + db = ofo->db + j; + n = _ofo_insert_right(ofo, i, &db->sl, db->obj, + db->nb_elem); + if (n < db->nb_elem) { + db->nb_elem -= n; + break; + } + } + + n = j - i - 1; + if (n != 0) + _ofo_remove(ofo, i + 1, n); + } +} + +static inline uint32_t +_ofodb_enqueue(struct rte_ring *r, const struct ofodb *db, union seqlen *sl) +{ + uint32_t n, num; + + num = db->nb_elem; + sl->raw = db->sl.raw; + n = rte_ring_enqueue_burst(r, (void * const *)db->obj, num); + + sl->len -= tcp_mbuf_seq_free(db->obj + n, num - n); + return num - n; +} + +struct ofo * +tcp_ofo_alloc(uint32_t nbufs, int32_t socket); + +void +tcp_ofo_free(struct ofo *ofo); + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_OFO_H_ */ diff --git a/lib/libtle_l4p/tcp_rxq.h b/lib/libtle_l4p/tcp_rxq.h new file mode 100644 index 0000000..90e657f --- /dev/null +++ b/lib/libtle_l4p/tcp_rxq.h @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2016 Intel Corporation. 
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TCP_RXQ_H_ +#define _TCP_RXQ_H_ + +#include "tcp_ofo.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static inline uint32_t +rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl, + struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t i, n; + + n = 0; + do { + i = _ofo_step(s->rx.ofo, sl, mb + n, num - n); + n += i; + } while (i != 0 && n != num); + + _ofo_compact(s->rx.ofo); + return n; +} + +static inline uint32_t +rx_ofo_reduce(struct tle_tcp_stream *s) +{ + uint32_t i, n, end, seq; + struct ofo *ofo; + struct ofodb *db; + union seqlen sl; + + seq = s->tcb.rcv.nxt; + ofo = s->rx.ofo; + + n = 0; + for (i = 0; i != ofo->nb_elem; i++) { + + db = ofo->db + i; + + /* gap still present */ + if (tcp_seq_lt(seq, db->sl.seq)) + break; + + end = db->sl.seq + db->sl.len; + + /* this db is fully overlapped */ + if (tcp_seq_leq(end, seq)) + _ofodb_free(db); + else + n += _ofodb_enqueue(s->rx.q, db, &sl); + + seq = sl.seq + sl.len; + } + + s->tcb.rcv.nxt = seq; + _ofo_remove(ofo, 0, i); + return n; +} + +static inline uint32_t +rx_ino_enqueue(struct tle_tcp_stream *s, union seqlen *sl, + struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t i, n; + + n = rte_ring_enqueue_burst(s->rx.q, (void * const *)mb, num); + + /* error: can'queue some packets into receive buffer. */ + for (i = n; i != num; i++) + sl->len -= mb[i]->pkt_len; + + s->tcb.rcv.nxt = sl->seq + sl->len; + return n; +} + +static inline uint32_t +rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len, + struct rte_mbuf *mb[], uint32_t num) +{ + uint32_t n, r, t; + union seqlen sl; + + sl.seq = seq; + sl.len = len; + + r = rte_ring_count(s->rx.q); + + /* in order packets, ready to be delivered */ + if (seq == s->tcb.rcv.nxt) { + + t = rx_ino_enqueue(s, &sl, mb, num); + + /* failed to queue all input in-order packets */ + if (t != num) + TCP_LOG(DEBUG, + "%s(s=%p, seq=%u, len=%u, num=%u) failed to queue " + "%u packets;\n", + __func__, s, seq, len, num, num - t); + + /* try to consume some out-of-order packets*/ + else { + n = rx_ofo_reduce(s); + if (n != 0) + TCP_LOG(DEBUG, + "%s(s=%p, rcv.nxt=%u) failed to queue %u " + "OFO packets;\n", + __func__, s, s->tcb.rcv.nxt, n); + } + + /* queue out of order packets */ + } else { + t = rx_ofo_enqueue(s, &sl, mb, num); + } + + n = rte_ring_count(s->rx.q); + if (r != n) { + /* raise RX event */ + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + /* if RX queue was empty invoke RX notification callback. */ + else if (s->rx.cb.func != NULL && r == 0) + s->rx.cb.func(s->rx.cb.data, &s->s); + } + + return t; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_RXQ_H_ */ diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c new file mode 100644 index 0000000..4e43730 --- /dev/null +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -0,0 +1,2431 @@ +/* + * Copyright (c) 2016 Intel Corporation. 
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "tcp_stream.h" +#include "tcp_timer.h" +#include "stream_table.h" +#include "syncookie.h" +#include "misc.h" +#include "tcp_ctl.h" +#include "tcp_rxq.h" +#include "tcp_txq.h" + +#define TCP_MAX_PKT_SEG 0x20 + +/* + * checks if input TCP ports and IP addresses match given stream. + * returns zero on success. + */ +static inline int +rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi) +{ + int32_t rc; + + if (pi->tf.type == TLE_V4) + rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw || + (pi->addr4.raw & s->s.ipv4.mask.raw) != + s->s.ipv4.addr.raw; + else + rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw || + ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw, + &s->s.ipv6.mask.raw) != 0; + + return rc; +} + +static inline struct tle_tcp_stream * +rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi, + uint32_t type) +{ + struct tle_tcp_stream *s; + + s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst]; + if (s == NULL || rwl_acquire(&s->rx.use) < 0) + return NULL; + + /* check that we have a proper stream. */ + if (s->tcb.state != TCP_ST_LISTEN) { + rwl_release(&s->rx.use); + s = NULL; + } + + return s; +} + +static inline struct tle_tcp_stream * +rx_obtain_stream(const struct tle_dev *dev, struct stbl *st, + const union pkt_info *pi, uint32_t type) +{ + struct tle_tcp_stream *s; + + s = stbl_find_data(st, pi); + if (s == NULL) { + if (pi->tf.flags == TCP_FLAG_ACK) + return rx_obtain_listen_stream(dev, pi, type); + return NULL; + } + + if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0) + return NULL; + /* check that we have a proper stream. */ + else if (s->tcb.state == TCP_ST_CLOSED) { + rwl_release(&s->rx.use); + s = NULL; + } + + return s; +} + +/* + * Consider 2 pkt_info *equal* if their: + * - types (IPv4/IPv6) + * - TCP flags + * - checksum flags + * - TCP src and dst ports + * - IP src and dst addresses + * are equal. 
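+ *
+ * This is what lets tle_tcp_rx_bulk() group consecutive packets that
+ * belong to the same flow and hand each group to one stream in a single
+ * call.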
+ */ +static inline int +pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num) +{ + uint32_t i; + + i = 1; + + if (pi[0].tf.type == TLE_V4) { + while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0) + i++; + + } else if (pi[0].tf.type == TLE_V6) { + while (i != num && + pi[0].raw.u64[0] == pi[i].raw.u64[0] && + ymm_cmp(&pi[0].addr6->raw, + &pi[i].addr6->raw) == 0) + i++; + } + + return i; +} + +static inline int +pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num) +{ + uint32_t i; + + i = 1; + + if (pi[0].tf.type == TLE_V4) { + while (i != num && pi[0].tf.raw == pi[i].tf.raw && + pi[0].port.dst == pi[i].port.dst && + pi[0].addr4.dst == pi[i].addr4.dst) + i++; + + } else if (pi[0].tf.type == TLE_V6) { + while (i != num && pi[0].tf.raw == pi[i].tf.raw && + pi[0].port.dst == pi[i].port.dst && + xmm_cmp(&pi[0].addr6->dst, + &pi[i].addr6->dst) == 0) + i++; + } + + return i; +} + +static inline void +stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +static inline uint32_t +stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +static inline void +fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, + uint32_t seq, uint8_t hlen, uint8_t flags) +{ + uint16_t wnd; + + l4h->src_port = port.dst; + l4h->dst_port = port.src; + + wnd = (flags & TCP_FLAG_SYN) ? + RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) : + tcb->rcv.wnd >> tcb->rcv.wscale; + + /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */ + l4h->sent_seq = rte_cpu_to_be_32(seq); + l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt); + l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET; + l4h->tcp_flags = flags; + l4h->rx_win = rte_cpu_to_be_16(wnd); + l4h->cksum = 0; + l4h->tcp_urp = 0; + + if (flags & TCP_FLAG_SYN) + fill_syn_opts(l4h + 1, &tcb->so); + else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0) + fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts); +} + +static inline int +tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s, + const struct tle_dest *dst, uint64_t ol_flags, + union l4_ports port, uint32_t seq, uint32_t flags, + uint32_t pid, uint32_t swcsm) +{ + uint32_t l4, len, plen; + struct tcp_hdr *l4h; + char *l2h; + + len = dst->l2_len + dst->l3_len; + plen = m->pkt_len; + + if (flags & TCP_FLAG_SYN) + l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX; + else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0) + l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS; + else + l4 = sizeof(*l4h); + + /* adjust mbuf to put L2/L3/L4 headers into it. */ + l2h = rte_pktmbuf_prepend(m, len + l4); + if (l2h == NULL) + return -EINVAL; + + /* copy L2/L3 header */ + rte_memcpy(l2h, dst->hdr, len); + + /* setup TCP header & options */ + l4h = (struct tcp_hdr *)(l2h + len); + fill_tcph(l4h, &s->tcb, port, seq, l4, flags); + + /* setup mbuf TX offload related fields. */ + m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0); + m->ol_flags |= ol_flags; + + /* update proto specific fields. 
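+	 * For IPv4 that is total_length, packet_id and, depending on the
+	 * offload flags, a pseudo-header checksum for HW offload or a full
+	 * SW L3/L4 checksum; for IPv6 only payload_len and the L4 checksum.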
*/ + + if (s->s.type == TLE_V4) { + struct ipv4_hdr *l3h; + l3h = (struct ipv4_hdr *)(l2h + dst->l2_len); + l3h->packet_id = rte_cpu_to_be_16(pid); + l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4); + + if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) + l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len, + ol_flags); + else if (swcsm != 0) + l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h); + + if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0) + l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len); + } else { + struct ipv6_hdr *l3h; + l3h = (struct ipv6_hdr *)(l2h + dst->l2_len); + l3h->payload_len = rte_cpu_to_be_16(plen + l4); + if ((ol_flags & PKT_TX_TCP_CKSUM) != 0) + l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags); + else if (swcsm != 0) + l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h); + } + + return 0; +} + +/* + * That function supposed to be used only for data packets. + * Assumes that L2/L3/L4 headers and mbuf fields already setup properly. + * - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR. + * - if no HW cksum offloads are enabled, calculates TCP checksum. + */ +static inline void +tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb, + uint32_t seq, uint32_t pid) +{ + struct tcp_hdr *l4h; + uint32_t len; + + len = m->l2_len + m->l3_len; + l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len); + + l4h->sent_seq = rte_cpu_to_be_32(seq); + l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt); + + if (tcb->so.ts.raw != 0) + fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts); + + if (type == TLE_V4) { + struct ipv4_hdr *l3h; + l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len); + l3h->hdr_checksum = 0; + l3h->packet_id = rte_cpu_to_be_16(pid); + if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0) + l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len); + } + + /* have to calculate TCP checksum in SW */ + if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) { + + l4h->cksum = 0; + + if (type == TLE_V4) { + struct ipv4_hdr *l3h; + l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, + m->l2_len); + l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h); + + } else { + struct ipv6_hdr *l3h; + l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *, + m->l2_len); + l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h); + } + } +} + +/* Send data packets that need to be ACK-ed by peer */ +static inline uint32_t +tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num) +{ + uint32_t bsz, i, nb, nbm; + struct tle_dev *dev; + struct tle_drb *drb[num]; + + /* calculate how many drbs are needed.*/ + bsz = s->tx.drb.nb_elem; + nbm = (num + bsz - 1) / bsz; + + /* allocate drbs, adjust number of packets. */ + nb = stream_drb_alloc(s, drb, nbm); + + /* drb ring is empty. */ + if (nb == 0) + return 0; + + else if (nb != nbm) + num = nb * bsz; + + dev = s->tx.dst.dev; + + /* enqueue pkts for TX. */ + nbm = nb; + i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m, + num, drb, &nb); + + /* free unused drbs. 
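+	 * On return nb holds the number of drbs that tle_dring_mp_enqueue()
+	 * did not consume; those go back into the stream's own drb ring.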
*/ + if (nb != 0) + stream_drb_free(s, drb + nbm - nb, nb); + + return i; +} + +static inline uint32_t +tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[], + uint32_t num) +{ + uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type; + struct tle_dev *dev; + struct rte_mbuf *mb; + struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG]; + + mss = s->tcb.snd.mss; + type = s->s.type; + + dev = s->tx.dst.dev; + pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num; + + k = 0; + tn = 0; + fail = 0; + for (i = 0; i != num && sl->len != 0 && fail == 0; i++) { + + mb = mi[i]; + sz = RTE_MIN(sl->len, mss); + plen = PKT_L4_PLEN(mb); + + /*fast path, no need to use indirect mbufs. */ + if (plen <= sz) { + + /* update pkt TCP header */ + tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i); + + /* keep mbuf till ACK is received. */ + rte_pktmbuf_refcnt_update(mb, 1); + sl->len -= plen; + sl->seq += plen; + mo[k++] = mb; + /* remaining snd.wnd is less them MSS, send nothing */ + } else if (sz < mss) + break; + /* packet indirection needed */ + else + RTE_VERIFY(0); + + if (k >= MAX_PKT_BURST) { + n = tx_data_pkts(s, mo, k); + fail = k - n; + tn += n; + k = 0; + } + } + + if (k != 0) { + n = tx_data_pkts(s, mo, k); + fail = k - n; + tn += n; + } + + if (fail != 0) { + sz = tcp_mbuf_seq_free(mo + n, fail); + sl->seq -= sz; + sl->len += sz; + } + + return tn; +} + +/* + * gets data from stream send buffer, updates it and + * queues it into TX device queue. + * Note that this function and is not MT safe. + */ +static inline uint32_t +tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms) +{ + uint32_t n, num, tn, wnd; + struct rte_mbuf **mi; + union seqlen sl; + + tn = 0; + wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una); + sl.seq = s->tcb.snd.nxt; + sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd); + + if (sl.len == 0) + return tn; + + /* update send timestamp */ + s->tcb.snd.ts = tms; + + do { + /* get group of packets */ + mi = tcp_txq_get_nxt_objs(s, &num); + + /* stream send buffer is empty */ + if (num == 0) + break; + + /* queue data packets for TX */ + n = tx_data_bulk(s, &sl, mi, num); + tn += n; + + /* update consumer head */ + tcp_txq_set_nxt_head(s, n); + } while (n == num); + + s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt; + return tn; +} + +static inline void +free_una_data(struct tle_tcp_stream *s, uint32_t len) +{ + uint32_t i, n, num, plen; + struct rte_mbuf **mi; + + n = 0; + plen = 0; + + do { + /* get group of packets */ + mi = tcp_txq_get_una_objs(s, &num); + + if (num == 0) + break; + + /* free acked data */ + for (i = 0; i != num && n != len; i++, n = plen) { + plen += PKT_L4_PLEN(mi[i]); + if (plen > len) { + /* keep SND.UNA at the start of the packet */ + len -= RTE_MIN(len, plen - len); + break; + } + rte_pktmbuf_free(mi[i]); + } + + /* update consumer tail */ + tcp_txq_set_una_tail(s, i); + } while (plen < len); + + s->tcb.snd.una += len; + + /* + * that could happen in case of retransmit, + * adjust SND.NXT with SND.UNA. 
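+	 * I.e. the peer acknowledged data beyond the current (rewound)
+	 * SND.NXT, so reset the TX queue head and resume transmission
+	 * from the new SND.UNA.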
+ */ + if (s->tcb.snd.una > s->tcb.snd.nxt) { + tcp_txq_rst_nxt_head(s); + s->tcb.snd.nxt = s->tcb.snd.una; + } +} + +static inline uint16_t +calc_smss(uint16_t mss, const struct tle_dest *dst) +{ + uint16_t n; + + n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK; + mss = RTE_MIN(n, mss); + return mss; +} + +/* + * RFC 5681 3.1 + * If SMSS > 2190 bytes: + * IW = 2 * SMSS bytes and MUST NOT be more than 2 segments + * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes): + * IW = 3 * SMSS bytes and MUST NOT be more than 3 segments + * if SMSS <= 1095 bytes: + * IW = 4 * SMSS bytes and MUST NOT be more than 4 segments + */ +static inline uint32_t +initial_cwnd(uint16_t smss) +{ + if (smss > 2190) + return 2 * smss; + else if (smss > 1095) + return 3 * smss; + return 4 * smss; +} + +/* + * queue standalone packet to he particular output device + * It assumes that: + * - L2/L3/L4 headers should be already set. + * - packet fits into one segment. + */ +static inline int +send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m) +{ + uint32_t n, nb; + struct tle_drb *drb; + + if (stream_drb_alloc(s, &drb, 1) == 0) + return -ENOBUFS; + + /* enqueue pkt for TX. */ + nb = 1; + n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1, + &drb, &nb); + + /* free unused drbs. */ + if (nb != 0) + stream_drb_free(s, &drb, 1); + + return (n == 1) ? 0 : -ENOBUFS; +} + +static inline int +send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq, + uint32_t flags) +{ + const struct tle_dest *dst; + uint32_t pid, type; + int32_t rc; + + dst = &s->tx.dst; + type = s->s.type; + pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1; + + rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1); + if (rc == 0) + rc = send_pkt(s, dst->dev, m); + + return rc; +} + +static inline int +send_rst(struct tle_tcp_stream *s, uint32_t seq) +{ + struct rte_mbuf *m; + int32_t rc; + + m = rte_pktmbuf_alloc(s->tx.dst.head_mp); + if (m == NULL) + return -ENOMEM; + + rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST); + if (rc != 0) + rte_pktmbuf_free(m); + + return rc; +} + +static inline int +send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags) +{ + struct rte_mbuf *m; + uint32_t seq; + int32_t rc; + + m = rte_pktmbuf_alloc(s->tx.dst.head_mp); + if (m == NULL) + return -ENOMEM; + + seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0); + s->tcb.snd.ts = tms; + + rc = send_ctrl_pkt(s, m, seq, flags); + if (rc != 0) { + rte_pktmbuf_free(m); + return rc; + } + + s->tcb.snd.ack = s->tcb.rcv.nxt; + return 0; +} + + +static int +sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi, + const union seg_info *si, uint32_t ts, struct rte_mbuf *m) +{ + uint16_t len; + int32_t rc; + uint32_t pid, seq, type; + struct tle_dev *dev; + const void *da; + struct tle_dest dst; + const struct tcp_hdr *th; + + type = s->s.type; + + /* get destination information. */ + if (type == TLE_V4) + da = &pi->addr4.src; + else + da = &pi->addr6->src; + + rc = stream_get_dest(&s->s, da, &dst); + if (rc < 0) + return rc; + + th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *, + m->l2_len + m->l3_len); + get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th)); + + s->tcb.rcv.nxt = si->seq + 1; + seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss); + s->tcb.so.ts.ecr = s->tcb.so.ts.val; + s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale); + s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ? 
+ TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT; + s->tcb.so.mss = calc_smss(dst.mtu, &dst); + + /* reset mbuf's data contents. */ + len = m->l2_len + m->l3_len + m->l4_len; + m->tx_offload = 0; + if (rte_pktmbuf_adj(m, len) == NULL) + return -EINVAL; + + dev = dst.dev; + pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1; + + rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq, + TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1); + if (rc == 0) + rc = send_pkt(s, dev, m); + + return rc; +} + +/* + * RFC 793: + * There are four cases for the acceptability test for an incoming segment: + * Segment Receive Test + * Length Window + * ------- ------- ------------------------------------------- + * 0 0 SEG.SEQ = RCV.NXT + * 0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND + * >0 0 not acceptable + * >0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND + * or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND + */ +static inline int +check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len) +{ + uint32_t n; + + n = seqn + len; + if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd && + n - tcb->rcv.nxt > tcb->rcv.wnd) + return -ERANGE; + + return 0; +} + +static inline union tsopt +rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb) +{ + union tsopt ts; + uintptr_t opt; + const struct tcp_hdr *th; + + if (tcb->so.ts.val != 0) { + opt = rte_pktmbuf_mtod_offset(mb, uintptr_t, + mb->l2_len + mb->l3_len + sizeof(*th)); + ts = get_tms_opts(opt, mb->l4_len - sizeof(*th)); + } else + ts.raw = 0; + + return ts; +} + +/* + * PAWS and sequence check. + * RFC 1323 4.2.1 + */ +static inline int +rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts) +{ + int32_t rc; + + /* RFC 1323 4.2.1 R2 */ + rc = check_seqn(tcb, seq, len); + if (rc < 0) + return rc; + + if (ts.raw != 0) { + + /* RFC 1323 4.2.1 R1 */ + if (tcp_seq_lt(ts.val, tcb->rcv.ts)) + return -ERANGE; + + /* RFC 1323 4.2.1 R3 */ + if (tcp_seq_leq(seq, tcb->snd.ack) && + tcp_seq_lt(tcb->snd.ack, seq + len)) + tcb->rcv.ts = ts.val; + } + + return rc; +} + +static inline int +rx_check_ack(const struct tcb *tcb, uint32_t ack) +{ + uint32_t max; + + max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr); + + if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max)) + return 0; + + return -ERANGE; +} + +static inline int +rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len, + const union tsopt ts) +{ + int32_t rc; + + rc = rx_check_seq(tcb, seq, len, ts); + rc |= rx_check_ack(tcb, ack); + return rc; +} + +static inline int +restore_syn_pkt(const union pkt_info *pi, const union seg_info *si, + uint32_t ts, struct rte_mbuf *mb) +{ + int32_t rc; + uint32_t len; + struct tcp_hdr *th; + struct syn_opts so; + + /* check that ACK, etc fields are what we expected. 
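+	 * sync_check_ack() recomputes the cookie hash from the 4-tuple and
+	 * SEG.SEQ, subtracts it from (SEG.ACK - 1) to recover the encoded
+	 * timestamp and MSS index, rejects cookies older than SYNC_MAX_TMO
+	 * and returns the encoded MSS on success.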
*/ + rc = sync_check_ack(pi, si->seq, si->ack - 1, ts); + if (rc < 0) + return rc; + + so.mss = rc; + + th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *, + mb->l2_len + mb->l3_len); + len = mb->l4_len - sizeof(*th); + sync_get_opts(&so, (uintptr_t)(th + 1), len); + + /* reconstruct SYN options, extend header size if necessary */ + if (len < TCP_TX_OPT_LEN_MAX) { + len = TCP_TX_OPT_LEN_MAX - len; + th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN << + TCP_DATA_OFFSET; + mb->pkt_len += len; + mb->data_len += len; + mb->l4_len += len; + } + + fill_syn_opts(th + 1, &so); + return 0; +} + +static inline int +rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, + const union pkt_info *pi, const union seg_info *si, + uint32_t ts, struct rte_mbuf *mb) +{ + int32_t rc; + struct stbl_entry *se; + + if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) + return -EINVAL; + + /* ACK for new connection request. */ + + rc = restore_syn_pkt(pi, si, ts, mb); + if (rc < 0) + return rc; + + se = stbl_add_pkt(st, pi, mb); + if (se == NULL) + return -ENOBUFS; + + /* put new connection requests into stream listen queue */ + if (rte_ring_enqueue_burst(s->rx.q, + (void * const *)&se, 1) != 1) { + stbl_del_pkt(st, se, pi); + return -ENOBUFS; + } + + return 0; +} + +static inline void +stream_term(struct tle_tcp_stream *s) +{ + struct sdr *dr; + + s->tcb.state = TCP_ST_CLOSED; + rte_smp_wmb(); + + timer_stop(s); + + /* close() was already invoked, schedule final cleanup */ + if ((s->tcb.uop & TCP_OP_CLOSE) != 0) { + + dr = CTX_TCP_SDR(s->s.ctx); + STAILQ_INSERT_TAIL(&dr->be, &s->s, link); + + /* notify user that stream need to be closed */ + } else if (s->err.ev != NULL) + tle_event_raise(s->err.ev); + else if (s->err.cb.func != NULL) + s->err.cb.func(s->err.cb.data, &s->s); +} + +static inline int +data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen, + uint32_t *seqn, uint32_t *plen) +{ + uint32_t len, n, seq; + + seq = *seqn; + len = *plen; + + rte_pktmbuf_adj(mb, hlen); + if (len == 0) + return -ENODATA; + /* cut off the start of the packet */ + else if (tcp_seq_lt(seq, tcb->rcv.nxt)) { + n = tcb->rcv.nxt - seq; + if (n >= len) + return -ENODATA; + + rte_pktmbuf_adj(mb, n); + *seqn = seq + n; + *plen = len - n; + } + + return 0; +} + +static inline uint32_t +rx_ackdata(struct tle_tcp_stream *s, uint32_t ack) +{ + uint32_t k, n; + + n = ack - (uint32_t)s->tcb.snd.una; + + /* some more data was acked. */ + if (n != 0) { + + /* advance SND.UNA and free related packets. 
*/ + k = rte_ring_free_count(s->tx.q); + free_una_data(s, n); + + /* mark the stream as available for writing */ + if (rte_ring_free_count(s->tx.q) != 0) { + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + else if (k == 0 && s->tx.cb.func != NULL) + s->tx.cb.func(s->tx.cb.data, &s->s); + } + } + + return n; +} + +static void +rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp) +{ + uint32_t state; + int32_t ackfin; + + s->tcb.rcv.nxt += 1; + + ackfin = (s->tcb.snd.una == s->tcb.snd.fss); + state = s->tcb.state; + + if (state == TCP_ST_ESTABLISHED) { + s->tcb.state = TCP_ST_CLOSE_WAIT; + /* raise err.ev & err.cb */ + if (s->err.ev != NULL) + tle_event_raise(s->err.ev); + else if (s->err.cb.func != NULL) + s->err.cb.func(s->err.cb.data, &s->s); + } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) { + rsp->flags |= TCP_FLAG_ACK; + if (ackfin != 0) { + s->tcb.state = TCP_ST_TIME_WAIT; + s->tcb.snd.rto = TCP_RTO_2MSL; + timer_reset(s); + } else + s->tcb.state = TCP_ST_CLOSING; + } else if (state == TCP_ST_FIN_WAIT_2) { + rsp->flags |= TCP_FLAG_ACK; + s->tcb.state = TCP_ST_TIME_WAIT; + s->tcb.snd.rto = TCP_RTO_2MSL; + timer_reset(s); + } else if (state == TCP_ST_LAST_ACK && ackfin != 0) { + stream_term(s); + } +} + +/* + * FIN process for ESTABLISHED state + * returns: + * 0 < - error occurred + * 0 - FIN was processed OK, and mbuf can be free/reused. + * 0 > - FIN was processed OK and mbuf can't be free/reused. + */ +static inline int +rx_fin(struct tle_tcp_stream *s, uint32_t state, + const union seg_info *si, struct rte_mbuf *mb, + struct resp_info *rsp) +{ + uint32_t hlen, plen, seq; + int32_t ret; + union tsopt ts; + + hlen = PKT_L234_HLEN(mb); + plen = mb->pkt_len - hlen; + seq = si->seq; + + ts = rx_tms_opt(&s->tcb, mb); + ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts); + if (ret != 0) + return ret; + + if (state < TCP_ST_ESTABLISHED) + return -EINVAL; + + if (plen != 0) { + + ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen); + if (ret != 0) + return ret; + if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1) + return -ENOBUFS; + } + + /* process ack here */ + rx_ackdata(s, si->ack); + + /* some fragments still missing */ + if (seq + plen != s->tcb.rcv.nxt) { + s->tcb.rcv.frs.seq = seq + plen; + s->tcb.rcv.frs.on = 1; + } else + rx_fin_state(s, rsp); + + return plen; +} + +static inline int +rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags, + const union seg_info *si) +{ + int32_t rc; + + /* + * RFC 793: In all states except SYN-SENT, all reset (RST) segments + * are validated by checking their SEQ-fields. + * A reset is valid if its sequence number is in the window. + * In the SYN-SENT state (a RST received in response to an initial SYN), + * the RST is acceptable if the ACK field acknowledges the SYN. + */ + if (state == TCP_ST_SYN_SENT) { + rc = ((flags & TCP_FLAG_ACK) == 0 || + si->ack != s->tcb.snd.nxt) ? + -ERANGE : 0; + } + + else + rc = check_seqn(&s->tcb, si->seq, 0); + + if (rc == 0) + stream_term(s); + + return rc; +} + +/* + * check do we have FIN that was received out-of-order. + * if yes, try to process it now. 
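+ * (rcv.frs is armed by rx_fin() when the FIN arrives while data segments
+ * in front of it are still missing; once rcv.nxt catches up, the deferred
+ * FIN processing is finished here.)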
+ */ +static inline void +rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp) +{ + if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq) + rx_fin_state(s, rsp); +} + +static inline void +dack_info_init(struct dack_info *tack, const struct tcb *tcb) +{ + memset(tack, 0, sizeof(*tack)); + tack->ack = tcb->snd.una; + tack->segs.dup = tcb->rcv.dupack; + tack->wu.raw = tcb->snd.wu.raw; + tack->wnd = tcb->snd.wnd >> tcb->snd.wscale; +} + +static inline void +ack_window_update(struct tcb *tcb, const struct dack_info *tack) +{ + tcb->snd.wu.raw = tack->wu.raw; + tcb->snd.wnd = tack->wnd << tcb->snd.wscale; +} + +static inline void +ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack) +{ + uint32_t n; + + n = tack->segs.ack * tcb->snd.mss; + + /* slow start phase, RFC 5681 3.1 (2) */ + if (tcb->snd.cwnd < tcb->snd.ssthresh) + tcb->snd.cwnd += RTE_MIN(acked, n); + /* congestion avoidance phase, RFC 5681 3.1 (3) */ + else + tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd); +} + +static inline void +rto_ssthresh_update(struct tcb *tcb) +{ + uint32_t k, n; + + /* RFC 5681 3.1 (4) */ + n = (tcb->snd.nxt - tcb->snd.una) / 2; + k = 2 * tcb->snd.mss; + tcb->snd.ssthresh = RTE_MAX(n, k); +} + +static inline void +rto_cwnd_update(struct tcb *tcb) +{ + + if (tcb->snd.nb_retx == 0) + rto_ssthresh_update(tcb); + + /* + * RFC 5681 3.1: upon a timeout cwnd MUST be set to + * no more than 1 full-sized segment. + */ + tcb->snd.cwnd = tcb->snd.mss; +} + +static inline void +ack_info_update(struct dack_info *tack, const union seg_info *si, + int32_t badseq, uint32_t dlen, const union tsopt ts) +{ + if (badseq != 0) { + tack->segs.badseq++; + return; + } + + /* segnt with incoming data */ + tack->segs.data += (dlen != 0); + + /* segment with newly acked data */ + if (tcp_seq_lt(tack->ack, si->ack)) { + tack->segs.dup = 0; + tack->segs.ack++; + tack->ack = si->ack; + tack->ts = ts; + + /* + * RFC 5681: An acknowledgment is considered a "duplicate" when: + * (a) the receiver of the ACK has outstanding data + * (b) the incoming acknowledgment carries no data + * (c) the SYN and FIN bits are both off + * (d) the acknowledgment number is equal to the TCP.UNA + * (e) the advertised window in the incoming acknowledgment equals the + * advertised window in the last incoming acknowledgment. + * + * Here will have only to check only for (b),(d),(e). + * (a) will be checked later for the whole bulk of packets, + * (c) should never happen here. + */ + } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) { + tack->dup3.seg = tack->segs.ack + 1; + tack->dup3.ack = tack->ack; + } + + /* + * RFC 793: + * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be + * updated. If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and + * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set + * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK. 
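+	 *
+	 * WL1/WL2 remember the SEQ/ACK of the segment that last updated the
+	 * window, so a reordered older segment cannot overwrite a newer
+	 * window advertisement.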
+ */ + if (tcp_seq_lt(tack->wu.wl1, si->seq) || + (si->seq == tack->wu.wl1 && + tcp_seq_leq(tack->wu.wl2, si->ack))) { + + tack->wu.wl1 = si->seq; + tack->wu.wl2 = si->ack; + tack->wnd = si->wnd; + } +} + +static inline uint32_t +rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack, + const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[], + int32_t rc[], uint32_t num) +{ + uint32_t i, j, k, n, t; + uint32_t hlen, plen, seq, tlen; + int32_t ret; + union tsopt ts; + + k = 0; + for (i = 0; i != num; i = j) { + + hlen = PKT_L234_HLEN(mb[i]); + plen = mb[i]->pkt_len - hlen; + seq = si[i].seq; + + ts = rx_tms_opt(&s->tcb, mb[i]); + ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts); + + /* account segment received */ + ack_info_update(tack, &si[i], ret != 0, plen, ts); + + if (ret == 0) { + /* skip duplicate data, if any */ + ret = data_pkt_adjust(&s->tcb, mb[i], hlen, + &seq, &plen); + } + + j = i + 1; + if (ret != 0) { + rp[k] = mb[i]; + rc[k] = -ret; + k++; + continue; + } + + /* group sequential packets together. */ + for (tlen = plen; j != num; tlen += plen, j++) { + + hlen = PKT_L234_HLEN(mb[j]); + plen = mb[j]->pkt_len - hlen; + + /* not consecutive packet */ + if (plen == 0 || seq + tlen != si[j].seq) + break; + + /* check SEQ/ACK */ + ts = rx_tms_opt(&s->tcb, mb[j]); + ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack, + plen, ts); + + /* account for segment received */ + ack_info_update(tack, &si[j], ret != 0, plen, ts); + + if (ret != 0) { + rp[k] = mb[j]; + rc[k] = -ret; + k++; + break; + } + rte_pktmbuf_adj(mb[j], hlen); + } + + n = j - i; + j += (ret != 0); + + /* account for OFO data */ + if (seq != s->tcb.rcv.nxt) + tack->segs.ofo += n; + + /* enqueue packets */ + t = rx_data_enqueue(s, seq, tlen, mb + i, n); + + /* if we are out of space in stream recv buffer. */ + for (; t != n; t++) { + rp[k] = mb[i + t]; + rc[k] = -ENOBUFS; + k++; + } + } + + return num - k; +} + +static inline void +start_fast_retransmit(struct tle_tcp_stream *s) +{ + struct tcb *tcb; + + tcb = &s->tcb; + + /* RFC 6582 3.2.2 */ + tcb->snd.rcvr = tcb->snd.nxt; + tcb->snd.fastack = 1; + + /* RFC 5681 3.2.2 */ + rto_ssthresh_update(tcb); + + /* RFC 5681 3.2.3 */ + tcp_txq_rst_nxt_head(s); + tcb->snd.nxt = tcb->snd.una; + tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss; +} + +static inline void +stop_fast_retransmit(struct tle_tcp_stream *s) +{ + struct tcb *tcb; + uint32_t n; + + tcb = &s->tcb; + n = tcb->snd.nxt - tcb->snd.una; + tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh, + RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss); + tcb->snd.fastack = 0; +} + +static inline int +in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num, + uint32_t dup_num) +{ + uint32_t n; + struct tcb *tcb; + + tcb = &s->tcb; + + /* RFC 5682 3.2.3 partial ACK */ + if (ack_len != 0) { + + n = ack_num * tcb->snd.mss; + if (ack_len >= n) + tcb->snd.cwnd -= ack_len - n; + else + tcb->snd.cwnd -= ack_len % tcb->snd.mss; + + /* + * For the first partial ACK that arrives + * during fast recovery, also reset the + * retransmit timer. 
+ */ + if (tcb->snd.fastack == 1) + timer_reset(s); + + tcb->snd.fastack += ack_num; + return 1; + + /* RFC 5681 3.2.4 */ + } else if (dup_num > 3) { + s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss; + return 1; + } + + return 0; +} + +static inline int +process_ack(struct tle_tcp_stream *s, uint32_t acked, + const struct dack_info *tack) +{ + int32_t send; + + send = 0; + + /* normal mode */ + if (s->tcb.snd.fastack == 0) { + + send = 1; + + /* RFC 6582 3.2.2 switch to fast retransmit mode */ + if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt && + s->tcb.snd.una >= s->tcb.snd.rcvr) { + + start_fast_retransmit(s); + in_fast_retransmit(s, + tack->ack - tack->dup3.ack, + tack->segs.ack - tack->dup3.seg - 1, + tack->segs.dup); + + /* remain in normal mode */ + } else if (acked != 0) { + ack_cwnd_update(&s->tcb, acked, tack); + timer_stop(s); + } + + /* fast retransmit mode */ + } else { + + /* remain in fast retransmit mode */ + if (s->tcb.snd.una < s->tcb.snd.rcvr) { + + send = in_fast_retransmit(s, acked, tack->segs.ack, + tack->segs.dup); + } else { + /* RFC 5682 3.2.3 full ACK */ + stop_fast_retransmit(s); + timer_stop(s); + + /* if we have another series of dup ACKs */ + if (tack->dup3.seg != 0 && + s->tcb.snd.una != s->tcb.snd.nxt && + tcp_seq_leq((uint32_t)s->tcb.snd.rcvr, + tack->dup3.ack)) { + + /* restart fast retransmit again. */ + start_fast_retransmit(s); + send = in_fast_retransmit(s, + tack->ack - tack->dup3.ack, + tack->segs.ack - tack->dup3.seg - 1, + tack->segs.dup); + } + } + } + + return send; +} + +/* + * our FIN was acked, stop rto timer, change stream state, + * and possibly close the stream. + */ +static inline void +rx_ackfin(struct tle_tcp_stream *s) +{ + uint32_t state; + + s->tcb.snd.una = s->tcb.snd.fss; + empty_mbuf_ring(s->tx.q); + + state = s->tcb.state; + if (state == TCP_ST_LAST_ACK) + stream_term(s); + else if (state == TCP_ST_FIN_WAIT_1) { + timer_stop(s); + s->tcb.state = TCP_ST_FIN_WAIT_2; + } else if (state == TCP_ST_CLOSING) { + s->tcb.state = TCP_ST_TIME_WAIT; + s->tcb.snd.rto = TCP_RTO_2MSL; + timer_reset(s); + } +} + +static inline void +rx_process_ack(struct tle_tcp_stream *s, uint32_t ts, + const struct dack_info *tack) +{ + int32_t send; + uint32_t n; + + s->tcb.rcv.dupack = tack->segs.dup; + + n = rx_ackdata(s, tack->ack); + send = process_ack(s, n, tack); + + /* try to send more data. */ + if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0) + txs_enqueue(s->s.ctx, s); + + /* restart RTO timer. */ + if (s->tcb.snd.nxt != s->tcb.snd.una) + timer_start(s); + + /* update rto, if fresh packet is here then calculate rtt */ + if (tack->ts.ecr != 0) + rto_estimate(&s->tcb, ts - tack->ts.ecr); +} + +/* + * process + * returns negative value on failure, or zero on success. 
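+ * Here: handle the SYN+ACK reply on the active-open (SYN_SENT) side -
+ * parse the SYN options, initialise the TCB send/receive state, estimate
+ * the initial RTO and move the stream to ESTABLISHED.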
+ */ +static inline int +rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state, + const union seg_info *si, struct rte_mbuf *mb, + struct resp_info *rsp) +{ + struct syn_opts so; + struct tcp_hdr *th; + + if (state != TCP_ST_SYN_SENT) + return -EINVAL; + + /* invalid SEG.SEQ */ + if (si->ack != (uint32_t)s->tcb.snd.nxt) { + rsp->flags = TCP_FLAG_RST; + return 0; + } + + th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *, + mb->l2_len + mb->l3_len); + get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th)); + + s->tcb.so = so; + + s->tcb.snd.una = s->tcb.snd.nxt; + s->tcb.snd.mss = so.mss; + s->tcb.snd.wnd = si->wnd << so.wscale; + s->tcb.snd.wu.wl1 = si->seq; + s->tcb.snd.wu.wl2 = si->ack; + s->tcb.snd.wscale = so.wscale; + + /* setup congestion variables */ + s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss); + s->tcb.snd.ssthresh = s->tcb.snd.wnd; + + s->tcb.rcv.ts = so.ts.val; + s->tcb.rcv.irs = si->seq; + s->tcb.rcv.nxt = si->seq + 1; + + /* calculate initial rto */ + rto_estimate(&s->tcb, ts - s->tcb.snd.ts); + + rsp->flags |= TCP_FLAG_ACK; + + timer_stop(s); + s->tcb.state = TCP_ST_ESTABLISHED; + rte_smp_wmb(); + + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + else if (s->tx.cb.func != NULL) + s->tx.cb.func(s->tx.cb.data, &s->s); + + return 0; +} + +static inline uint32_t +rx_stream(struct tle_tcp_stream *s, uint32_t ts, + const union pkt_info *pi, const union seg_info si[], + struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], + uint32_t num) +{ + uint32_t i, k, n, state; + int32_t ret; + struct resp_info rsp; + struct dack_info tack; + + k = 0; + rsp.flags = 0; + + state = s->tcb.state; + + /* + * first check for the states/flags where we don't + * expect groups of packets. + */ + + /* process RST */ + if ((pi->tf.flags & TCP_FLAG_RST) != 0) { + for (i = 0; + i != num && + rx_rst(s, state, pi->tf.flags, &si[i]); + i++) + ; + i = 0; + + /* RFC 793: if the ACK bit is off drop the segment and return */ + } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) { + i = 0; + /* + * first check for the states/flags where we don't + * expect groups of packets. + */ + + /* process */ + } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) { + ret = 0; + for (i = 0; i != num; i++) { + ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp); + if (ret == 0) + break; + + rc[k] = -ret; + rp[k] = mb[i]; + k++; + } + + /* process FIN */ + } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) { + ret = 0; + for (i = 0; i != num; i++) { + ret = rx_fin(s, state, &si[i], mb[i], &rsp); + if (ret >= 0) + break; + + rc[k] = -ret; + rp[k] = mb[i]; + k++; + } + i += (ret > 0); + + /* normal data/ack packets */ + } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) { + + /* process incoming data packets. */ + dack_info_init(&tack, &s->tcb); + n = rx_data_ack(s, &tack, si, mb, rp, rc, num); + + /* follow up actions based on aggregated information */ + + /* update SND.WND */ + ack_window_update(&s->tcb, &tack); + + /* + * fast-path: all data & FIN was already sent out + * and now is acknowledged. + */ + if (s->tcb.snd.fss == s->tcb.snd.nxt && + tack.ack == (uint32_t) s->tcb.snd.nxt) + rx_ackfin(s); + else + rx_process_ack(s, ts, &tack); + + /* + * send an immediate ACK if either: + * - received segment with invalid seq/ack number + * - received segment with OFO data + * - received segment with INO data and no TX is scheduled + * for that stream. 
+ */ + if (tack.segs.badseq != 0 || tack.segs.ofo != 0 || + (tack.segs.data != 0 && + rte_atomic32_read(&s->tx.arm) == 0)) + rsp.flags |= TCP_FLAG_ACK; + + rx_ofo_fin(s, &rsp); + + k += num - n; + i = num; + + /* unhandled state, drop all packets. */ + } else + i = 0; + + /* we have a response packet to send. */ + if (rsp.flags == TCP_FLAG_RST) { + send_rst(s, si[i].ack); + stream_term(s); + } else if (rsp.flags != 0) { + send_ack(s, ts, rsp.flags); + + /* start the timer for FIN packet */ + if ((rsp.flags & TCP_FLAG_FIN) != 0) + timer_reset(s); + } + + /* unprocessed packets */ + for (; i != num; i++, k++) { + rc[k] = EINVAL; + rp[k] = mb[i]; + } + + return num - k; +} + +static inline uint32_t +rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, + const union pkt_info pi[], const union seg_info si[], + struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], + uint32_t num) +{ + struct tle_tcp_stream *s; + uint32_t i, k, state; + int32_t ret; + + s = rx_obtain_stream(dev, st, &pi[0], type); + if (s == NULL) { + for (i = 0; i != num; i++) { + rc[i] = ENOENT; + rp[i] = mb[i]; + } + return 0; + } + + k = 0; + state = s->tcb.state; + + if (state == TCP_ST_LISTEN) { + + /* one connection per flow */ + ret = EINVAL; + for (i = 0; i != num && ret != 0; i++) { + ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]); + if (ret != 0) { + rc[k] = -ret; + rp[k] = mb[i]; + k++; + } + } + /* duplicate SYN requests */ + for (; i != num; i++, k++) { + rc[k] = EINVAL; + rp[k] = mb[i]; + } + + if (k != num && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1) + s->rx.cb.func(s->rx.cb.data, &s->s); + + } else { + i = rx_stream(s, ts, pi, si, mb, rp, rc, num); + k = num - i; + } + + rwl_release(&s->rx.use); + return num - k; +} + + +static inline uint32_t +rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts, + const union pkt_info pi[], const union seg_info si[], + struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], + uint32_t num) +{ + struct tle_tcp_stream *s; + uint32_t i, k; + int32_t ret; + + s = rx_obtain_listen_stream(dev, &pi[0], type); + if (s == NULL) { + for (i = 0; i != num; i++) { + rc[i] = ENOENT; + rp[i] = mb[i]; + } + return 0; + } + + k = 0; + for (i = 0; i != num; i++) { + + /* check that this remote is allowed to connect */ + if (rx_check_stream(s, &pi[i]) != 0) + ret = -ENOENT; + else + /* syncokie: reply with */ + ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]); + + if (ret != 0) { + rc[k] = -ret; + rp[k] = mb[i]; + k++; + } + } + + rwl_release(&s->rx.use); + return num - k; +} + +uint16_t +tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num) +{ + struct stbl *st; + uint32_t i, j, k, n, t, ts; + uint64_t csf; + union pkt_info pi[num]; + union seg_info si[num]; + union { + uint8_t t[TLE_VNUM]; + uint32_t raw; + } stu; + + ts = tcp_get_tms(); + st = CTX_TCP_STLB(dev->ctx); + + stu.raw = 0; + + /* extract packet info and check the L3/L4 csums */ + for (i = 0; i != num; i++) { + + get_pkt_info(pkt[i], &pi[i], &si[i]); + + t = pi[i].tf.type; + csf = dev->rx.ol_flags[t] & + (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); + + /* check csums in SW */ + if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf, + pi[i].tf.type, IPPROTO_TCP) != 0) + pi[i].csf = csf; + + stu.t[t] = 1; + } + + if (stu.t[TLE_V4] != 0) + stbl_lock(st, TLE_V4); + if (stu.t[TLE_V6] != 0) + stbl_lock(st, TLE_V6); + + k = 0; + for (i = 0; i != num; i += j) { + + t = 
pi[i].tf.type; + + /*basic checks for incoming packet */ + if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) { + rc[k] = EINVAL; + rp[k] = pkt[i]; + j = 1; + k++; + /* process input SYN packets */ + } else if (pi[i].tf.flags == TCP_FLAG_SYN) { + j = pkt_info_bulk_syneq(pi + i, num - i); + n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i, + rp + k, rc + k, j); + k += j - n; + } else { + j = pkt_info_bulk_eq(pi + i, num - i); + n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i, + rp + k, rc + k, j); + k += j - n; + } + } + + if (stu.t[TLE_V4] != 0) + stbl_unlock(st, TLE_V4); + if (stu.t[TLE_V6] != 0) + stbl_unlock(st, TLE_V6); + + return num - k; +} + +uint16_t +tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[], + uint32_t num) +{ + uint32_t i, n; + struct tle_tcp_stream *s; + struct stbl_entry *se[num]; + + s = TCP_STREAM(ts); + n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num); + if (n == 0) + return 0; + + for (i = 0; i != n; i++) { + rq[i].pkt = stbl_get_pkt(se[i]); + rq[i].opaque = se[i]; + } + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (n == num && rte_ring_count(s->rx.q) != 0) { + if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + rwl_release(&s->rx.use); + } + + return n; +} + +static inline int +stream_fill_dest(struct tle_tcp_stream *s) +{ + int32_t rc; + const void *da; + + if (s->s.type == TLE_V4) + da = &s->s.ipv4.addr.src; + else + da = &s->s.ipv6.addr.src; + + rc = stream_get_dest(&s->s, da, &s->tx.dst); + return (rc < 0) ? rc : 0; +} + +/* + * helper function, prepares an accepted stream. + */ +static int +accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs, + const struct tle_tcp_accept_param *prm, uint32_t tms, + const union pkt_info *pi, const union seg_info *si) +{ + int32_t rc; + uint32_t rtt; + + /* some TX still pending for that stream. */ + if (TCP_STREAM_TX_PENDING(cs)) + return -EAGAIN; + + /* setup L4 ports and L3 addresses fields. */ + cs->s.port.raw = pi->port.raw; + cs->s.pmsk.raw = UINT32_MAX; + + if (pi->tf.type == TLE_V4) { + cs->s.ipv4.addr = pi->addr4; + cs->s.ipv4.mask.src = INADDR_NONE; + cs->s.ipv4.mask.dst = INADDR_NONE; + } else if (pi->tf.type == TLE_V6) { + cs->s.ipv6.addr = *pi->addr6; + rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.src)); + rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.dst)); + } + + /* setup TCB */ + sync_fill_tcb(&cs->tcb, si, prm->syn.pkt); + cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale; + + /* setup stream notification menchanism */ + cs->rx.ev = prm->cfg.recv_ev; + cs->rx.cb = prm->cfg.recv_cb; + cs->tx.ev = prm->cfg.send_ev; + cs->tx.cb = prm->cfg.send_cb; + cs->err.ev = prm->cfg.err_ev; + cs->err.cb = prm->cfg.err_cb; + + /* store other params */ + cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : + TLE_TCP_DEFAULT_RETRIES; + + /* + * estimate the rto + * for now rtt is calculated based on the tcp TMS option, + * later add real-time one + */ + if (cs->tcb.so.ts.ecr) { + rtt = tms - cs->tcb.so.ts.ecr; + rto_estimate(&cs->tcb, rtt); + } else + cs->tcb.snd.rto = TCP_RTO_DEFAULT; + + tcp_stream_up(cs); + + /* copy streams type. */ + cs->s.type = ps->s.type; + + /* retrive and cache destination information. 
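(obtained via the context lookup4/lookup6 callback)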
*/ + rc = stream_fill_dest(cs); + if (rc != 0) + return rc; + + /* update snd.mss with SMSS value */ + cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); + + /* setup congestion variables */ + cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); + cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; + + cs->tcb.state = TCP_ST_ESTABLISHED; + cs->tcb.uop |= TCP_OP_ACCEPT; + + /* add stream to the table */ + cs->ste = prm->syn.opaque; + rte_smp_wmb(); + cs->ste->data = cs; + return 0; +} + +/* + * !!! + * Right now new stream rcv.wnd is set to zero. + * That simplifies handling of new connection establishment + * (as no data segments could be received), + * but has to be addressed. + * possible ways: + * - send ack after accept creates new stream with new rcv.wnd value. + * the problem with that approach that single ack is not delivered + * reliably (could be lost), plus might slowdown connection establishment + * (extra packet per connection, that client has to wait for). + * - allocate new stream at ACK recieve stage. + * As a drawback - whole new stream allocation/connection establishment + * will be done in BE. + * !!! + */ +int +tle_tcp_stream_accept(struct tle_stream *ts, + const struct tle_tcp_accept_param prm[], struct tle_stream *rs[], + uint32_t num) +{ + struct tle_tcp_stream *cs, *s; + struct tle_ctx *ctx; + uint32_t i, j, n, tms; + int32_t rc; + union pkt_info pi[num]; + union seg_info si[num]; + + tms = tcp_get_tms(); + s = TCP_STREAM(ts); + + for (i = 0; i != num; i++) + get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]); + + /* mark stream as not closable */ + if (rwl_acquire(&s->rx.use) < 0) + return -EINVAL; + + ctx = s->s.ctx; + n = get_streams(ctx, rs, num); + + rc = 0; + for (i = 0; i != n; i++) { + + /* prepare new stream */ + cs = TCP_STREAM(rs[i]); + rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i); + if (rc != 0) + break; + } + + rwl_release(&s->rx.use); + + /* free 'SYN' mbufs. */ + for (j = 0; j != i; j++) + rte_pktmbuf_free(prm[j].syn.pkt); + + /* close failed stream, put unused streams back to the free list. */ + if (rc != 0) { + tle_tcp_stream_close(rs[i]); + for (j = i + 1; j != n; j++) { + cs = TCP_STREAM(rs[j]); + put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs)); + } + rte_errno = -rc; + + /* not enough streams are available */ + } else if (n != num) + rte_errno = ENFILE; + + return i; +} + +/* + * !!! implement a proper one, or delete !!! + * need to make sure no race conditions with add/lookup stream table. + */ +void +tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[], + uint32_t num) +{ + uint32_t i; + struct rte_mbuf *mb; + struct stbl *st; + union pkt_info pi; + union seg_info si; + + st = CTX_TCP_STLB(s->ctx); + + for (i = 0; i != num; i++) { + mb = rq[i].pkt; + get_pkt_info(mb, &pi, &si); + if (pi.tf.type < TLE_VNUM) + stbl_del_pkt_lock(st, rq[i].opaque, &pi); + + /* !!! send RST pkt to the peer !!! */ + rte_pktmbuf_free(mb); + } +} + +uint16_t +tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) +{ + uint32_t i, j, k, n; + struct tle_drb *drb[num]; + struct tle_tcp_stream *s; + + /* extract packets from device TX queue. */ + + k = num; + n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt, + num, drb, &k); + + if (n == 0) + return 0; + + /* free empty drbs and notify related streams. 
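(consecutive drbs that belong to the same stream are grouped together and returned to that stream in one stream_drb_free() call)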
*/ + + for (i = 0; i != k; i = j) { + s = drb[i]->udata; + for (j = i + 1; j != k && s == drb[j]->udata; j++) + ; + stream_drb_free(s, drb + i, j - i); + } + + return n; +} + +static inline void +stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi) +{ + if (s->s.type == TLE_V4) + pi->addr4 = s->s.ipv4.addr; + else + pi->addr6 = &s->s.ipv6.addr; + + pi->port = s->s.port; + pi->tf.type = s->s.type; +} + +static int +stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr) +{ + const struct sockaddr_in *in4; + const struct sockaddr_in6 *in6; + const struct tle_dev_param *prm; + int32_t rc; + + rc = 0; + s->s.pmsk.raw = UINT32_MAX; + + /* setup L4 src ports and src address fields. */ + if (s->s.type == TLE_V4) { + in4 = (const struct sockaddr_in *)addr; + if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0) + return -EINVAL; + + s->s.port.src = in4->sin_port; + s->s.ipv4.addr.src = in4->sin_addr.s_addr; + s->s.ipv4.mask.src = INADDR_NONE; + s->s.ipv4.mask.dst = INADDR_NONE; + + } else if (s->s.type == TLE_V6) { + in6 = (const struct sockaddr_in6 *)addr; + if (memcmp(&in6->sin6_addr, &tle_ipv6_any, + sizeof(tle_ipv6_any)) == 0 || + in6->sin6_port == 0) + return -EINVAL; + + s->s.port.src = in6->sin6_port; + rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr, + sizeof(s->s.ipv6.addr.src)); + rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none, + sizeof(s->s.ipv6.mask.src)); + rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none, + sizeof(s->s.ipv6.mask.dst)); + } + + /* setup the destination device. */ + rc = stream_fill_dest(s); + if (rc != 0) + return rc; + + /* setup L4 dst address from device param */ + prm = &s->tx.dst.dev->prm; + if (s->s.type == TLE_V4) { + if (s->s.ipv4.addr.dst == INADDR_ANY) + s->s.ipv4.addr.dst = prm->local_addr4.s_addr; + } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any, + sizeof(tle_ipv6_any)) == 0) + memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6, + sizeof(s->s.ipv6.addr.dst)); + + return rc; +} + +static inline int +tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) +{ + int32_t rc; + uint32_t tms, seq; + union pkt_info pi; + struct stbl *st; + struct stbl_entry *se; + + /* fill stream address */ + rc = stream_fill_addr(s, addr); + if (rc != 0) + return rc; + + /* fill pkt info to generate seq.*/ + stream_fill_pkt_info(s, &pi); + + tms = tcp_get_tms(); + s->tcb.so.ts.val = tms; + s->tcb.so.ts.ecr = 0; + s->tcb.so.wscale = TCP_WSCALE_DEFAULT; + s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst); + + /* note that rcv.nxt is 0 here for sync_gen_seq.*/ + seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss); + s->tcb.snd.iss = seq; + s->tcb.snd.rcvr = seq; + s->tcb.snd.una = seq; + s->tcb.snd.nxt = seq + 1; + s->tcb.snd.rto = TCP_RTO_DEFAULT; + s->tcb.snd.ts = tms; + + s->tcb.rcv.mss = s->tcb.so.mss; + s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT; + s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale; + s->tcb.rcv.ts = 0; + + /* add the stream in stream table */ + st = CTX_TCP_STLB(s->s.ctx); + se = stbl_add_stream_lock(st, s); + if (se == NULL) + return -ENOBUFS; + s->ste = se; + + /* put stream into the to-send queue */ + txs_enqueue(s->s.ctx, s); + + return 0; +} + +int +tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr) +{ + struct tle_tcp_stream *s; + uint32_t type; + int32_t rc; + + if (ts == NULL || addr == NULL) + return -EINVAL; + + s = TCP_STREAM(ts); + type = s->s.type; + if (type >= TLE_VNUM) + return -EINVAL; + + if (rwl_try_acquire(&s->tx.use) > 0) { + rc = 
rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED, + TCP_ST_SYN_SENT); + rc = (rc == 0) ? -EDEADLK : 0; + } else + rc = -EINVAL; + + if (rc != 0) { + rwl_release(&s->tx.use); + return rc; + } + + /* fill stream, prepare and transmit syn pkt */ + s->tcb.uop |= TCP_OP_CONNECT; + rc = tx_syn(s, addr); + rwl_release(&s->tx.use); + + /* error happened, do a cleanup */ + if (rc != 0) + tle_tcp_stream_close(ts); + + return rc; +} + +uint16_t +tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) +{ + uint32_t n; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); + if (n == 0) + return 0; + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (n == num && rte_ring_count(s->rx.q) != 0) { + if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + rwl_release(&s->rx.use); + } + + return n; +} + +uint16_t +tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num) +{ + uint32_t i, j, mss, n, state, type; + uint64_t ol_flags; + struct tle_tcp_stream *s; + struct tle_dev *dev; + + s = TCP_STREAM(ts); + + /* mark stream as not closable. */ + if (rwl_acquire(&s->tx.use) < 0) { + rte_errno = EAGAIN; + return 0; + } + + state = s->tcb.state; + if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) { + rte_errno = ENOTCONN; + n = 0; + } else { + mss = s->tcb.snd.mss; + dev = s->tx.dst.dev; + type = s->s.type; + ol_flags = dev->tx.ol_flags[type]; + + /* prepare and check for TX */ + for (i = 0; i != num; i++) { + + /* !!! need to be modified !!! */ + if (pkt[i]->pkt_len > mss || + pkt[i]->nb_segs > TCP_MAX_PKT_SEG) { + rte_errno = EBADMSG; + break; + } else if (tcp_fill_mbuf(pkt[i], s, &s->tx.dst, + ol_flags, s->s.port, 0, TCP_FLAG_ACK, + 0, 0) != 0) + break; + } + + /* queue packets for further transmision. */ + n = rte_ring_mp_enqueue_burst(s->tx.q, (void **)pkt, i); + + /* notify BE about more data to send */ + if (n != 0) + txs_enqueue(s->s.ctx, s); + + /* + * for unsent, but already modified packets: + * remove pkt l2/l3 headers, restore ol_flags + */ + if (n != i) { + ol_flags = ~dev->tx.ol_flags[type]; + for (j = n; j != i; j++) { + rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len + + pkt[j]->l3_len + pkt[j]->l4_len); + pkt[j]->ol_flags &= ol_flags; + } + /* if possible, rearm stream write event. */ + } else if (rte_ring_free_count(s->tx.q) != 0 && + s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + } + + rwl_release(&s->tx.use); + return n; +} + +/* send data and FIN (if needed) */ +static inline void +tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state) +{ + /* try to send some data */ + tx_nxt_data(s, tms); + + /* we also have to send a FIN */ + if (state != TCP_ST_ESTABLISHED && + state != TCP_ST_CLOSE_WAIT && + tcp_txq_nxt_cnt(s) == 0 && + s->tcb.snd.fss != s->tcb.snd.nxt) { + s->tcb.snd.fss = ++s->tcb.snd.nxt; + send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK); + } +} + +static inline void +tx_stream(struct tle_tcp_stream *s, uint32_t tms) +{ + uint32_t state; + + state = s->tcb.state; + + if (state == TCP_ST_SYN_SENT) { + /* send the SYN, start the rto timer */ + send_ack(s, tms, TCP_FLAG_SYN); + timer_start(s); + + } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) { + + tx_data_fin(s, tms, state); + + /* start RTO timer. 
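(only while there is unacknowledged data in flight, i.e. SND.NXT != SND.UNA)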
*/ + if (s->tcb.snd.nxt != s->tcb.snd.una) + timer_start(s); + } +} + +static inline void +rto_stream(struct tle_tcp_stream *s, uint32_t tms) +{ + uint32_t state; + + state = s->tcb.state; + + TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, " + "retx=%u, retm=%u, " + "rto=%u, snd.ts=%u, tmo=%u, " + "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, " + "snd.rcvr=%lu, snd.fastack=%u, " + "wnd=%u, cwnd=%u, ssthresh=%u, " + "bytes sent=%lu, pkt remain=%u;\n", + __func__, s, tms, s->tcb.state, + s->tcb.snd.nb_retx, s->tcb.snd.nb_retm, + s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts, + s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una, + s->tcb.snd.rcvr, s->tcb.snd.fastack, + s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh, + s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s)); + + if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) { + + if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) { + + /* update SND.CWD and SND.SSTHRESH */ + rto_cwnd_update(&s->tcb); + + /* RFC 6582 3.2.4 */ + s->tcb.snd.rcvr = s->tcb.snd.nxt; + s->tcb.snd.fastack = 0; + + /* restart from last acked data */ + tcp_txq_rst_nxt_head(s); + s->tcb.snd.nxt = s->tcb.snd.una; + + tx_data_fin(s, tms, state); + + } else if (state == TCP_ST_SYN_SENT) { + /* resending SYN */ + s->tcb.so.ts.val = tms; + send_ack(s, tms, TCP_FLAG_SYN); + + } else if (state == TCP_ST_TIME_WAIT) { + stream_term(s); + } + + /* RFC6298:5.5 back off the timer */ + s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto); + s->tcb.snd.nb_retx++; + timer_restart(s); + + } else { + send_rst(s, s->tcb.snd.una); + stream_term(s); + } +} + +int +tle_tcp_process(struct tle_ctx *ctx, uint32_t num) +{ + uint32_t i, k, tms; + struct sdr *dr; + struct tle_timer_wheel *tw; + struct tle_stream *p; + struct tle_tcp_stream *s, *rs[num]; + + /* process streams with RTO exipred */ + + tw = CTX_TCP_TMWHL(ctx); + tms = tcp_get_tms(); + tle_timer_expire(tw, tms); + + k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs)); + + for (i = 0; i != k; i++) { + + s = rs[i]; + s->timer.handle = NULL; + if (rwl_try_acquire(&s->tx.use) > 0) + rto_stream(s, tms); + rwl_release(&s->tx.use); + } + + /* process streams from to-send queue */ + + k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs)); + + for (i = 0; i != k; i++) { + + s = rs[i]; + if (rwl_try_acquire(&s->tx.use) > 0 && + rte_atomic32_read(&s->tx.arm) > 0) { + rte_atomic32_set(&s->tx.arm, 0); + tx_stream(s, tms); + } + rwl_release(&s->tx.use); + } + + /* collect streams to close from the death row */ + + dr = CTX_TCP_SDR(ctx); + for (k = 0, p = STAILQ_FIRST(&dr->be); + k != num && p != NULL; + k++, p = STAILQ_NEXT(p, link)) + rs[k] = TCP_STREAM(p); + + if (p == NULL) + STAILQ_INIT(&dr->be); + else + STAILQ_FIRST(&dr->be) = p; + + /* cleanup closed streams */ + for (i = 0; i != k; i++) { + s = rs[i]; + tcp_stream_down(s); + tcp_stream_reset(ctx, s); + } + + return 0; +} diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c new file mode 100644 index 0000000..67ed66b --- /dev/null +++ b/lib/libtle_l4p/tcp_stream.c @@ -0,0 +1,522 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include "tcp_stream.h" +#include "tcp_timer.h" +#include "stream_table.h" +#include "misc.h" +#include "tcp_ctl.h" +#include "tcp_ofo.h" +#include "tcp_txq.h" + + +static void +unuse_stream(struct tle_tcp_stream *s) +{ + s->s.type = TLE_VNUM; + rte_atomic32_set(&s->rx.use, INT32_MIN); + rte_atomic32_set(&s->tx.use, INT32_MIN); +} + +static void +fini_stream(struct tle_tcp_stream *s) +{ + if (s != NULL) { + rte_free(s->rx.q); + tcp_ofo_free(s->rx.ofo); + rte_free(s->tx.q); + rte_free(s->tx.drb.r); + } +} + +static void +tcp_fini_streams(struct tle_ctx *ctx) +{ + uint32_t i; + struct tcp_streams *ts; + + ts = CTX_TCP_STREAMS(ctx); + if (ts != NULL) { + stbl_fini(&ts->st); + for (i = 0; i != ctx->prm.max_streams; i++) + fini_stream(&ts->s[i]); + + /* free the timer wheel */ + tle_timer_free(ts->tmr); + rte_free(ts->tsq); + + STAILQ_INIT(&ts->dr.fe); + STAILQ_INIT(&ts->dr.be); + } + + rte_free(ts); + ctx->streams.buf = NULL; + STAILQ_INIT(&ctx->streams.free); +} + +static struct rte_ring * +alloc_ring(uint32_t n, uint32_t flags, int32_t socket) +{ + struct rte_ring *r; + size_t sz; + char name[RTE_RING_NAMESIZE]; + + n = rte_align32pow2(n); + sz = sizeof(*r) + n * sizeof(r->ring[0]); + + r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket); + if (r == NULL) { + TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, sz, socket, rte_errno); + return NULL; + } + + snprintf(name, sizeof(name), "%p@%zu", r, sz); + rte_ring_init(r, name, n, flags); + return r; +} + +static int +init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s) +{ + size_t bsz, rsz, sz; + uint32_t i, k, n, nb; + struct tle_drb *drb; + char name[RTE_RING_NAMESIZE]; + + /* init RX part. */ + + n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + s->rx.q = alloc_ring(n, RING_F_SP_ENQ, ctx->prm.socket_id); + if (s->rx.q == NULL) + return -ENOMEM; + + s->rx.ofo = tcp_ofo_alloc(n, ctx->prm.socket_id); + if (s->rx.ofo == NULL) + return -ENOMEM; + + /* init TX part. */ + + n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U); + s->tx.q = alloc_ring(n, RING_F_SC_DEQ, ctx->prm.socket_id); + if (s->tx.q == NULL) + return -ENOMEM; + + nb = drb_nb_elem(ctx); + k = calc_stream_drb_num(ctx, nb); + n = rte_align32pow2(k); + + /* size of the drbs ring */ + rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]); + rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + + /* size of the drb. */ + bsz = tle_drb_calc_size(nb); + + /* total stream drbs size. 
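(drb ring header followed by k drb objects of bsz bytes each)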
*/ + sz = rsz + bsz * k; + + s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->tx.drb.r == NULL) { + TCP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); + return -ENOMEM; + } + + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->tx.drb.r, name, n, 0); + + for (i = 0; i != k; i++) { + drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + + rsz + bsz * i); + drb->udata = s; + drb->size = nb; + rte_ring_enqueue(s->tx.drb.r, drb); + } + + s->tx.drb.nb_elem = nb; + s->tx.drb.nb_max = k; + + /* mark stream as avaialble to use. */ + + s->s.ctx = ctx; + unuse_stream(s); + STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link); + + return 0; +} + +static void +tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb) +{ + struct tle_tcp_stream *us; + + us = (struct tle_tcp_stream *)s; + rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb); +} + +static struct tle_timer_wheel * +alloc_timers(uint32_t num, int32_t socket) +{ + struct tle_timer_wheel_args twprm; + + twprm.tick_size = TCP_RTO_GRANULARITY; + twprm.max_timer = num; + twprm.socket_id = socket; + return tle_timer_create(&twprm, tcp_get_tms()); +} + +static int +tcp_init_streams(struct tle_ctx *ctx) +{ + size_t sz; + uint32_t i; + int32_t rc; + struct tcp_streams *ts; + + sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams; + ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (ts == NULL) { + TCP_LOG(ERR, "allocation of %zu bytes on socket %d " + "for %u tcp_streams failed\n", + sz, ctx->prm.socket_id, ctx->prm.max_streams); + return -ENOMEM; + } + + STAILQ_INIT(&ts->dr.fe); + STAILQ_INIT(&ts->dr.be); + + ctx->streams.buf = ts; + STAILQ_INIT(&ctx->streams.free); + + ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->prm.socket_id); + if (ts->tmr == NULL) { + TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n", + ctx, rte_errno); + rc = -ENOMEM; + } else { + ts->tsq = alloc_ring(ctx->prm.max_streams, + RING_F_SC_DEQ, ctx->prm.socket_id); + if (ts->tsq == NULL) + rc = -ENOMEM; + else + rc = stbl_init(&ts->st, ctx->prm.max_streams, + ctx->prm.socket_id); + } + + for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++) + rc = init_stream(ctx, &ts->s[i]); + + if (rc != 0) { + TCP_LOG(ERR, "initalisation of %u-th stream failed", i); + tcp_fini_streams(ctx); + } + + return rc; +} + +static void __attribute__((constructor)) +tcp_stream_setup(void) +{ + static const struct stream_ops tcp_ops = { + .init_streams = tcp_init_streams, + .fini_streams = tcp_fini_streams, + .free_drbs = tcp_free_drbs, + }; + + tle_stream_ops[TLE_PROTO_TCP] = tcp_ops; +} + +/* + * Helper routine, check that input event and callback are mutually exclusive. 
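+ * (for each notification type the caller may supply either an event or a
+ * callback, but not both; e.g. setting both recv_ev and recv_cb.func for
+ * the same stream is rejected with -EINVAL.)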
+ */ +static int +check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb) +{ + if (ev != NULL && cb->func != NULL) + return -EINVAL; + return 0; +} + +static int +check_stream_prm(const struct tle_ctx *ctx, + const struct tle_tcp_stream_param *prm) +{ + if ((prm->addr.local.ss_family != AF_INET && + prm->addr.local.ss_family != AF_INET6) || + prm->addr.local.ss_family != prm->addr.remote.ss_family) + return -EINVAL; + + /* callback and event notifications mechanisms are mutually exclusive */ + if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 || + check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 || + check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0) + return -EINVAL; + + /* check does context support desired address family. */ + if ((prm->addr.local.ss_family == AF_INET && + ctx->prm.lookup4 == NULL) || + (prm->addr.local.ss_family == AF_INET6 && + ctx->prm.lookup6 == NULL)) + return -EINVAL; + + return 0; +} + +struct tle_stream * +tle_tcp_stream_open(struct tle_ctx *ctx, + const struct tle_tcp_stream_param *prm) +{ + struct tle_tcp_stream *s; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + s = (struct tle_tcp_stream *)get_stream(ctx); + if (s == NULL) { + rte_errno = ENFILE; + return NULL; + + /* some TX still pending for that stream. */ + } else if (TCP_STREAM_TX_PENDING(s)) { + put_stream(ctx, &s->s, 0); + rte_errno = EAGAIN; + return NULL; + } + + /* setup L4 ports and L3 addresses fields. */ + rc = stream_fill_ctx(ctx, &s->s, + (const struct sockaddr *)&prm->addr.local, + (const struct sockaddr *)&prm->addr.remote); + + if (rc != 0) { + put_stream(ctx, &s->s, 1); + rte_errno = rc; + return NULL; + } + + /* setup stream notification menchanism */ + s->rx.ev = prm->cfg.recv_ev; + s->rx.cb = prm->cfg.recv_cb; + s->tx.ev = prm->cfg.send_ev; + s->tx.cb = prm->cfg.send_cb; + s->err.ev = prm->cfg.err_ev; + s->err.cb = prm->cfg.err_cb; + + /* store other params */ + s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : + TLE_TCP_DEFAULT_RETRIES; + + tcp_stream_up(s); + return &s->s; +} + +/* + * Helper functions, used by close API. + */ +static inline int +stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s) +{ + uint16_t uop; + uint32_t state; + static const struct tle_stream_cb zcb; + + /* check was close() already invoked */ + uop = s->tcb.uop; + if ((uop & TCP_OP_CLOSE) != 0) + return -EDEADLK; + + /* record that close() was already invoked */ + if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0) + return -EDEADLK; + + /* mark stream as unavaialbe for RX/TX. */ + tcp_stream_down(s); + + /* reset events/callbacks */ + s->rx.ev = NULL; + s->tx.ev = NULL; + s->err.ev = NULL; + + s->rx.cb = zcb; + s->tx.cb = zcb; + s->err.cb = zcb; + + state = s->tcb.state; + + /* CLOSED, LISTEN, SYN_SENT - we can close the stream straighway */ + if (state <= TCP_ST_SYN_SENT) { + tcp_stream_reset(ctx, s); + return 0; + } + + /* generate FIN and proceed with normal connection termination */ + if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) { + + /* change state */ + s->tcb.state = (state == TCP_ST_ESTABLISHED) ? + TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK; + + /* mark stream as writable/readable again */ + tcp_stream_up(s); + + /* queue stream into to-send queue */ + txs_enqueue(ctx, s); + return 0; + } + + /* + * accroding to the state, close() was already invoked, + * should never that point. 
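+	 * (any remaining state implies that connection termination is already
+	 * in progress, so reaching this point indicates a logic error.)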
+ */ + RTE_ASSERT(0); + return -EINVAL; +} + +uint32_t +tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num) +{ + int32_t rc; + uint32_t i; + struct tle_ctx *ctx; + struct tle_tcp_stream *s; + + rc = 0; + + for (i = 0; i != num; i++) { + + s = TCP_STREAM(ts[i]); + if (ts[i] == NULL || s->s.type >= TLE_VNUM) { + rc = EINVAL; + break; + } + + ctx = s->s.ctx; + rc = stream_close(ctx, s); + if (rc != 0) + break; + } + + if (rc != 0) + rte_errno = -rc; + return i; +} + +int +tle_tcp_stream_close(struct tle_stream *ts) +{ + struct tle_ctx *ctx; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + ctx = s->s.ctx; + + /* reset stream events if any. */ + if (s->rx.ev != NULL) + tle_event_idle(s->rx.ev); + if (s->tx.ev != NULL) + tle_event_idle(s->tx.ev); + if (s->err.ev != NULL) + tle_event_idle(s->err.ev); + + return stream_close(ctx, s); +} + +int +tle_tcp_stream_get_addr(const struct tle_stream *ts, + struct tle_tcp_stream_addr *addr) +{ + struct sockaddr_in *lin4, *rin4; + struct sockaddr_in6 *lin6, *rin6; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + if (s->s.type == TLE_V4) { + + lin4 = (struct sockaddr_in *)&addr->local; + rin4 = (struct sockaddr_in *)&addr->remote; + + addr->local.ss_family = AF_INET; + addr->remote.ss_family = AF_INET; + + lin4->sin_port = s->s.port.dst; + rin4->sin_port = s->s.port.src; + lin4->sin_addr.s_addr = s->s.ipv4.addr.dst; + rin4->sin_addr.s_addr = s->s.ipv4.addr.src; + + } else if (s->s.type == TLE_V6) { + + lin6 = (struct sockaddr_in6 *)&addr->local; + rin6 = (struct sockaddr_in6 *)&addr->remote; + + addr->local.ss_family = AF_INET6; + addr->remote.ss_family = AF_INET6; + + lin6->sin6_port = s->s.port.dst; + rin6->sin6_port = s->s.port.src; + memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst, + sizeof(lin6->sin6_addr)); + memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src, + sizeof(rin6->sin6_addr)); + } + + return 0; +} + +int +tle_tcp_stream_listen(struct tle_stream *ts) +{ + struct tle_tcp_stream *s; + int32_t rc; + + s = TCP_STREAM(ts); + if (ts == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + /* mark stream as not closable. */ + if (rwl_try_acquire(&s->rx.use) > 0) { + rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED, + TCP_ST_LISTEN); + if (rc != 0) { + s->tcb.uop |= TCP_OP_LISTEN; + rc = 0; + } else + rc = -EDEADLK; + } else + rc = -EINVAL; + + rwl_release(&s->rx.use); + return rc; +} diff --git a/lib/libtle_l4p/tcp_stream.h b/lib/libtle_l4p/tcp_stream.h new file mode 100644 index 0000000..04c2f88 --- /dev/null +++ b/lib/libtle_l4p/tcp_stream.h @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _TCP_STREAM_H_ +#define _TCP_STREAM_H_ + +#include +#include +#include +#include + +#include "stream.h" +#include "misc.h" +#include "tcp_misc.h" + +#ifdef __cplusplus +extern "C" { +#endif + +enum { + TCP_ST_CLOSED, + TCP_ST_LISTEN, + TCP_ST_SYN_SENT, + TCP_ST_SYN_RCVD, + TCP_ST_ESTABLISHED, + TCP_ST_FIN_WAIT_1, + TCP_ST_FIN_WAIT_2, + TCP_ST_CLOSE_WAIT, + TCP_ST_CLOSING, + TCP_ST_LAST_ACK, + TCP_ST_TIME_WAIT, + TCP_ST_NUM +}; + +enum { + TCP_OP_LISTEN = 0x1, + TCP_OP_ACCEPT = 0x2, + TCP_OP_CONNECT = 0x4, + TCP_OP_CLOSE = 0x8, +}; + +struct tcb { + volatile uint16_t state; + volatile uint16_t uop; /* operations by user performed */ + struct { + uint32_t nxt; + uint32_t irs; /* initial received sequence */ + uint32_t wnd; + uint32_t ts; + struct { + uint32_t seq; + uint32_t on; + } frs; + uint32_t srtt; /* smoothed round trip time (scaled by >> 3) */ + uint32_t rttvar; /* rtt variance */ + uint16_t mss; + uint8_t wscale; + uint8_t dupack; + } rcv; + struct { + uint64_t nxt; + uint64_t una; + uint64_t rcvr; /* recover RFC 6582 */ + uint64_t fss; /* FIN sequence # */ + uint32_t fastack; /* # of partial acks in fast retransmit */ + uint32_t wnd; + union wui wu; /* window update */ + uint32_t ack; /* last sent ack */ + uint32_t ts; + uint32_t cwnd; /* congestion window */ + uint32_t ssthresh; /* slow start threshold */ + uint32_t rto; /* retransmission timeout */ + uint32_t iss; /* initial send sequence */ + uint16_t mss; + uint8_t wscale; + uint8_t nb_retx; /* number of retransmission */ + uint8_t nb_retm; /**< max number of retx attempts. */ + } snd; + struct syn_opts so; /* initial syn options. */ +}; + + +struct tle_tcp_stream { + + struct tle_stream s; + + struct stbl_entry *ste; /* entry in streams table. */ + struct tcb tcb; + + struct { + void *handle; + } timer; + + struct { + struct tle_event *ev; + struct tle_stream_cb cb; + } err; + + struct { + rte_atomic32_t use; + struct rte_ring *q; /* listen (syn) queue */ + struct ofo *ofo; + struct tle_event *ev; /* user provided recv event. */ + struct tle_stream_cb cb; /* user provided recv callback. */ + } rx __rte_cache_aligned; + + struct { + rte_atomic32_t use; + rte_atomic32_t arm; /* when > 0 stream is in to-send queue */ + struct { + uint32_t nb_elem; /* number of objects per drb. */ + uint32_t nb_max; /* number of drbs per stream. */ + struct rte_ring *r; + } drb; + struct rte_ring *q; /* (re)tx queue */ + struct tle_event *ev; + struct tle_stream_cb cb; + struct tle_dest dst; + } tx __rte_cache_aligned; + +} __rte_cache_aligned; + +#define TCP_STREAM(p) \ +((struct tle_tcp_stream *)((uintptr_t)(p) - offsetof(struct tle_tcp_stream, s))) + +#define TCP_STREAM_TX_PENDING(s) \ + ((s)->tx.drb.nb_max != rte_ring_count((s)->tx.drb.r)) + +#define TCP_STREAM_TX_FINISHED(s) \ + ((s)->tx.drb.nb_max == rte_ring_count((s)->tx.drb.r)) + +#include "stream_table.h" + +struct sdr { + rte_spinlock_t lock; + STAILQ_HEAD(, tle_stream) fe; + STAILQ_HEAD(, tle_stream) be; +}; + +struct tcp_streams { + struct stbl st; + struct tle_timer_wheel *tmr; /* timer wheel */ + struct rte_ring *tsq; /* to-send streams queue */ + struct sdr dr; /* death row for zombie streams */ + struct tle_tcp_stream s[]; /* array of allocated streams. 
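(flexible array member, ctx->prm.max_streams entries)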
*/ +}; + +#define CTX_TCP_STREAMS(ctx) ((struct tcp_streams *)(ctx)->streams.buf) +#define CTX_TCP_STLB(ctx) (&CTX_TCP_STREAMS(ctx)->st) +#define CTX_TCP_TMWHL(ctx) (CTX_TCP_STREAMS(ctx)->tmr) +#define CTX_TCP_TSQ(ctx) (CTX_TCP_STREAMS(ctx)->tsq) +#define CTX_TCP_SDR(ctx) (&CTX_TCP_STREAMS(ctx)->dr) + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_STREAM_H_ */ diff --git a/lib/libtle_l4p/tcp_timer.h b/lib/libtle_l4p/tcp_timer.h new file mode 100644 index 0000000..8faefb3 --- /dev/null +++ b/lib/libtle_l4p/tcp_timer.h @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TCP_TIMER_H_ +#define _TCP_TIMER_H_ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * internal defines. + * all RTO values are in ms. + */ +#define TCP_RTO_MAX 60000U /* RFC 6298 (2.5) */ +#define TCP_RTO_MIN 1000U /* RFC 6298 (2.4) */ +#define TCP_RTO_2MSL (2 * TCP_RTO_MAX) +#define TCP_RTO_DEFAULT TCP_RTO_MIN /* RFC 6298 (2.1)*/ +#define TCP_RTO_GRANULARITY 100U + + +static inline void +timer_stop(struct tle_tcp_stream *s) +{ + struct tle_timer_wheel *tw; + + if (s->timer.handle != NULL) { + tw = CTX_TCP_TMWHL(s->s.ctx); + tle_timer_stop(tw, s->timer.handle); + s->timer.handle = NULL; + } +} + +static inline void +timer_start(struct tle_tcp_stream *s) +{ + struct tle_timer_wheel *tw; + + if (s->timer.handle == NULL) { + tw = CTX_TCP_TMWHL(s->s.ctx); + s->timer.handle = tle_timer_start(tw, s, s->tcb.snd.rto); + s->tcb.snd.nb_retx = 0; + } +} + +static inline void +timer_restart(struct tle_tcp_stream *s) +{ + struct tle_timer_wheel *tw; + + tw = CTX_TCP_TMWHL(s->s.ctx); + s->timer.handle = tle_timer_start(tw, s, s->tcb.snd.rto); +} + + +/* + * reset number of retransmissions and restart RTO timer. 
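+ * (implemented as stop + start: timer_start() also resets snd.nb_retx
+ * back to zero.)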
+ */ +static inline void +timer_reset(struct tle_tcp_stream *s) +{ + timer_stop(s); + timer_start(s); +} + +static inline uint32_t +rto_roundup(uint32_t rto) +{ + rto = RTE_MAX(rto, TCP_RTO_MIN); + rto = RTE_MIN(rto, TCP_RTO_MAX); + return rto; +} + +/* + * RFC6298: Computing TCP's Retransmission Timer + * RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'| + * SRTT <- (1 - alpha) * SRTT + alpha * R' + * RTO <- SRTT + max (G, K*RTTVAR) + * the following computation is based on Jacobson'88 paper referenced + * in the RFC6298 +*/ +static inline void +rto_estimate(struct tcb *tcb, int32_t rtt) +{ + uint32_t rto; + + if (!rtt) + rtt = 1; + if (tcb->rcv.srtt) { + rtt -= (tcb->rcv.srtt >> 3); /* alpha = 1/8 */ + tcb->rcv.srtt += rtt; + + if (rtt < 0) + rtt = -rtt; + rtt -= (tcb->rcv.rttvar >> 2); /* beta = 1/4 */ + tcb->rcv.rttvar += rtt; + + } else { + tcb->rcv.srtt = rtt << 3; + tcb->rcv.rttvar = rtt << 1; + } + + rto = (tcb->rcv.srtt >> 3) + + RTE_MAX(TCP_RTO_GRANULARITY, tcb->rcv.rttvar); + tcb->snd.rto = rto_roundup(rto); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_TIMER_H_ */ diff --git a/lib/libtle_l4p/tcp_txq.h b/lib/libtle_l4p/tcp_txq.h new file mode 100644 index 0000000..0b199ba --- /dev/null +++ b/lib/libtle_l4p/tcp_txq.h @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TCP_TXQ_H_ +#define _TCP_TXQ_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +static inline struct rte_mbuf ** +tcp_txq_get_nxt_objs(const struct tle_tcp_stream *s, uint32_t *num) +{ + uint32_t cnt, head, mask, sz, tail; + struct rte_ring *r; + + r = s->tx.q; + sz = r->prod.size; + mask = r->prod.mask; + head = r->cons.head & mask; + tail = r->prod.tail & mask; + + cnt = (tail >= head) ? tail - head : sz - head; + + *num = cnt; + return (struct rte_mbuf **)(r->ring + head); +} + +static inline struct rte_mbuf ** +tcp_txq_get_una_objs(const struct tle_tcp_stream *s, uint32_t *num) +{ + uint32_t cnt, head, mask, sz, tail; + struct rte_ring *r; + + r = s->tx.q; + sz = r->prod.size; + mask = r->prod.mask; + head = r->prod.tail & mask; + tail = r->cons.tail & mask; + + cnt = (head >= tail) ? 
head - tail : sz - tail; + + *num = cnt; + return (struct rte_mbuf **)(r->ring + tail); +} + +static inline void +tcp_txq_set_nxt_head(struct tle_tcp_stream *s, uint32_t num) +{ + struct rte_ring *r; + + r = s->tx.q; + r->cons.head += num; +} + +static inline void +tcp_txq_rst_nxt_head(struct tle_tcp_stream *s) +{ + struct rte_ring *r; + + r = s->tx.q; + r->cons.head = r->cons.tail; +} + +static inline void +tcp_txq_set_una_tail(struct tle_tcp_stream *s, uint32_t num) +{ + struct rte_ring *r; + + r = s->tx.q; + rte_smp_rmb(); + r->cons.tail += num; +} + +static inline uint32_t +tcp_txq_nxt_cnt(struct tle_tcp_stream *s) +{ + struct rte_ring *r; + + r = s->tx.q; + return (r->prod.tail - r->cons.head) & r->prod.mask; +} + +static inline void +txs_enqueue(struct tle_ctx *ctx, struct tle_tcp_stream *s) +{ + struct rte_ring *r; + uint32_t n; + + if (rte_atomic32_add_return(&s->tx.arm, 1) == 1) { + r = CTX_TCP_TSQ(ctx); + n = rte_ring_enqueue_burst(r, (void * const *)&s, 1); + RTE_VERIFY(n == 1); + } +} + +static inline uint32_t +txs_dequeue_bulk(struct tle_ctx *ctx, struct tle_tcp_stream *s[], uint32_t num) +{ + struct rte_ring *r; + + r = CTX_TCP_TSQ(ctx); + return rte_ring_dequeue_burst(r, (void **)s, num); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TCP_TXQ_H_ */ diff --git a/lib/libtle_l4p/tle_ctx.h b/lib/libtle_l4p/tle_ctx.h new file mode 100644 index 0000000..a3516bf --- /dev/null +++ b/lib/libtle_l4p/tle_ctx.h @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_CTX_H_ +#define _TLE_CTX_H_ + +#include +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * - each such ctx represents an 'independent copy of the stack'. + * It owns set of s and s entities and provides + * (de)multiplexing input/output packets from/into devices into/from streams. + * is an abstraction for the underlying device, that is able + * to RX/TX packets and may provide some HW offload capabilities. + * It is a user responsibility to add to the all s, + * that context has to manage, before starting to do stream operations + * (open/send/recv,close) over that context. + * Right now adding/deleting s to the context with open + * streams is not supported. + * represents an L4(UDP/TCP, etc.) endpoint and + * is an analogy to socket entity. + * As with a socket, there are ability to do recv/send over it. + * belongs to particular but is visible globally across + * the process, i.e. any thread within the process can do recv/send over it + * without any further synchronisation. + * While 'upper' layer API is thread safe, lower layer API (rx_bulk/tx_bulk) + * is not thread safe and is not supposed to be run on multiple threads + * in parallel. + * So single thread can drive multiple s and do IO for them, + * but multiple threads can't drive same without some + * explicit synchronization. + */ + +struct tle_ctx; +struct tle_dev; + +/** + * Blocked L4 ports info. 
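+ * (local L4 ports that are excluded from use by streams on the given device.)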
+ */ +struct tle_bl_port { + uint32_t nb_port; /**< number of blocked ports. */ + const uint16_t *port; /**< list of blocked ports. */ +}; + + +/** + * device parameters. + */ +struct tle_dev_param { + uint32_t rx_offload; /**< DEV_RX_OFFLOAD_* supported. */ + uint32_t tx_offload; /**< DEV_TX_OFFLOAD_* supported. */ + struct in_addr local_addr4; /**< local IPv4 address assigned. */ + struct in6_addr local_addr6; /**< local IPv6 address assigned. */ + struct tle_bl_port bl4; /**< blocked ports for IPv4 address. */ + struct tle_bl_port bl6; /**< blocked ports for IPv4 address. */ +}; + +#define TLE_DST_MAX_HDR 0x60 + +struct tle_dest { + struct rte_mempool *head_mp; + /**< MP for fragment headers and control packets. */ + struct tle_dev *dev; /**< device to send packets through. */ + uint16_t mtu; /**< MTU for given destination. */ + uint8_t l2_len; /**< L2 header length. */ + uint8_t l3_len; /**< L3 header length. */ + uint8_t hdr[TLE_DST_MAX_HDR]; /**< L2/L3 headers. */ +}; + +/** + * context creation parameters. + */ + +enum { + TLE_PROTO_UDP, + TLE_PROTO_TCP, + TLE_PROTO_NUM +}; + +struct tle_ctx_param { + int32_t socket_id; /**< socket ID to allocate memory for. */ + uint32_t proto; /**< L4 proto to handle. */ + uint32_t max_streams; /**< max number of streams in context. */ + uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */ + uint32_t max_stream_sbufs; /**< max send mbufs per stream. */ + uint32_t send_bulk_size; /**< expected # of packets per send call. */ + + int (*lookup4)(void *opaque, const struct in_addr *addr, + struct tle_dest *res); + /**< will be called by send() to get IPv4 packet destination info. */ + void *lookup4_data; + /**< opaque data pointer for lookup4() callback. */ + + int (*lookup6)(void *opaque, const struct in6_addr *addr, + struct tle_dest *res); + /**< will be called by send() to get IPv6 packet destination info. */ + void *lookup6_data; + /**< opaque data pointer for lookup6() callback. */ +}; + +/** + * create L4 processing context. + * @param ctx_prm + * Parameters used to create and initialise the L4 context. + * @return + * Pointer to context structure that can be used in future operations, + * or NULL on error, with error code set in rte_errno. + * + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - out of memory + */ +struct tle_ctx * +tle_ctx_create(const struct tle_ctx_param *ctx_prm); + +/** + * Destroy given context. + * + * @param ctx + * context to destroy + */ +void tle_ctx_destroy(struct tle_ctx *ctx); + +/** + * Add new device into the given context. + * This function is not multi-thread safe. + * + * @param ctx + * context to add new device into. + * @param dev_prm + * Parameters used to create and initialise new device inside the context. + * @return + * Pointer to device structure that can be used in future operations, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENODEV - max possible value of open devices is reached + * - ENOMEM - out of memory + */ +struct tle_dev * +tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm); + +/** + * Remove and destroy previously added device from the given context. + * This function is not multi-thread safe. + * + * @param dev + * device to remove and destroy. + * @return + * zero on successful completion. 
+ * - -EINVAL - invalid parameter passed to function + */ +int tle_del_dev(struct tle_dev *dev); + +/** + * Flags to the context that destinations info might be changed, + * so if it has any destinations data cached, then + * it has to be invalidated. + * @param ctx + * context to invalidate. + */ +void tle_ctx_invalidate(struct tle_ctx *ctx); + +/** + * Stream asynchronous notification mechanisms: + * a) recv/send callback. + * Stream recv/send notification callbacks behaviour is edge-triggered (ET). + * recv callback will be invoked if stream receive buffer was empty and + * new packet(s) have arrived. + * send callback will be invoked when stream send buffer was full, + * and some packets belonging to that stream were sent + * (part of send buffer became free again). + * Note that both recv and send callbacks are called with sort of read lock + * held on that stream. So it is not permitted to call stream_close() + * within the callback function. Doing that would cause a deadlock. + * While it is allowed to call stream send/recv functions within the + * callback, it is not recommended: callback function will be invoked + * within tle_udp_rx_bulk/tle_udp_tx_bulk context and some heavy processing + * within the callback functions might cause performance degradation + * or even loss of packets for further streams. + * b) recv/send event. + * Stream recv/send events behaviour is level-triggered (LT). + * receive event will be raised by either + * tle_udp_rx_burst() or tle_udp_stream_recv() as long as there are any + * remaining packets inside stream receive buffer. + * send event will be raised by either + * tle_udp_tx_burst() or tle_udp_stream_send() as long as there are any + * free space inside stream send buffer. + * Note that callback and event are mutually exclusive on basis. + * It is not possible to open a stream with both recv event and callback + * specified. + * Though it is possible to open a stream with recv callback and send event, + * or visa-versa. + * If the user doesn't need any notification mechanism for that stream, + * both event and callback could be set to zero. + */ + +struct tle_event; +struct tle_stream; + +/** + * Stream recv/send callback function and data. + */ +struct tle_stream_cb { + void (*func)(void *, struct tle_stream *); + void *data; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* _TLE_CTX_H_ */ diff --git a/lib/libtle_l4p/tle_event.h b/lib/libtle_l4p/tle_event.h new file mode 100644 index 0000000..b19954a --- /dev/null +++ b/lib/libtle_l4p/tle_event.h @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _SEV_IMPL_H_ +#define _SEV_IMPL_H_ + +#include +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct tle_evq; + +/** + * Possible states of the event. 
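+ * TLE_SEV_IDLE - event is not in use, notifications are disabled;
+ * TLE_SEV_DOWN - event is armed, but has not been raised yet;
+ * TLE_SEV_UP   - event was raised and stays queued until retrieved by
+ *                tle_evq_get(), which moves it back to DOWN.
+ *
+ * A minimal usage sketch (evq, stream and evd are application-side objects,
+ * the functions are the ones declared in this header):
+ *
+ *	ev = tle_event_alloc(evq, stream);	 (associate event with a stream)
+ *	tle_event_active(ev, TLE_SEV_DOWN);	 (enable notifications)
+ *	...
+ *	n = tle_evq_get(evq, evd, RTE_DIM(evd)); (collect raised events)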
+ */ +enum tle_ev_state { + TLE_SEV_IDLE, + TLE_SEV_DOWN, + TLE_SEV_UP, + TLE_SEV_NUM +}; + +struct tle_event { + TAILQ_ENTRY(tle_event) ql; + struct tle_evq *head; + const void *data; + enum tle_ev_state state; +} __rte_cache_aligned; + +struct tle_evq { + rte_spinlock_t lock; + uint32_t nb_events; + uint32_t nb_armed; + uint32_t nb_free; + TAILQ_HEAD(, tle_event) armed; + TAILQ_HEAD(, tle_event) free; + struct tle_event events[0]; +}; + +/** + * event queue creation parameters. + */ +struct tle_evq_param { + int32_t socket_id; /**< socket ID to allocate memory from. */ + uint32_t max_events; /**< max number of events in queue. */ +}; + +/** + * create event queue. + * @param prm + * Parameters used to create and initialise the queue. + * @return + * Pointer to new event queue structure, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - out of memory + */ +struct tle_evq *tle_evq_create(const struct tle_evq_param *prm); + +/** + * Destroy given event queue. + * + * @param evq + * event queue to destroy + */ +void tle_evq_destroy(struct tle_evq *evq); + +/** + * allocate a new event within given event queue. + * @param evq + * event queue to allocate a new stream within. + * @param data + * User data to be associated with that event. + * @return + * Pointer to event structure that can be used in future tle_event API calls, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - max limit of allocated events reached for that context + */ +struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data); + +/** + * free an allocated event. + * @param ev + * Pointer to the event to free. + */ +void tle_event_free(struct tle_event *ev); + + +/** + * move event from DOWN to UP state. + * @param ev + * Pointer to the event. + */ +static inline void +tle_event_raise(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev->state != TLE_SEV_DOWN) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (ev->state == TLE_SEV_DOWN) { + ev->state = TLE_SEV_UP; + TAILQ_INSERT_TAIL(&q->armed, ev, ql); + q->nb_armed++; + } + rte_spinlock_unlock(&q->lock); +} + +/** + * move event from UP to DOWN state. + * @param ev + * Pointer to the event. + */ +static inline void +tle_event_down(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev->state != TLE_SEV_UP) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (ev->state == TLE_SEV_UP) { + ev->state = TLE_SEV_DOWN; + TAILQ_REMOVE(&q->armed, ev, ql); + q->nb_armed--; + } + rte_spinlock_unlock(&q->lock); +} + +/** + * move from IDLE to DOWN/UP state. + * @param ev + * Pointer to the event. + * @param st + * new state for the event. + */ +static inline void +tle_event_active(struct tle_event *ev, enum tle_ev_state st) +{ + struct tle_evq *q; + + if (ev->state != TLE_SEV_IDLE) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (st > ev->state) { + if (st == TLE_SEV_UP) { + TAILQ_INSERT_TAIL(&q->armed, ev, ql); + q->nb_armed++; + } + ev->state = st; + } + rte_spinlock_unlock(&q->lock); +} + +/** + * move event IDLE state. + * @param ev + * Pointer to the event. 
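+ *   Moving the event to IDLE discards any pending notification and
+ *   removes it from the armed queue.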
+ */ +static inline void +tle_event_idle(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev->state == TLE_SEV_IDLE) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (ev->state == TLE_SEV_UP) { + TAILQ_REMOVE(&q->armed, ev, ql); + q->nb_armed--; + } + ev->state = TLE_SEV_IDLE; + rte_spinlock_unlock(&q->lock); +} + +static inline void +tle_evq_idle(struct tle_evq *evq, struct tle_event *ev[], uint32_t num) +{ + uint32_t i, n; + + rte_spinlock_lock(&evq->lock); + + n = 0; + for (i = 0; i != num; i++) { + if (ev[i]->state == TLE_SEV_UP) { + TAILQ_REMOVE(&evq->armed, ev[i], ql); + n++; + } + ev[i]->state = TLE_SEV_IDLE; + } + + evq->nb_armed -= n; + rte_spinlock_unlock(&evq->lock); +} + + +/* + * return up to *num* user data pointers associated with + * the events that were in the UP state. + * Each retrieved event is automatically moved into the DOWN state. + * @param evq + * event queue to retrieve events from. + * @param evd + * An array of user data pointers associated with the events retrieved. + * It must be large enough to store up to *num* pointers in it. + * @param num + * Number of elements in the *evd* array. + * @return + * number of of entries filled inside *evd* array. + */ +static inline int32_t +tle_evq_get(struct tle_evq *evq, const void *evd[], uint32_t num) +{ + uint32_t i, n; + struct tle_event *ev; + + if (evq->nb_armed == 0) + return 0; + + rte_compiler_barrier(); + + rte_spinlock_lock(&evq->lock); + n = RTE_MIN(num, evq->nb_armed); + for (i = 0; i != n; i++) { + ev = TAILQ_FIRST(&evq->armed); + ev->state = TLE_SEV_DOWN; + TAILQ_REMOVE(&evq->armed, ev, ql); + evd[i] = ev->data; + } + evq->nb_armed -= n; + rte_spinlock_unlock(&evq->lock); + return n; +} + + +#ifdef __cplusplus +} +#endif + +#endif /* _SEV_IMPL_H_ */ diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h new file mode 100644 index 0000000..e6eb336 --- /dev/null +++ b/lib/libtle_l4p/tle_tcp.h @@ -0,0 +1,395 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_TCP_H_ +#define _TLE_TCP_H_ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * TCP stream creation parameters. + */ +struct tle_tcp_stream_addr { + struct sockaddr_storage local; /**< stream local address. */ + struct sockaddr_storage remote; /**< stream remote address. */ +}; + +#define TLE_TCP_DEFAULT_RETRIES 3 + +struct tle_tcp_stream_cfg { + uint8_t nb_retries; /**< max number of retransmission attempts. */ + + /* _cb and _ev are mutually exclusive */ + struct tle_event *err_ev; /**< error event to use. */ + struct tle_stream_cb err_cb; /**< error callback to use. */ + + struct tle_event *recv_ev; /**< recv event to use. */ + struct tle_stream_cb recv_cb; /**< recv callback to use. */ + + struct tle_event *send_ev; /**< send event to use. */ + struct tle_stream_cb send_cb; /**< send callback to use. 
*/ +}; + +struct tle_tcp_stream_param { + struct tle_tcp_stream_addr addr; + struct tle_tcp_stream_cfg cfg; +}; + +/** + * create a new stream within given TCP context. + * @param ctx + * TCP context to create new stream within. + * @param prm + * Parameters used to create and initialise the new stream. + * @return + * Pointer to TCP stream structure that can be used in future TCP API calls, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOFILE - max limit of open streams reached for that context + */ +struct tle_stream * +tle_tcp_stream_open(struct tle_ctx *ctx, + const struct tle_tcp_stream_param *prm); + +/** + * close an open stream. + * if the stream is in connected state, then: + * - connection termination would be performed. + * - if stream contains unsent data, then actual close will be postponed + * till either remaining data will be TX-ed, or timeout will expire. + * All packets that belong to that stream and remain in the device + * TX queue will be kept for father transmission. + * @param s + * Pointer to the stream to close. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + * - -EDEADLK - close was already invoked on that stream + */ +int tle_tcp_stream_close(struct tle_stream *s); + +/** + * close a group of open streams. + * if the stream is in connected state, then: + * - connection termination would be performed. + * - if stream contains unsent data, then actual close will be postponed + * till either remaining data will be TX-ed, or timeout will expire. + * All packets that belong to that stream and remain in the device + * TX queue will be kept for father transmission. + * @param ts + * An array of pointers to streams that have to be closed. + * @param num + * Number of elements in the *ts* array. + * @return + * number of successfully closed streams. + * In case of error, error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - EDEADLK - close was already invoked on that stream + */ +uint32_t +tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num); + +/** + * get open stream local and remote addresses. + * @param s + * Pointer to the stream. + * @return + * zero on successful completion. + * - EINVAL - invalid parameter passed to function + */ +int +tle_tcp_stream_get_addr(const struct tle_stream *s, + struct tle_tcp_stream_addr *addr); + +/** + * Client mode connect API. + */ + +/** + * Attempt to establish connection with the destination TCP endpoint. + * Stream write event (or callback) will fire, if the connection will be + * established successfully. + * Note that stream in listen state or stream with already established + * connection, can't be subject of connect() call. + * In case of unsuccessful attempt, error event (or callback) will be + * activated. + * @param s + * Pointer to the stream. + * @param addr + * Address of the destination endpoint. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + */ +int tle_tcp_stream_connect(struct tle_stream *s, const struct sockaddr *addr); + +/* + * Server mode connect API. 
+ * Basic scheme for server mode API usage: + * + * + * tle_tcp_stream_listen(stream_to_listen); + * + * n = tle_tcp_synreqs(stream_to_listen, syn_reqs, sizeof(syn_reqs)); + * for (i = 0, k = 0; i != n; i++) { + * rc = ; + * if (rc == 0) { + * //proceed with connection establishment + * k++; + * accept_param[k].syn = syn_reqs[i]; + * + * } else { + * //reject connection requests from that endpoint + * rej_reqs[i - k] = syn_reqs[i]; + * } + * } + * + * //reject n - k connection requests + * tle_tcp_reject(stream_to_listen, rej_reqs, n - k); + * + * //accept k new connections + * rc = tle_tcp_accept(stream_to_listen, accept_param, new_con_streams, k); + * + */ + +struct tle_syn_req { + struct rte_mbuf *pkt; + /*< mbuf with incoming connection request. */ + void *opaque; /*< tldk related opaque pointer. */ +}; + +struct tle_tcp_accept_param { + struct tle_syn_req syn; /*< mbuf with incoming SYN request. */ + struct tle_tcp_stream_cfg cfg; /*< stream configure options. */ +}; + + +/** + * Set stream into the listen state (passive opener), i.e. make stream ready + * to accept new connections. + * Stream read event (or callback) will be activated as new SYN requests + * will arrive. + * Note that stream with already established (or establishing) connection + * can't be subject of listen() call. + * @param s + * Pointer to the stream. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + */ +int tle_tcp_stream_listen(struct tle_stream *s); + +/** + * return up to *num* mbufs with SYN requests that were received + * for given TCP endpoint. + * Note that the stream has to be in listen state. + * For each returned mbuf: + * data_off set to the start of the packet + * l2_len, l3_len, l4_len are setup properly + * (so user can still extract L2/L3/L4 header info if needed) + * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly. + * L3/L4 checksum is verified. + * @param s + * TCP stream to receive packets from. + * @param rq + * An array of tle_syn_req structures that contains + * at least *num* elements in it. + * @param num + * Number of elements in the *pkt* array. + * @return + * number of of entries filled inside *pkt* array. + */ +uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[], + uint32_t num); + +/** + * Accept connection requests for the given stream. + * Note that the stream has to be in listen state. + * For each new connection a new stream will be open. + * @param s + * TCP listen stream. + * @param prm + * An array of *tle_tcp_accept_param* structures that + * contains at least *num* elements in it. + * @param rs + * An array of pointers to *tle_stream* structures that + * must be large enough to store up to *num* pointers in it. + * @param num + * Number of elements in the *prm* and *rs* arrays. + * @return + * number of of entries filled inside *rs* array. + * In case of error, error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENFILE - no more streams are avaialble to open. + */ +int tle_tcp_stream_accept(struct tle_stream *s, + const struct tle_tcp_accept_param prm[], struct tle_stream *rs[], + uint32_t num); + +/** + * Reject connection requests for the given stream. + * Note that the stream has to be in listen state. + * For each new connection a new stream will be open. + * @param s + * TCP listen stream. + * @param rq + * An array of tle_syn_req structures that contains + * at least *num* elements in it. 
+ * @param num
+ *   Number of elements in the *rq* array.
+ */
+void tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
+	uint32_t num);
+
+/**
+ * return up to *num* mbufs that were received for the given TCP stream.
+ * Note that the stream has to be in connected state.
+ * Data ordering is preserved.
+ * For each returned mbuf:
+ * data_off set to the start of the packet's TCP data
+ * l2_len, l3_len, l4_len are setup properly
+ * (so user can still extract L2/L3 address info if needed)
+ * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly.
+ * L3/L4 checksum is verified.
+ * @param s
+ *   TCP stream to receive packets from.
+ * @param pkt
+ *   An array of pointers to *rte_mbuf* structures that
+ *   must be large enough to store up to *num* pointers in it.
+ * @param num
+ *   Number of elements in the *pkt* array.
+ * @return
+ *   number of entries filled inside *pkt* array.
+ */
+uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[],
+	uint16_t num);
+
+/**
+ * Consume and queue up to *num* packets that will be sent eventually
+ * by tle_tcp_tx_bulk().
+ * Note that the stream has to be in connected state.
+ * It is the responsibility of this function to determine over which TCP dev
+ * the given packets have to be sent out and to do the necessary
+ * preparations for that.
+ * Based on the stream destination address it does route lookup, fills
+ * L2/L3/L4 headers, and, if necessary, fragments packets.
+ * Depending on the underlying device information, it either does
+ * IP/TCP checksum calculations in SW or sets mbuf TX checksum
+ * offload fields properly.
+ * For each input mbuf the following conditions have to be met:
+ *	- data_off points to the start of the packet's TCP data.
+ *	- there is enough header space to prepend L2/L3/L4 headers.
+ * @param s
+ *   TCP stream to send packets over.
+ * @param pkt
+ *   The burst of output packets that need to be sent.
+ * @param num
+ *   Number of elements in the *pkt* array.
+ * @return
+ *   number of packets successfully queued in the stream send buffer.
+ *   In case of error, error code can be set in rte_errno.
+ *   Possible rte_errno errors include:
+ *   - EAGAIN - operation cannot be performed right now
+ *     (most likely close() was already performed on that stream).
+ *   - ENOTCONN - the stream is not connected.
+ */
+uint16_t tle_tcp_stream_send(struct tle_stream *s, struct rte_mbuf *pkt[],
+	uint16_t num);
+
+/**
+ * Back End (BE) API.
+ * BE API functions are not multi-thread safe.
+ * Supposed to be called by the L2/L3 processing layer.
+ */
+
+/**
+ * Take input mbufs and distribute them to open TCP streams.
+ * Expects that for each input packet:
+ *	- l2_len, l3_len, l4_len are setup correctly
+ *	- (packet_type & (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6)) != 0,
+ *	- (packet_type & RTE_PTYPE_L4_TCP) != 0,
+ * During delivery L3/L4 checksums will be verified
+ * (either relies on HW offload or in SW).
+ * May cause some extra packets to be queued for TX.
+ * This function is not multi-thread safe.
+ * @param dev
+ *   TCP device the packets were received from.
+ * @param pkt
+ *   The burst of input packets that need to be processed.
+ * @param rp
+ *   The array that will contain pointers to unprocessed packets on return.
+ *   Should contain at least *num* elements.
+ * @param rc
+ *   The array that will contain the error code for the corresponding rp[] entry:
+ *   - ENOENT - no open stream matching this packet.
+ *   - ENOBUFS - receive buffer of the destination stream is full.
+ *   Should contain at least *num* elements.
+ * @param num
+ *   Number of elements in the *pkt* input array.
+ * @return
+ *   number of packets delivered to the TCP streams.
+ */
+uint16_t tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num);
+
+/**
+ * Fill *pkt* with pointers to the packets that have to be transmitted
+ * over given TCP device.
+ * Output packets have to be ready to be passed straight to rte_eth_tx_burst()
+ * without any extra processing.
+ * TCP/IPv4 checksums are either already calculated or the appropriate
+ * mbuf fields are set properly for HW offload.
+ * This function is not multi-thread safe.
+ * @param dev
+ *   TCP device the output packets will be transmitted over.
+ * @param pkt
+ *   An array of pointers to *rte_mbuf* structures that
+ *   must be large enough to store up to *num* pointers in it.
+ * @param num
+ *   Number of elements in the *pkt* array.
+ * @return
+ *   number of entries filled inside *pkt* array.
+ */
+uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
+	uint16_t num);
+
+/**
+ * perform internal processing for given TCP context.
+ * Checks which timers are expired and performs the required actions
+ * (retransmission/connection abort, etc.).
+ * May cause some extra packets to be queued for TX.
+ * This function is not multi-thread safe.
+ * @param ctx
+ *   TCP context to process.
+ * @param num
+ *   maximum number of streams to process.
+ * @return
+ *   zero on successful completion.
+ *   - EINVAL - invalid parameter passed to function
+ */
+int tle_tcp_process(struct tle_ctx *ctx, uint32_t num);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _TLE_TCP_H_ */
diff --git a/lib/libtle_l4p/tle_udp.h b/lib/libtle_l4p/tle_udp.h
new file mode 100644
index 0000000..d3a8fe9
--- /dev/null
+++ b/lib/libtle_l4p/tle_udp.h
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _TLE_UDP_H_
+#define _TLE_UDP_H_
+
+#include
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * UDP stream creation parameters.
+ */
+struct tle_udp_stream_param {
+	struct sockaddr_storage local_addr;  /**< stream local address. */
+	struct sockaddr_storage remote_addr; /**< stream remote address. */
+
+	/* _cb and _ev are mutually exclusive */
+	struct tle_event *recv_ev;    /**< recv event to use. */
+	struct tle_stream_cb recv_cb; /**< recv callback to use. */
+
+	struct tle_event *send_ev;    /**< send event to use. */
+	struct tle_stream_cb send_cb; /**< send callback to use. */
+};
+
+/**
+ * create a new stream within given UDP context.
+ * @param ctx
+ *   UDP context to create new stream within.
+ * @param prm
+ *   Parameters used to create and initialise the new stream.
+ * @return
+ *   Pointer to UDP stream structure that can be used in future UDP API calls,
+ *   or NULL on error, with error code set in rte_errno.
+ *   Possible rte_errno errors include:
+ *   - EINVAL - invalid parameter passed to function
+ *   - ENFILE - max limit of open streams reached for that context
+ */
+struct tle_stream *
+tle_udp_stream_open(struct tle_ctx *ctx,
+	const struct tle_udp_stream_param *prm);
+
+/**
+ * close an open stream.
+ * All packets still remaining in the stream receive buffer will be freed.
+ * All packets still remaining in the stream transmit buffer will be kept
+ * for further transmission.
+ * @param s
+ *   Pointer to the stream to close.
+ * @return
+ *   zero on successful completion.
+ *   - -EINVAL - invalid parameter passed to function
+ */
+int tle_udp_stream_close(struct tle_stream *s);
+
+/**
+ * get open stream parameters.
+ * @param s
+ *   Pointer to the stream.
+ * @return
+ *   zero on successful completion.
+ *   - EINVAL - invalid parameter passed to function
+ */
+int
+tle_udp_stream_get_param(const struct tle_stream *s,
+	struct tle_udp_stream_param *prm);
+
+/**
+ * Take input mbufs and distribute them to open UDP streams.
+ * Expects that for each input packet:
+ *	- l2_len, l3_len, l4_len are setup correctly
+ *	- (packet_type & (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6)) != 0,
+ *	- (packet_type & RTE_PTYPE_L4_UDP) != 0,
+ * During delivery L3/L4 checksums will be verified
+ * (either relies on HW offload or in SW).
+ * This function is not multi-thread safe.
+ * @param dev
+ *   UDP device the packets were received from.
+ * @param pkt
+ *   The burst of input packets that need to be processed.
+ * @param rp
+ *   The array that will contain pointers to unprocessed packets on return.
+ *   Should contain at least *num* elements.
+ * @param rc
+ *   The array that will contain the error code for the corresponding rp[] entry:
+ *   - ENOENT - no open stream matching this packet.
+ *   - ENOBUFS - receive buffer of the destination stream is full.
+ *   Should contain at least *num* elements.
+ * @param num
+ *   Number of elements in the *pkt* input array.
+ * @return
+ *   number of packets delivered to the UDP streams.
+ */
+uint16_t tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
+	struct rte_mbuf *rp[], int32_t rc[], uint16_t num);
+
+/**
+ * Fill *pkt* with pointers to the packets that have to be transmitted
+ * over given UDP device.
+ * Output packets have to be ready to be passed straight to rte_eth_tx_burst()
+ * without any extra processing.
+ * UDP/IPv4 checksums are either already calculated or the appropriate
+ * mbuf fields are set properly for HW offload.
+ * This function is not multi-thread safe.
+ * @param dev
+ *   UDP device the output packets will be transmitted over.
+ * @param pkt
+ *   An array of pointers to *rte_mbuf* structures that
+ *   must be large enough to store up to *num* pointers in it.
+ * @param num
+ *   Number of elements in the *pkt* array.
+ * @return
+ *   number of entries filled inside *pkt* array.
+ */
+uint16_t tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
+	uint16_t num);
+
+/**
+ * return up to *num* mbufs that were received for the given UDP stream.
+ * For each returned mbuf:
+ * data_off set to the start of the packet's UDP data
+ * l2_len, l3_len, l4_len are setup properly
+ * (so user can still extract L2/L3 address info if needed)
+ * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly.
+ * L3/L4 checksum is verified.
+ * Packets with invalid L3/L4 checksum will be silently dropped.
+ * @param s
+ *   UDP stream to receive packets from.
+ * @param pkt
+ *   An array of pointers to *rte_mbuf* structures that
+ *   must be large enough to store up to *num* pointers in it.
+ * @param num + * Number of elements in the *pkt* array. + * @return + * number of of entries filled inside *pkt* array. + */ +uint16_t tle_udp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[], + uint16_t num); + +/** + * Consume and queue up to *num* packets, that will be sent eventually + * by tle_udp_tx_bulk(). + * If *dst_addr* is NULL, then default remote address associated with that + * stream (if any) will be used. + * The main purpose of that function is to determine over which UDP dev + * given packets have to be sent out and do necessary preparations for that. + * Based on the *dst_addr* it does route lookup, fills L2/L3/L4 headers, + * and, if necessary, fragments packets. + * Depending on the underlying device information, it either does + * IP/UDP checksum calculations in SW or sets mbuf TX checksum + * offload fields properly. + * For each input mbuf the following conditions have to be met: + * - data_off point to the start of packet's UDP data. + * - there is enough header space to prepend L2/L3/L4 headers. + * @param s + * UDP stream to send packets over. + * @param pkt + * The burst of output packets that need to be send. + * @param num + * Number of elements in the *pkt* array. + * @param dst_addr + * Destination address to send packets to. + * @return + * number of packets successfully queued in the stream send buffer. + */ +uint16_t tle_udp_stream_send(struct tle_stream *s, struct rte_mbuf *pkt[], + uint16_t num, const struct sockaddr *dst_addr); + +#ifdef __cplusplus +} +#endif + +#endif /* _TLE_UDP_H_ */ diff --git a/lib/libtle_l4p/udp_rxtx.c b/lib/libtle_l4p/udp_rxtx.c new file mode 100644 index 0000000..01d3520 --- /dev/null +++ b/lib/libtle_l4p/udp_rxtx.c @@ -0,0 +1,646 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include + +#include "udp_stream.h" +#include "misc.h" + +static inline struct tle_udp_stream * +rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port) +{ + struct tle_udp_stream *s; + + if (type >= TLE_VNUM || dev->dp[type] == NULL) + return NULL; + + s = (struct tle_udp_stream *)dev->dp[type]->streams[port]; + if (s == NULL) + return NULL; + + if (rwl_acquire(&s->rx.use) < 0) + return NULL; + + return s; +} + +static inline uint16_t +get_pkt_type(const struct rte_mbuf *m) +{ + uint32_t v; + + v = m->packet_type & + (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK); + if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP)) + return TLE_V4; + else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP)) + return TLE_V6; + else + return TLE_VNUM; +} + +static inline union l4_ports +pkt_info(const struct tle_dev *dev, struct rte_mbuf *m, + union l4_ports *ports, union ipv4_addrs *addr4, + union ipv6_addrs **addr6) +{ + uint32_t len; + union l4_ports ret, *up; + union ipv4_addrs *pa4; + + ret.src = get_pkt_type(m); + + len = m->l2_len; + if (ret.src == TLE_V4) { + pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *, + len + offsetof(struct ipv4_hdr, src_addr)); + addr4->raw = pa4->raw; + m->ol_flags |= dev->rx.ol_flags[TLE_V4]; + } else if (ret.src == TLE_V6) { + *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *, + len + offsetof(struct ipv6_hdr, src_addr)); + m->ol_flags |= dev->rx.ol_flags[TLE_V6]; + } + + len += m->l3_len; + up = rte_pktmbuf_mtod_offset(m, union l4_ports *, + len + offsetof(struct udp_hdr, src_port)); + ports->raw = up->raw; + ret.dst = ports->dst; + return ret; +} + +/* + * Helper routine, enqueues packets to the stream and calls RX + * notification callback, if needed. + */ +static inline uint16_t +rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[], + int32_t rc[], uint32_t num) +{ + uint32_t i, k, r; + + r = rte_ring_enqueue_burst(s->rx.q, mb, num); + + /* if RX queue was empty invoke user RX notification callback. 
*/ + if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r) + s->rx.cb.func(s->rx.cb.data, &s->s); + + for (i = r, k = 0; i != num; i++, k++) { + rc[k] = ENOBUFS; + rp[k] = mb[i]; + } + + return r; +} + +static inline uint16_t +rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + union ipv6_addrs *addr[], union l4_ports port[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num) +{ + uint32_t i, k, n; + void *mb[num]; + + k = 0; + n = 0; + + for (i = 0; i != num; i++) { + + if ((port[i].raw & s->s.pmsk.raw) != s->s.port.raw || + ymm_mask_cmp(&addr[i]->raw, &s->s.ipv6.addr.raw, + &s->s.ipv6.mask.raw) != 0) { + rc[k] = ENOENT; + rp[k] = pkt[i]; + k++; + } else { + mb[n] = pkt[i]; + n++; + } + } + + return rx_stream(s, mb, rp + k, rc + k, n); +} + +static inline uint16_t +rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + union ipv4_addrs addr[], union l4_ports port[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num) +{ + uint32_t i, k, n; + void *mb[num]; + + k = 0; + n = 0; + + for (i = 0; i != num; i++) { + + if ((addr[i].raw & s->s.ipv4.mask.raw) != s->s.ipv4.addr.raw || + (port[i].raw & s->s.pmsk.raw) != + s->s.port.raw) { + rc[k] = ENOENT; + rp[k] = pkt[i]; + k++; + } else { + mb[n] = pkt[i]; + n++; + } + } + + return rx_stream(s, mb, rp + k, rc + k, n); +} + +uint16_t +tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num) +{ + struct tle_udp_stream *s; + uint32_t i, j, k, n, p, t; + union l4_ports tp[num], port[num]; + union ipv4_addrs a4[num]; + union ipv6_addrs *pa6[num]; + + for (i = 0; i != num; i++) + tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]); + + k = 0; + for (i = 0; i != num; i = j) { + + for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++) + ; + + t = tp[i].src; + p = tp[i].dst; + s = rx_stream_obtain(dev, t, p); + if (s != NULL) { + + if (t == TLE_V4) + n = rx_stream4(s, pkt + i, a4 + i, + port + i, rp + k, rc + k, j - i); + else + n = rx_stream6(s, pkt + i, pa6 + i, port + i, + rp + k, rc + k, j - i); + + k += j - i - n; + + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + rwl_release(&s->rx.use); + + } else { + for (; i != j; i++) { + rc[k] = ENOENT; + rp[k] = pkt[i]; + k++; + } + } + } + + return num - k; +} + +static inline void +stream_drb_release(struct tle_udp_stream *s, struct tle_drb *drb[], + uint32_t nb_drb) +{ + uint32_t n; + + n = rte_ring_count(s->tx.drb.r); + rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb); + + /* If stream is still open, then mark it as avaialble for writing. */ + if (rwl_try_acquire(&s->tx.use) > 0) { + + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + + /* if stream send buffer was full invoke TX callback */ + else if (s->tx.cb.func != NULL && n == 0) + s->tx.cb.func(s->tx.cb.data, &s->s); + + } + + rwl_release(&s->tx.use); +} + +uint16_t +tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) +{ + uint32_t i, j, k, n; + struct tle_drb *drb[num]; + struct tle_udp_stream *s; + + /* extract packets from device TX queue. */ + + k = num; + n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt, + num, drb, &k); + + if (n == 0) + return 0; + + /* free empty drbs and notify related streams. 
*/ + + for (i = 0; i != k; i = j) { + s = drb[i]->udata; + for (j = i + 1; j != k && s == drb[j]->udata; j++) + ; + stream_drb_release(s, drb + i, j - i); + } + + return n; +} + +/* + * helper function, do the necessary pre-processing for the received packets + * before handiing them to the strem_recv caller. + */ +static inline uint32_t +recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type) +{ + uint32_t i, k; + uint64_t f, flg[num], ofl[num]; + + for (i = 0; i != num; i++) { + flg[i] = m[i]->ol_flags; + ofl[i] = m[i]->tx_offload; + } + + k = 0; + for (i = 0; i != num; i++) { + + f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); + + /* drop packets with invalid cksum(s). */ + if (f != 0 && check_pkt_csum(m[i], m[i]->ol_flags, type, + IPPROTO_UDP) != 0) { + rte_pktmbuf_free(m[i]); + m[i] = NULL; + k++; + } else { + m[i]->ol_flags ^= f; + rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i])); + } + } + + return k; +} + +uint16_t +tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num) +{ + uint32_t k, n; + struct tle_udp_stream *s; + + s = UDP_STREAM(us); + n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); + if (n == 0) + return 0; + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (n == num && rte_ring_count(s->rx.q) != 0) { + if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + rwl_release(&s->rx.use); + } + + k = recv_pkt_process(pkt, n, s->s.type); + return compress_pkt_list(pkt, n, k); +} + +static inline int +udp_fill_mbuf(struct rte_mbuf *m, + uint32_t type, uint64_t ol_flags, uint32_t pid, + union udph udph, const struct tle_dest *dst) +{ + uint32_t len, plen; + char *l2h; + union udph *l4h; + + len = dst->l2_len + dst->l3_len; + plen = m->pkt_len; + + /* copy to mbuf L2/L3 header template. */ + + l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h)); + if (l2h == NULL) + return -ENOBUFS; + + /* copy L2/L3 header */ + rte_memcpy(l2h, dst->hdr, len); + + /* copy UDP header */ + l4h = (union udph *)(l2h + len); + l4h->raw = udph.raw; + + /* setup mbuf TX offload related fields. */ + m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, + sizeof(*l4h), 0, 0, 0); + m->ol_flags |= ol_flags; + + l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h)); + + /* update proto specific fields. */ + + if (type == TLE_V4) { + struct ipv4_hdr *l3h; + l3h = (struct ipv4_hdr *)(l2h + dst->l2_len); + l3h->packet_id = rte_cpu_to_be_16(pid); + l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + + sizeof(*l4h)); + + if ((ol_flags & PKT_TX_UDP_CKSUM) != 0) + l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len, + ol_flags); + else + l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h); + + if ((ol_flags & PKT_TX_IP_CKSUM) == 0) + l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len); + } else { + struct ipv6_hdr *l3h; + l3h = (struct ipv6_hdr *)(l2h + dst->l2_len); + l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h)); + if ((ol_flags & PKT_TX_UDP_CKSUM) != 0) + l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags); + else + l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h); + } + + return 0; +} + +/* ??? + * probably this function should be there - + * rte_ipv[4,6]_fragment_packet should do that. 
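+ * For now, frag_fixup() below propagates ol_flags/tx_offload from the
+ * original mbuf to each produced fragment and, when the IPv4 header
+ * checksum is not offloaded to HW, recomputes it in SW.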
+ */ +static inline void +frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type) +{ + struct ipv4_hdr *l3h; + + mf->ol_flags = ms->ol_flags; + mf->tx_offload = ms->tx_offload; + + if (type == TLE_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) { + l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *); + l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len); + } +} + +/* + * Returns negative for failure to fragment or actual number of fragments. + */ +static inline int +fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num, + uint32_t type, const struct tle_dest *dst) +{ + int32_t frag_num, i; + uint16_t mtu; + void *eth_hdr; + + /* Remove the Ethernet header from the input packet */ + rte_pktmbuf_adj(pkt, dst->l2_len); + mtu = dst->mtu - dst->l2_len; + + /* fragment packet */ + if (type == TLE_V4) + frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu, + dst->head_mp, dst->head_mp); + else + frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu, + dst->head_mp, dst->head_mp); + + if (frag_num > 0) { + for (i = 0; i != frag_num; i++) { + + frag_fixup(pkt, frag[i], type); + + /* Move data_off to include l2 header first */ + eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len); + + /* copy l2 header into fragment */ + rte_memcpy(eth_hdr, dst->hdr, dst->l2_len); + } + } + + return frag_num; +} + +static inline void +stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +static inline uint32_t +stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[], + uint32_t nb_drb) +{ + return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb); +} + +/* enqueue up to num packets to the destination device queue. */ +static inline uint16_t +queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev, + const void *pkt[], uint16_t nb_pkt, + struct tle_drb *drbs[], uint32_t *nb_drb) +{ + uint32_t bsz, i, n, nb, nbc, nbm; + + bsz = s->tx.drb.nb_elem; + + /* calulate how many drbs are needed.*/ + nbc = *nb_drb; + nbm = (nb_pkt + bsz - 1) / bsz; + nb = RTE_MAX(nbm, nbc) - nbc; + + /* allocate required drbs */ + if (nb != 0) + nb = stream_drb_alloc(s, drbs + nbc, nb); + + nb += nbc; + + /* no free drbs, can't send anything */ + if (nb == 0) + return 0; + + /* not enough free drbs, reduce number of packets to send. */ + else if (nb != nbm) + nb_pkt = nb * bsz; + + /* enqueue packets to the destination device. */ + nbc = nb; + n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb); + + /* if not all available drbs were consumed, move them to the start. */ + nbc -= nb; + for (i = 0; i != nb; i++) + drbs[i] = drbs[nbc + i]; + + *nb_drb = nb; + return n; +} + +uint16_t +tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[], + uint16_t num, const struct sockaddr *dst_addr) +{ + int32_t di, frg, rc; + uint64_t ol_flags; + uint32_t i, k, n, nb; + uint32_t mtu, pid, type; + const struct sockaddr_in *d4; + const struct sockaddr_in6 *d6; + struct tle_udp_stream *s; + const void *da; + union udph udph; + struct tle_dest dst; + struct tle_drb *drb[num]; + + s = UDP_STREAM(us); + type = s->s.type; + + /* start filling UDP header. */ + udph.raw = 0; + udph.ports.src = s->s.port.dst; + + /* figure out what destination addr/port to use. 
*/ + if (dst_addr != NULL) { + if (dst_addr->sa_family != s->prm.remote_addr.ss_family) { + rte_errno = EINVAL; + return 0; + } + if (type == TLE_V4) { + d4 = (const struct sockaddr_in *)dst_addr; + da = &d4->sin_addr; + udph.ports.dst = d4->sin_port; + } else { + d6 = (const struct sockaddr_in6 *)dst_addr; + da = &d6->sin6_addr; + udph.ports.dst = d6->sin6_port; + } + } else { + udph.ports.dst = s->s.port.src; + if (type == TLE_V4) + da = &s->s.ipv4.addr.src; + else + da = &s->s.ipv6.addr.src; + } + + di = stream_get_dest(&s->s, da, &dst); + if (di < 0) { + rte_errno = -di; + return 0; + } + + pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num; + mtu = dst.mtu - dst.l2_len - dst.l3_len; + + /* mark stream as not closable. */ + if (rwl_acquire(&s->tx.use) < 0) + return 0; + + nb = 0; + for (i = 0, k = 0; k != num; k = i) { + + /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */ + + frg = 0; + ol_flags = dst.dev->tx.ol_flags[type]; + + while (i != num && frg == 0) { + frg = pkt[i]->pkt_len > mtu; + if (frg != 0) + ol_flags &= ~PKT_TX_UDP_CKSUM; + rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i, + udph, &dst); + if (rc != 0) { + rte_errno = -rc; + goto out; + } + i += (frg == 0); + } + + /* enqueue non-fragment packets to the destination device. */ + if (k != i) { + k += queue_pkt_out(s, dst.dev, + (const void **)(uintptr_t)&pkt[k], i - k, + drb, &nb); + + /* stream TX queue is full. */ + if (k != i) + break; + } + + /* enqueue packet that need to be fragmented */ + if (i != num) { + + struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG]; + + /* fragment the packet. */ + rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst); + if (rc < 0) { + rte_errno = -rc; + break; + } + + n = queue_pkt_out(s, dst.dev, + (const void **)(uintptr_t)frag, rc, drb, &nb); + if (n == 0) { + while (rc-- != 0) + rte_pktmbuf_free(frag[rc]); + break; + } + + /* all fragments enqueued, free the original packet. */ + rte_pktmbuf_free(pkt[i]); + i++; + } + } + + /* if possible, rearm socket write event. */ + if (k == num && s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + +out: + /* free unused drbs. */ + if (nb != 0) + stream_drb_free(s, drb, nb); + + /* stream can be closed. */ + rwl_release(&s->tx.use); + + /* + * remove pkt l2/l3 headers, restore ol_flags for unsent, but + * already modified packets. + */ + ol_flags = ~dst.dev->tx.ol_flags[type]; + for (n = k; n != i; n++) { + rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph)); + pkt[n]->ol_flags &= ol_flags; + } + + return k; +} diff --git a/lib/libtle_l4p/udp_stream.c b/lib/libtle_l4p/udp_stream.c new file mode 100644 index 0000000..9f379d9 --- /dev/null +++ b/lib/libtle_l4p/udp_stream.c @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include + +#include "udp_stream.h" +#include "misc.h" + +static void +unuse_stream(struct tle_udp_stream *s) +{ + s->s.type = TLE_VNUM; + rte_atomic32_set(&s->rx.use, INT32_MIN); + rte_atomic32_set(&s->tx.use, INT32_MIN); +} + +static void +fini_stream(struct tle_udp_stream *s) +{ + if (s != NULL) { + rte_free(s->rx.q); + rte_free(s->tx.drb.r); + } +} + +static void +udp_fini_streams(struct tle_ctx *ctx) +{ + uint32_t i; + struct tle_udp_stream *s; + + s = ctx->streams.buf; + if (s != NULL) { + for (i = 0; i != ctx->prm.max_streams; i++) + fini_stream(s + i); + } + + rte_free(s); + ctx->streams.buf = NULL; + STAILQ_INIT(&ctx->streams.free); +} + +static int +init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s) +{ + size_t bsz, rsz, sz; + uint32_t i, k, n, nb; + struct tle_drb *drb; + char name[RTE_RING_NAMESIZE]; + + /* init RX part. */ + + n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + n = rte_align32pow2(n); + sz = sizeof(*s->rx.q) + n * sizeof(s->rx.q->ring[0]); + + s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->rx.q == NULL) { + UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); + return -ENOMEM; + } + + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ); + + /* init TX part. */ + + nb = drb_nb_elem(ctx); + k = calc_stream_drb_num(ctx, nb); + n = rte_align32pow2(k); + + /* size of the drbs ring */ + rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]); + rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + + /* size of the drb. */ + bsz = tle_drb_calc_size(nb); + + /* total stream drbs size. */ + sz = rsz + bsz * k; + + s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->tx.drb.r == NULL) { + UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); + return -ENOMEM; + } + + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->tx.drb.r, name, n, 0); + + for (i = 0; i != k; i++) { + drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + + rsz + bsz * i); + drb->udata = s; + drb->size = nb; + rte_ring_enqueue(s->tx.drb.r, drb); + } + + s->tx.drb.nb_elem = nb; + s->tx.drb.nb_max = k; + + /* mark stream as avaialble to use. 
*/ + + s->s.ctx = ctx; + unuse_stream(s); + STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link); + + return 0; +} + +static void +udp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb) +{ + struct tle_udp_stream *us; + + us = (struct tle_udp_stream *)s; + rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb); +} + +static int +udp_init_streams(struct tle_ctx *ctx) +{ + size_t sz; + uint32_t i; + int32_t rc; + struct tle_udp_stream *s; + + sz = sizeof(*s) * ctx->prm.max_streams; + s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes on socket %d " + "for %u udp_streams failed\n", + sz, ctx->prm.socket_id, ctx->prm.max_streams); + return -ENOMEM; + } + + ctx->streams.buf = s; + STAILQ_INIT(&ctx->streams.free); + + for (i = 0; i != ctx->prm.max_streams; i++) { + rc = init_stream(ctx, s + i); + if (rc != 0) { + UDP_LOG(ERR, "initalisation of %u-th stream failed", i); + udp_fini_streams(ctx); + return rc; + } + } + + return 0; +} + +static void __attribute__((constructor)) +udp_stream_setup(void) +{ + static const struct stream_ops udp_ops = { + .init_streams = udp_init_streams, + .fini_streams = udp_fini_streams, + .free_drbs = udp_free_drbs, + }; + + tle_stream_ops[TLE_PROTO_UDP] = udp_ops; +} + +static inline void +stream_down(struct tle_udp_stream *s) +{ + rwl_down(&s->rx.use); + rwl_down(&s->tx.use); +} + +static inline void +stream_up(struct tle_udp_stream *s) +{ + rwl_up(&s->rx.use); + rwl_up(&s->tx.use); +} + +static int +check_stream_prm(const struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + if ((prm->local_addr.ss_family != AF_INET && + prm->local_addr.ss_family != AF_INET6) || + prm->local_addr.ss_family != prm->remote_addr.ss_family) + return -EINVAL; + + /* callback and event notifications mechanisms are mutually exclusive */ + if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) || + (prm->send_ev != NULL && prm->send_cb.func != NULL)) + return -EINVAL; + + /* check does context support desired address family. */ + if ((prm->local_addr.ss_family == AF_INET && + ctx->prm.lookup4 == NULL) || + (prm->local_addr.ss_family == AF_INET6 && + ctx->prm.lookup6 == NULL)) + return -EINVAL; + + return 0; +} + +struct tle_stream * +tle_udp_stream_open(struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + s = (struct tle_udp_stream *)get_stream(ctx); + if (s == NULL) { + rte_errno = ENFILE; + return NULL; + + /* some TX still pending for that stream. */ + } else if (UDP_STREAM_TX_PENDING(s)) { + put_stream(ctx, &s->s, 0); + rte_errno = EAGAIN; + return NULL; + } + + /* copy input parameters. */ + s->prm = *prm; + + /* setup L4 ports and L3 addresses fields. 
*/ + rc = stream_fill_ctx(ctx, &s->s, + (const struct sockaddr *)&prm->local_addr, + (const struct sockaddr *)&prm->remote_addr); + + if (rc != 0) { + put_stream(ctx, &s->s, 1); + s = NULL; + rte_errno = rc; + } else { + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + /* mark stream as avaialbe for RX/TX */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + stream_up(s); + } + + return &s->s; +} + +int +tle_udp_stream_close(struct tle_stream *us) +{ + int32_t rc; + struct tle_ctx *ctx; + struct tle_udp_stream *s; + + static const struct tle_stream_cb zcb; + + s = UDP_STREAM(us); + if (us == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + ctx = s->s.ctx; + + /* mark stream as unavaialbe for RX/TX. */ + stream_down(s); + + /* reset stream events if any. */ + if (s->rx.ev != NULL) { + tle_event_idle(s->rx.ev); + s->rx.ev = NULL; + } + if (s->tx.ev != NULL) { + tle_event_idle(s->tx.ev); + s->tx.ev = NULL; + } + + s->rx.cb = zcb; + s->tx.cb = zcb; + + /* free stream's destination port */ + rc = stream_clear_ctx(ctx, &s->s); + + /* empty stream's RX queue */ + empty_mbuf_ring(s->rx.q); + + /* + * mark the stream as free again. + * if there still are pkts queued for TX, + * then put this stream to the tail of free list. + */ + put_stream(ctx, &s->s, UDP_STREAM_TX_FINISHED(s)); + return rc; +} + +int +tle_udp_stream_get_param(const struct tle_stream *us, + struct tle_udp_stream_param *prm) +{ + struct sockaddr_in *lin4; + struct sockaddr_in6 *lin6; + const struct tle_udp_stream *s; + + s = UDP_STREAM(us); + if (prm == NULL || us == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + prm[0] = s->prm; + if (prm->local_addr.ss_family == AF_INET) { + lin4 = (struct sockaddr_in *)&prm->local_addr; + lin4->sin_port = s->s.port.dst; + } else if (s->prm.local_addr.ss_family == AF_INET6) { + lin6 = (struct sockaddr_in6 *)&prm->local_addr; + lin6->sin6_port = s->s.port.dst; + } + + return 0; +} diff --git a/lib/libtle_l4p/udp_stream.h b/lib/libtle_l4p/udp_stream.h new file mode 100644 index 0000000..a950e56 --- /dev/null +++ b/lib/libtle_l4p/udp_stream.h @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _UDP_STREAM_H_ +#define _UDP_STREAM_H_ + +#include +#include +#include +#include + +#include "osdep.h" +#include "ctx.h" +#include "stream.h" + +#ifdef __cplusplus +extern "C" { +#endif + +union udph { + uint64_t raw; + struct { + union l4_ports ports; + uint16_t len; + uint16_t cksum; + }; +}; + +struct tle_udp_stream { + + struct tle_stream s; + + struct { + struct rte_ring *q; + struct tle_event *ev; + struct tle_stream_cb cb; + rte_atomic32_t use; + } rx __rte_cache_aligned; + + struct { + rte_atomic32_t use; + struct { + uint32_t nb_elem; /* number of obects per drb. */ + uint32_t nb_max; /* number of drbs per stream. 
 */
+		struct rte_ring *r;
+	} drb;
+		struct tle_event *ev;
+		struct tle_stream_cb cb;
+	} tx __rte_cache_aligned;
+
+	struct tle_udp_stream_param prm;
+} __rte_cache_aligned;
+
+#define UDP_STREAM(p) \
+((struct tle_udp_stream *)((uintptr_t)(p) - offsetof(struct tle_udp_stream, s)))
+
+#define UDP_STREAM_TX_PENDING(s) \
+	((s)->tx.drb.nb_max != rte_ring_count((s)->tx.drb.r))
+
+#define UDP_STREAM_TX_FINISHED(s) \
+	((s)->tx.drb.nb_max == rte_ring_count((s)->tx.drb.r))
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _UDP_STREAM_H_ */
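
For illustration, a minimal front-end/back-end sketch of the UDP API added above. This is not part of the patch: the context and event queue objects (ctx, evq), the pre-filled sockaddr_storage values (laddr, raddr), the device handle (dev, port_id) and the mbuf burst (pkt, n) are assumptions, and error handling is omitted.

	/* open a stream bound to laddr, with raddr as default destination */
	struct tle_udp_stream_param sp = {
		.local_addr = laddr,
		.remote_addr = raddr,
		.recv_ev = tle_event_alloc(evq, NULL),
		.send_ev = tle_event_alloc(evq, NULL),
	};
	struct tle_stream *s = tle_udp_stream_open(ctx, &sp);

	/* FE: queue a burst for TX; NULL dst_addr means "use sp.remote_addr" */
	uint16_t k = tle_udp_stream_send(s, pkt, n, NULL);

	/* BE: drain the device TX queue and put the packets on the wire */
	struct rte_mbuf *tx_pkt[32];
	uint16_t m = tle_udp_tx_bulk(dev, tx_pkt, RTE_DIM(tx_pkt));
	rte_eth_tx_burst(port_id, 0, tx_pkt, m);

	tle_udp_stream_close(s);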