diff options
author | Konstantin Ananyev <konstantin.ananyev@intel.com> | 2017-02-21 18:12:20 +0000 |
---|---|---|
committer | Konstantin Ananyev <konstantin.ananyev@intel.com> | 2017-02-24 16:37:08 +0000 |
commit | aa97dd1ce910b839fed46ad55d1e70e403f5a930 (patch) | |
tree | f6f0fd494eaf499859bff9f20f5ddfac9ab99233 /lib/libtle_l4p/udp_stream.c | |
parent | f5f10013ffef8e4ac1071087b8492fe6380d98fe (diff) |
Introduce first version of TCP code.
Supported functionality:
- open/close
- listen/accept/connect
- send/recv
In order to achieve that libtle_udp library was
reworked into libtle_l4p library that supports
both TCP and UDP protocols.
New libtle_timer library was introduced
(thanks to Cisco guys and Dave Barach <dbarach@cisco.com>
for sharing their timer code with us).
Sample application was also reworked significantly
to support both TCP and UDP traffic handling.
New UT were introduced.
Change-Id: I806b05011f521e89b58db403cfdd484a37beb775
Signed-off-by: Mohammad Abdul Awal <mohammad.abdul.awal@intel.com>
Signed-off-by: Karol Latecki <karolx.latecki@intel.com>
Signed-off-by: Daniel Mrzyglod <danielx.t.mrzyglod@intel.com>
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Diffstat (limited to 'lib/libtle_l4p/udp_stream.c')
-rw-r--r-- | lib/libtle_l4p/udp_stream.c | 346 |
1 files changed, 346 insertions, 0 deletions
diff --git a/lib/libtle_l4p/udp_stream.c b/lib/libtle_l4p/udp_stream.c new file mode 100644 index 0000000..9f379d9 --- /dev/null +++ b/lib/libtle_l4p/udp_stream.c @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string.h> +#include <rte_malloc.h> +#include <rte_errno.h> +#include <rte_ethdev.h> +#include <rte_ip.h> +#include <rte_udp.h> + +#include "udp_stream.h" +#include "misc.h" + +static void +unuse_stream(struct tle_udp_stream *s) +{ + s->s.type = TLE_VNUM; + rte_atomic32_set(&s->rx.use, INT32_MIN); + rte_atomic32_set(&s->tx.use, INT32_MIN); +} + +static void +fini_stream(struct tle_udp_stream *s) +{ + if (s != NULL) { + rte_free(s->rx.q); + rte_free(s->tx.drb.r); + } +} + +static void +udp_fini_streams(struct tle_ctx *ctx) +{ + uint32_t i; + struct tle_udp_stream *s; + + s = ctx->streams.buf; + if (s != NULL) { + for (i = 0; i != ctx->prm.max_streams; i++) + fini_stream(s + i); + } + + rte_free(s); + ctx->streams.buf = NULL; + STAILQ_INIT(&ctx->streams.free); +} + +static int +init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s) +{ + size_t bsz, rsz, sz; + uint32_t i, k, n, nb; + struct tle_drb *drb; + char name[RTE_RING_NAMESIZE]; + + /* init RX part. */ + + n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + n = rte_align32pow2(n); + sz = sizeof(*s->rx.q) + n * sizeof(s->rx.q->ring[0]); + + s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->rx.q == NULL) { + UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); + return -ENOMEM; + } + + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ); + + /* init TX part. */ + + nb = drb_nb_elem(ctx); + k = calc_stream_drb_num(ctx, nb); + n = rte_align32pow2(k); + + /* size of the drbs ring */ + rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]); + rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + + /* size of the drb. */ + bsz = tle_drb_calc_size(nb); + + /* total stream drbs size. */ + sz = rsz + bsz * k; + + s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->tx.drb.r == NULL) { + UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); + return -ENOMEM; + } + + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->tx.drb.r, name, n, 0); + + for (i = 0; i != k; i++) { + drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + + rsz + bsz * i); + drb->udata = s; + drb->size = nb; + rte_ring_enqueue(s->tx.drb.r, drb); + } + + s->tx.drb.nb_elem = nb; + s->tx.drb.nb_max = k; + + /* mark stream as avaialble to use. */ + + s->s.ctx = ctx; + unuse_stream(s); + STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link); + + return 0; +} + +static void +udp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb) +{ + struct tle_udp_stream *us; + + us = (struct tle_udp_stream *)s; + rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb); +} + +static int +udp_init_streams(struct tle_ctx *ctx) +{ + size_t sz; + uint32_t i; + int32_t rc; + struct tle_udp_stream *s; + + sz = sizeof(*s) * ctx->prm.max_streams; + s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes on socket %d " + "for %u udp_streams failed\n", + sz, ctx->prm.socket_id, ctx->prm.max_streams); + return -ENOMEM; + } + + ctx->streams.buf = s; + STAILQ_INIT(&ctx->streams.free); + + for (i = 0; i != ctx->prm.max_streams; i++) { + rc = init_stream(ctx, s + i); + if (rc != 0) { + UDP_LOG(ERR, "initalisation of %u-th stream failed", i); + udp_fini_streams(ctx); + return rc; + } + } + + return 0; +} + +static void __attribute__((constructor)) +udp_stream_setup(void) +{ + static const struct stream_ops udp_ops = { + .init_streams = udp_init_streams, + .fini_streams = udp_fini_streams, + .free_drbs = udp_free_drbs, + }; + + tle_stream_ops[TLE_PROTO_UDP] = udp_ops; +} + +static inline void +stream_down(struct tle_udp_stream *s) +{ + rwl_down(&s->rx.use); + rwl_down(&s->tx.use); +} + +static inline void +stream_up(struct tle_udp_stream *s) +{ + rwl_up(&s->rx.use); + rwl_up(&s->tx.use); +} + +static int +check_stream_prm(const struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + if ((prm->local_addr.ss_family != AF_INET && + prm->local_addr.ss_family != AF_INET6) || + prm->local_addr.ss_family != prm->remote_addr.ss_family) + return -EINVAL; + + /* callback and event notifications mechanisms are mutually exclusive */ + if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) || + (prm->send_ev != NULL && prm->send_cb.func != NULL)) + return -EINVAL; + + /* check does context support desired address family. */ + if ((prm->local_addr.ss_family == AF_INET && + ctx->prm.lookup4 == NULL) || + (prm->local_addr.ss_family == AF_INET6 && + ctx->prm.lookup6 == NULL)) + return -EINVAL; + + return 0; +} + +struct tle_stream * +tle_udp_stream_open(struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + s = (struct tle_udp_stream *)get_stream(ctx); + if (s == NULL) { + rte_errno = ENFILE; + return NULL; + + /* some TX still pending for that stream. */ + } else if (UDP_STREAM_TX_PENDING(s)) { + put_stream(ctx, &s->s, 0); + rte_errno = EAGAIN; + return NULL; + } + + /* copy input parameters. */ + s->prm = *prm; + + /* setup L4 ports and L3 addresses fields. */ + rc = stream_fill_ctx(ctx, &s->s, + (const struct sockaddr *)&prm->local_addr, + (const struct sockaddr *)&prm->remote_addr); + + if (rc != 0) { + put_stream(ctx, &s->s, 1); + s = NULL; + rte_errno = rc; + } else { + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + /* mark stream as avaialbe for RX/TX */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + stream_up(s); + } + + return &s->s; +} + +int +tle_udp_stream_close(struct tle_stream *us) +{ + int32_t rc; + struct tle_ctx *ctx; + struct tle_udp_stream *s; + + static const struct tle_stream_cb zcb; + + s = UDP_STREAM(us); + if (us == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + ctx = s->s.ctx; + + /* mark stream as unavaialbe for RX/TX. */ + stream_down(s); + + /* reset stream events if any. */ + if (s->rx.ev != NULL) { + tle_event_idle(s->rx.ev); + s->rx.ev = NULL; + } + if (s->tx.ev != NULL) { + tle_event_idle(s->tx.ev); + s->tx.ev = NULL; + } + + s->rx.cb = zcb; + s->tx.cb = zcb; + + /* free stream's destination port */ + rc = stream_clear_ctx(ctx, &s->s); + + /* empty stream's RX queue */ + empty_mbuf_ring(s->rx.q); + + /* + * mark the stream as free again. + * if there still are pkts queued for TX, + * then put this stream to the tail of free list. + */ + put_stream(ctx, &s->s, UDP_STREAM_TX_FINISHED(s)); + return rc; +} + +int +tle_udp_stream_get_param(const struct tle_stream *us, + struct tle_udp_stream_param *prm) +{ + struct sockaddr_in *lin4; + struct sockaddr_in6 *lin6; + const struct tle_udp_stream *s; + + s = UDP_STREAM(us); + if (prm == NULL || us == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + prm[0] = s->prm; + if (prm->local_addr.ss_family == AF_INET) { + lin4 = (struct sockaddr_in *)&prm->local_addr; + lin4->sin_port = s->s.port.dst; + } else if (s->prm.local_addr.ss_family == AF_INET6) { + lin6 = (struct sockaddr_in6 *)&prm->local_addr; + lin6->sin6_port = s->s.port.dst; + } + + return 0; +} |