From aa97dd1ce910b839fed46ad55d1e70e403f5a930 Mon Sep 17 00:00:00 2001 From: Konstantin Ananyev Date: Tue, 21 Feb 2017 18:12:20 +0000 Subject: Introduce first version of TCP code. Supported functionality: - open/close - listen/accept/connect - send/recv In order to achieve that libtle_udp library was reworked into libtle_l4p library that supports both TCP and UDP protocols. New libtle_timer library was introduced (thanks to Cisco guys and Dave Barach for sharing their timer code with us). Sample application was also reworked significantly to support both TCP and UDP traffic handling. New UT were introduced. Change-Id: I806b05011f521e89b58db403cfdd484a37beb775 Signed-off-by: Mohammad Abdul Awal Signed-off-by: Karol Latecki Signed-off-by: Daniel Mrzyglod Signed-off-by: Konstantin Ananyev --- lib/libtle_l4p/udp_stream.c | 346 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 346 insertions(+) create mode 100644 lib/libtle_l4p/udp_stream.c (limited to 'lib/libtle_l4p/udp_stream.c') diff --git a/lib/libtle_l4p/udp_stream.c b/lib/libtle_l4p/udp_stream.c new file mode 100644 index 0000000..9f379d9 --- /dev/null +++ b/lib/libtle_l4p/udp_stream.c @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include "udp_stream.h" +#include "misc.h" + +static void +unuse_stream(struct tle_udp_stream *s) +{ + s->s.type = TLE_VNUM; + rte_atomic32_set(&s->rx.use, INT32_MIN); + rte_atomic32_set(&s->tx.use, INT32_MIN); +} + +static void +fini_stream(struct tle_udp_stream *s) +{ + if (s != NULL) { + rte_free(s->rx.q); + rte_free(s->tx.drb.r); + } +} + +static void +udp_fini_streams(struct tle_ctx *ctx) +{ + uint32_t i; + struct tle_udp_stream *s; + + s = ctx->streams.buf; + if (s != NULL) { + for (i = 0; i != ctx->prm.max_streams; i++) + fini_stream(s + i); + } + + rte_free(s); + ctx->streams.buf = NULL; + STAILQ_INIT(&ctx->streams.free); +} + +static int +init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s) +{ + size_t bsz, rsz, sz; + uint32_t i, k, n, nb; + struct tle_drb *drb; + char name[RTE_RING_NAMESIZE]; + + /* init RX part. */ + + n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U); + n = rte_align32pow2(n); + sz = sizeof(*s->rx.q) + n * sizeof(s->rx.q->ring[0]); + + s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->rx.q == NULL) { + UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); + return -ENOMEM; + } + + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ); + + /* init TX part. */ + + nb = drb_nb_elem(ctx); + k = calc_stream_drb_num(ctx, nb); + n = rte_align32pow2(k); + + /* size of the drbs ring */ + rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]); + rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE); + + /* size of the drb. */ + bsz = tle_drb_calc_size(nb); + + /* total stream drbs size. */ + sz = rsz + bsz * k; + + s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s->tx.drb.r == NULL) { + UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d " + "failed with error code: %d\n", + __func__, s, sz, ctx->prm.socket_id, rte_errno); + return -ENOMEM; + } + + snprintf(name, sizeof(name), "%p@%zu", s, sz); + rte_ring_init(s->tx.drb.r, name, n, 0); + + for (i = 0; i != k; i++) { + drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r + + rsz + bsz * i); + drb->udata = s; + drb->size = nb; + rte_ring_enqueue(s->tx.drb.r, drb); + } + + s->tx.drb.nb_elem = nb; + s->tx.drb.nb_max = k; + + /* mark stream as avaialble to use. */ + + s->s.ctx = ctx; + unuse_stream(s); + STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link); + + return 0; +} + +static void +udp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb) +{ + struct tle_udp_stream *us; + + us = (struct tle_udp_stream *)s; + rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb); +} + +static int +udp_init_streams(struct tle_ctx *ctx) +{ + size_t sz; + uint32_t i; + int32_t rc; + struct tle_udp_stream *s; + + sz = sizeof(*s) * ctx->prm.max_streams; + s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx->prm.socket_id); + if (s == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes on socket %d " + "for %u udp_streams failed\n", + sz, ctx->prm.socket_id, ctx->prm.max_streams); + return -ENOMEM; + } + + ctx->streams.buf = s; + STAILQ_INIT(&ctx->streams.free); + + for (i = 0; i != ctx->prm.max_streams; i++) { + rc = init_stream(ctx, s + i); + if (rc != 0) { + UDP_LOG(ERR, "initalisation of %u-th stream failed", i); + udp_fini_streams(ctx); + return rc; + } + } + + return 0; +} + +static void __attribute__((constructor)) +udp_stream_setup(void) +{ + static const struct stream_ops udp_ops = { + .init_streams = udp_init_streams, + .fini_streams = udp_fini_streams, + .free_drbs = udp_free_drbs, + }; + + tle_stream_ops[TLE_PROTO_UDP] = udp_ops; +} + +static inline void +stream_down(struct tle_udp_stream *s) +{ + rwl_down(&s->rx.use); + rwl_down(&s->tx.use); +} + +static inline void +stream_up(struct tle_udp_stream *s) +{ + rwl_up(&s->rx.use); + rwl_up(&s->tx.use); +} + +static int +check_stream_prm(const struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + if ((prm->local_addr.ss_family != AF_INET && + prm->local_addr.ss_family != AF_INET6) || + prm->local_addr.ss_family != prm->remote_addr.ss_family) + return -EINVAL; + + /* callback and event notifications mechanisms are mutually exclusive */ + if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) || + (prm->send_ev != NULL && prm->send_cb.func != NULL)) + return -EINVAL; + + /* check does context support desired address family. */ + if ((prm->local_addr.ss_family == AF_INET && + ctx->prm.lookup4 == NULL) || + (prm->local_addr.ss_family == AF_INET6 && + ctx->prm.lookup6 == NULL)) + return -EINVAL; + + return 0; +} + +struct tle_stream * +tle_udp_stream_open(struct tle_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + s = (struct tle_udp_stream *)get_stream(ctx); + if (s == NULL) { + rte_errno = ENFILE; + return NULL; + + /* some TX still pending for that stream. */ + } else if (UDP_STREAM_TX_PENDING(s)) { + put_stream(ctx, &s->s, 0); + rte_errno = EAGAIN; + return NULL; + } + + /* copy input parameters. */ + s->prm = *prm; + + /* setup L4 ports and L3 addresses fields. */ + rc = stream_fill_ctx(ctx, &s->s, + (const struct sockaddr *)&prm->local_addr, + (const struct sockaddr *)&prm->remote_addr); + + if (rc != 0) { + put_stream(ctx, &s->s, 1); + s = NULL; + rte_errno = rc; + } else { + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + /* mark stream as avaialbe for RX/TX */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + stream_up(s); + } + + return &s->s; +} + +int +tle_udp_stream_close(struct tle_stream *us) +{ + int32_t rc; + struct tle_ctx *ctx; + struct tle_udp_stream *s; + + static const struct tle_stream_cb zcb; + + s = UDP_STREAM(us); + if (us == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + ctx = s->s.ctx; + + /* mark stream as unavaialbe for RX/TX. */ + stream_down(s); + + /* reset stream events if any. */ + if (s->rx.ev != NULL) { + tle_event_idle(s->rx.ev); + s->rx.ev = NULL; + } + if (s->tx.ev != NULL) { + tle_event_idle(s->tx.ev); + s->tx.ev = NULL; + } + + s->rx.cb = zcb; + s->tx.cb = zcb; + + /* free stream's destination port */ + rc = stream_clear_ctx(ctx, &s->s); + + /* empty stream's RX queue */ + empty_mbuf_ring(s->rx.q); + + /* + * mark the stream as free again. + * if there still are pkts queued for TX, + * then put this stream to the tail of free list. + */ + put_stream(ctx, &s->s, UDP_STREAM_TX_FINISHED(s)); + return rc; +} + +int +tle_udp_stream_get_param(const struct tle_stream *us, + struct tle_udp_stream_param *prm) +{ + struct sockaddr_in *lin4; + struct sockaddr_in6 *lin6; + const struct tle_udp_stream *s; + + s = UDP_STREAM(us); + if (prm == NULL || us == NULL || s->s.type >= TLE_VNUM) + return -EINVAL; + + prm[0] = s->prm; + if (prm->local_addr.ss_family == AF_INET) { + lin4 = (struct sockaddr_in *)&prm->local_addr; + lin4->sin_port = s->s.port.dst; + } else if (s->prm.local_addr.ss_family == AF_INET6) { + lin6 = (struct sockaddr_in6 *)&prm->local_addr; + lin6->sin6_port = s->s.port.dst; + } + + return 0; +} -- cgit 1.2.3-korg