aboutsummaryrefslogtreecommitdiffstats
path: root/examples/l4fwd/udp.h
diff options
context:
space:
mode:
Diffstat (limited to 'examples/l4fwd/udp.h')
-rw-r--r--examples/l4fwd/udp.h588
1 files changed, 588 insertions, 0 deletions
diff --git a/examples/l4fwd/udp.h b/examples/l4fwd/udp.h
new file mode 100644
index 0000000..cdec6a5
--- /dev/null
+++ b/examples/l4fwd/udp.h
@@ -0,0 +1,588 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef UDP_H_
+#define UDP_H_
+
+/*
+ * helper function: opens IPv4 and IPv6 streams for selected port.
+ */
+static struct netfe_stream *
+netfe_stream_open_udp(struct netfe_lcore *fe, struct netfe_sprm *sprm,
+ uint32_t lcore, uint16_t op, uint32_t bidx)
+{
+ int32_t rc;
+ struct netfe_stream *fes;
+ struct sockaddr_in *l4;
+ struct sockaddr_in6 *l6;
+ uint16_t errport;
+ struct tle_udp_stream_param uprm;
+
+ fes = netfe_get_stream(&fe->free);
+ if (fes == NULL) {
+ rte_errno = ENOBUFS;
+ return NULL;
+ }
+
+ fes->rxev = tle_event_alloc(fe->rxeq, fes);
+ fes->txev = tle_event_alloc(fe->txeq, fes);
+
+ if (fes->rxev == NULL || fes->txev == NULL) {
+ netfe_stream_close(fe, fes);
+ rte_errno = ENOMEM;
+ return NULL;
+ }
+
+ if (op == TXONLY || op == FWD) {
+ tle_event_active(fes->txev, TLE_SEV_DOWN);
+ fes->stat.txev[TLE_SEV_DOWN]++;
+ }
+
+ if (op != TXONLY) {
+ tle_event_active(fes->rxev, TLE_SEV_DOWN);
+ fes->stat.rxev[TLE_SEV_DOWN]++;
+ }
+
+ memset(&uprm, 0, sizeof(uprm));
+ uprm.local_addr = sprm->local_addr;
+ uprm.remote_addr = sprm->remote_addr;
+ uprm.recv_ev = fes->rxev;
+ if (op != FWD)
+ uprm.send_ev = fes->txev;
+ fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, &uprm);
+
+ if (fes->s == NULL) {
+ rc = rte_errno;
+ netfe_stream_close(fe, fes);
+ rte_errno = rc;
+
+ if (sprm->local_addr.ss_family == AF_INET) {
+ l4 = (struct sockaddr_in *) &sprm->local_addr;
+ errport = ntohs(l4->sin_port);
+ } else {
+ l6 = (struct sockaddr_in6 *) &sprm->local_addr;
+ errport = ntohs(l6->sin6_port);
+ }
+
+ RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
+ "code=%u, bidx=%u, lc=%u\n",
+ errport, rc, bidx, becfg.cpu[bidx].id);
+ return NULL;
+ }
+
+ RTE_LOG(NOTICE, USER1,
+ "%s(%u)={s=%p, op=%hu, proto=%s, rxev=%p, txev=%p}, belc=%u\n",
+ __func__, lcore, fes->s, op, proto_name[becfg.proto],
+ fes->rxev, fes->txev, becfg.cpu[bidx].id);
+
+ fes->op = op;
+ fes->proto = becfg.proto;
+ fes->family = sprm->local_addr.ss_family;
+
+ return fes;
+}
+
+static int
+netfe_lcore_init_udp(const struct netfe_lcore_prm *prm)
+{
+ size_t sz;
+ int32_t rc;
+ uint32_t i, lcore, snum;
+ struct netfe_lcore *fe;
+ struct tle_evq_param eprm;
+ struct netfe_stream *fes;
+ struct netfe_sprm *sprm;
+
+ lcore = rte_lcore_id();
+
+ snum = prm->max_streams;
+ RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
+ __func__, lcore, prm->nb_streams, snum);
+
+ memset(&eprm, 0, sizeof(eprm));
+ eprm.socket_id = rte_lcore_to_socket_id(lcore);
+ eprm.max_events = snum;
+
+ sz = sizeof(*fe) + snum * sizeof(struct netfe_stream);
+ fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+ rte_lcore_to_socket_id(lcore));
+
+ if (fe == NULL) {
+ RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
+ __func__, __LINE__, sz);
+ return -ENOMEM;
+ }
+
+ RTE_PER_LCORE(_fe) = fe;
+
+ fe->snum = snum;
+ /* initialize the stream pool */
+ LIST_INIT(&fe->free.head);
+ LIST_INIT(&fe->use.head);
+ fes = (struct netfe_stream *)(fe + 1);
+ for (i = 0; i != snum; i++, fes++)
+ netfe_put_stream(fe, &fe->free, fes);
+
+ /* allocate the event queues */
+ fe->rxeq = tle_evq_create(&eprm);
+ fe->txeq = tle_evq_create(&eprm);
+
+ RTE_LOG(INFO, USER1, "%s(%u) rx evq=%p, tx evq=%p\n",
+ __func__, lcore, fe->rxeq, fe->txeq);
+ if (fe->rxeq == NULL || fe->txeq == NULL)
+ return -ENOMEM;
+
+ rc = fwd_tbl_init(fe, AF_INET, lcore);
+ RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
+ __func__, lcore, AF_INET, rc);
+ if (rc != 0)
+ return rc;
+
+ rc = fwd_tbl_init(fe, AF_INET6, lcore);
+ RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
+ __func__, lcore, AF_INET6, rc);
+ if (rc != 0)
+ return rc;
+
+ /* open all requested streams. */
+ for (i = 0; i != prm->nb_streams; i++) {
+ sprm = &prm->stream[i].sprm;
+ fes = netfe_stream_open_udp(fe, sprm, lcore, prm->stream[i].op,
+ sprm->bidx);
+ if (fes == NULL) {
+ rc = -rte_errno;
+ break;
+ }
+
+ netfe_stream_dump(fes, &sprm->local_addr, &sprm->remote_addr);
+
+ if (prm->stream[i].op == FWD) {
+ fes->fwdprm = prm->stream[i].fprm;
+ rc = fwd_tbl_add(fe,
+ prm->stream[i].fprm.remote_addr.ss_family,
+ (const struct sockaddr *)
+ &prm->stream[i].fprm.remote_addr,
+ fes);
+ if (rc != 0) {
+ netfe_stream_close(fe, fes);
+ break;
+ }
+ } else if (prm->stream[i].op == TXONLY) {
+ fes->txlen = prm->stream[i].txlen;
+ fes->raddr = prm->stream[i].sprm.remote_addr;
+ }
+ }
+
+ return rc;
+}
+
+static struct netfe_stream *
+find_fwd_dst_udp(uint32_t lcore, struct netfe_stream *fes,
+ const struct sockaddr *sa)
+{
+ uint32_t rc;
+ struct netfe_stream *fed;
+ struct netfe_lcore *fe;
+ struct tle_udp_stream_param uprm;
+
+ fe = RTE_PER_LCORE(_fe);
+
+ fed = fwd_tbl_lkp(fe, fes->family, sa);
+ if (fed != NULL)
+ return fed;
+
+ /* create a new stream and put it into the fwd table. */
+ memset(&uprm, 0, sizeof(uprm));
+ uprm.local_addr = fes->fwdprm.local_addr;
+ uprm.remote_addr = fes->fwdprm.remote_addr;
+
+ /* open forward stream with wildcard remote addr. */
+ memset(&uprm.remote_addr.ss_family + 1, 0,
+ sizeof(uprm.remote_addr) - sizeof(uprm.remote_addr.ss_family));
+
+ fed = netfe_stream_open_udp(fe, &fes->fwdprm, lcore, FWD,
+ fes->fwdprm.bidx);
+ if (fed == NULL)
+ return NULL;
+
+ rc = fwd_tbl_add(fe, fes->family, sa, fed);
+ if (rc != 0) {
+ netfe_stream_close(fe, fed);
+ fed = NULL;
+ }
+
+ fed->fwdprm.remote_addr = *(const struct sockaddr_storage *)sa;
+ return fed;
+}
+
+static inline int
+netfe_addr_eq(struct sockaddr_storage *l, struct sockaddr_storage *r,
+ uint16_t family)
+{
+ struct sockaddr_in *l4, *r4;
+ struct sockaddr_in6 *l6, *r6;
+
+ if (family == AF_INET) {
+ l4 = (struct sockaddr_in *)l;
+ r4 = (struct sockaddr_in *)r;
+ return (l4->sin_port == r4->sin_port &&
+ l4->sin_addr.s_addr == r4->sin_addr.s_addr);
+ } else {
+ l6 = (struct sockaddr_in6 *)l;
+ r6 = (struct sockaddr_in6 *)r;
+ return (l6->sin6_port == r6->sin6_port &&
+ memcmp(&l6->sin6_addr, &r6->sin6_addr,
+ sizeof(l6->sin6_addr)));
+ }
+}
+
+static inline void
+netfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps,
+ uint16_t family)
+{
+ const struct ipv4_hdr *ip4h;
+ const struct ipv6_hdr *ip6h;
+ const struct udp_hdr *udph;
+ struct sockaddr_in *in4;
+ struct sockaddr_in6 *in6;
+
+ NETFE_PKT_DUMP(m);
+
+ udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len);
+
+ if (family == AF_INET) {
+ in4 = (struct sockaddr_in *)ps;
+ ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
+ -(m->l4_len + m->l3_len));
+ in4->sin_port = udph->src_port;
+ in4->sin_addr.s_addr = ip4h->src_addr;
+ } else {
+ in6 = (struct sockaddr_in6 *)ps;
+ ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
+ -(m->l4_len + m->l3_len));
+ in6->sin6_port = udph->src_port;
+ rte_memcpy(&in6->sin6_addr, ip6h->src_addr,
+ sizeof(in6->sin6_addr));
+ }
+}
+
+static inline uint32_t
+pkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family,
+ struct sockaddr_storage *cur, struct sockaddr_storage *nxt)
+{
+ uint32_t i;
+
+ for (i = 0; i != num; i++) {
+ netfe_pkt_addr(pkt[i], nxt, family);
+ if (netfe_addr_eq(cur, nxt, family) == 0)
+ break;
+ }
+
+ return i;
+}
+
+static inline void
+netfe_fwd_udp(uint32_t lcore, struct netfe_stream *fes)
+{
+ uint32_t i, j, k, n, x;
+ uint16_t family;
+ void *pi0, *pi1, *pt;
+ struct rte_mbuf **pkt;
+ struct netfe_stream *fed;
+ struct sockaddr_storage in[2];
+
+ family = fes->family;
+ n = fes->pbuf.num;
+ pkt = fes->pbuf.pkt;
+
+ if (n == 0)
+ return;
+
+ in[0].ss_family = family;
+ in[1].ss_family = family;
+ pi0 = &in[0];
+ pi1 = &in[1];
+
+ netfe_pkt_addr(pkt[0], pi0, family);
+
+ x = 0;
+ for (i = 0; i != n; i = j) {
+
+ j = i + pkt_eq_addr(&pkt[i + 1],
+ n - i - 1, family, pi0, pi1) + 1;
+
+ fed = find_fwd_dst_udp(lcore, fes,
+ (const struct sockaddr *)pi0);
+ if (fed != NULL) {
+
+ /**
+ * TODO: cannot use function pointers for unequal
+ * number of params.
+ */
+ k = tle_udp_stream_send(fed->s, pkt + i, j - i,
+ (const struct sockaddr *)
+ &fes->fwdprm.remote_addr);
+
+ NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) "
+ "returns %u\n",
+ __func__, lcore, proto_name[fes->proto],
+ fed->s, j - i, k);
+
+ fed->stat.txp += k;
+ fed->stat.drops += j - i - k;
+ fes->stat.fwp += k;
+
+ } else {
+ NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
+ __func__, lcore, fes->s, j - i);
+ for (k = i; k != j; k++) {
+ NETFE_TRACE("%s(%u, %p): free(%p);\n",
+ __func__, lcore, fes->s, pkt[k]);
+ rte_pktmbuf_free(pkt[j]);
+ }
+ fes->stat.drops += j - i;
+ }
+
+ /* copy unforwarded mbufs. */
+ for (i += k; i != j; i++, x++)
+ pkt[x] = pkt[i];
+
+ /* swap the pointers */
+ pt = pi0;
+ pi0 = pi1;
+ pi1 = pt;
+ }
+
+ fes->pbuf.num = x;
+
+ if (x != 0) {
+ tle_event_raise(fes->txev);
+ fes->stat.txev[TLE_SEV_UP]++;
+ }
+
+ if (n == RTE_DIM(fes->pbuf.pkt)) {
+ tle_event_active(fes->rxev, TLE_SEV_UP);
+ fes->stat.rxev[TLE_SEV_UP]++;
+ }
+}
+
+static inline void
+netfe_rxtx_process_udp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
+{
+ uint32_t i, j, k, n;
+ uint16_t family;
+ void *pi0, *pi1, *pt;
+ struct rte_mbuf **pkt;
+ struct sockaddr_storage in[2];
+
+ family = fes->family;
+ n = fes->pbuf.num;
+ pkt = fes->pbuf.pkt;
+
+ /* there is nothing to send. */
+ if (n == 0) {
+ tle_event_idle(fes->txev);
+ fes->stat.txev[TLE_SEV_IDLE]++;
+ return;
+ }
+
+ in[0].ss_family = family;
+ in[1].ss_family = family;
+ pi0 = &in[0];
+ pi1 = &in[1];
+
+ netfe_pkt_addr(pkt[0], pi0, family);
+
+ for (i = 0; i != n; i = j) {
+
+ j = i + pkt_eq_addr(&pkt[i + 1],
+ n - i - 1, family, pi0, pi1) + 1;
+
+ /**
+ * TODO: cannot use function pointers for unequal param num.
+ */
+ k = tle_udp_stream_send(fes->s, pkt + i, j - i,
+ (const struct sockaddr *)pi0);
+
+ NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
+ __func__, lcore, proto_name[fes->proto],
+ fes->s, j - i, k);
+ fes->stat.txp += k;
+ fes->stat.drops += j - i - k;
+
+ i += k;
+
+ /* stream send buffer is full */
+ if (i != j)
+ break;
+
+ /* swap the pointers */
+ pt = pi0;
+ pi0 = pi1;
+ pi1 = pt;
+ }
+
+ /* not able to send anything. */
+ if (i == 0)
+ return;
+
+ if (n == RTE_DIM(fes->pbuf.pkt)) {
+ /* mark stream as readable */
+ tle_event_active(fes->rxev, TLE_SEV_UP);
+ fes->stat.rxev[TLE_SEV_UP]++;
+ }
+
+ /* adjust pbuf array. */
+ fes->pbuf.num = n - i;
+ for (j = i; j != n; j++)
+ pkt[j - i] = pkt[j];
+}
+
+static inline void
+netfe_tx_process_udp(uint32_t lcore, struct netfe_stream *fes)
+{
+ uint32_t i, k, n;
+
+ /* refill with new mbufs. */
+ pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
+
+ n = fes->pbuf.num;
+ if (n == 0)
+ return;
+
+ /**
+ * TODO: cannot use function pointers for unequal param num.
+ */
+ k = tle_udp_stream_send(fes->s, fes->pbuf.pkt, n, NULL);
+ NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
+ __func__, lcore, proto_name[fes->proto], fes->s, n, k);
+ fes->stat.txp += k;
+ fes->stat.drops += n - k;
+
+ if (k == 0)
+ return;
+
+ /* adjust pbuf array. */
+ fes->pbuf.num = n - k;
+ for (i = k; i != n; i++)
+ fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
+}
+
+static inline void
+netfe_lcore_udp(void)
+{
+ struct netfe_lcore *fe;
+ uint32_t j, n, lcore;
+ struct netfe_stream *fs[MAX_PKT_BURST];
+
+ fe = RTE_PER_LCORE(_fe);
+ if (fe == NULL)
+ return;
+
+ lcore = rte_lcore_id();
+
+ /* look for rx events */
+ n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
+
+ if (n != 0) {
+ NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
+ __func__, lcore, fe->rxeq, n);
+ for (j = 0; j != n; j++)
+ netfe_rx_process(lcore, fs[j]);
+ }
+
+ /* look for tx events */
+ n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
+
+ if (n != 0) {
+ NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
+ __func__, lcore, fe->txeq, n);
+ for (j = 0; j != n; j++) {
+ if (fs[j]->op == RXTX)
+ netfe_rxtx_process_udp(lcore, fs[j]);
+ else if (fs[j]->op == FWD)
+ netfe_fwd_udp(lcore, fs[j]);
+ else if (fs[j]->op == TXONLY)
+ netfe_tx_process_udp(lcore, fs[j]);
+ }
+ }
+}
+
+static void
+netfe_lcore_fini_udp(void)
+{
+ struct netfe_lcore *fe;
+ uint32_t i;
+ struct tle_udp_stream_param uprm;
+ struct netfe_stream *fes;
+
+ fe = RTE_PER_LCORE(_fe);
+ if (fe == NULL)
+ return;
+
+ for (i = 0; i != fe->use.num; i++) {
+ fes = netfe_get_stream(&fe->use);
+ tle_udp_stream_get_param(fes->s, &uprm);
+ netfe_stream_dump(fes, &uprm.local_addr, &uprm.remote_addr);
+ netfe_stream_close(fe, fes);
+ }
+
+ tle_evq_destroy(fe->txeq);
+ tle_evq_destroy(fe->rxeq);
+ RTE_PER_LCORE(_fe) = NULL;
+ rte_free(fe);
+}
+
+static int
+lcore_main_udp(void *arg)
+{
+ int32_t rc;
+ uint32_t lcore;
+ struct lcore_prm *prm;
+
+ prm = arg;
+ lcore = rte_lcore_id();
+
+ RTE_LOG(NOTICE, USER1, "%s(lcore=%u) start\n",
+ __func__, lcore);
+
+ rc = 0;
+
+ /* lcore FE init. */
+ if (prm->fe.max_streams != 0)
+ rc = netfe_lcore_init_udp(&prm->fe);
+
+ /* lcore FE init. */
+ if (rc == 0 && prm->be.lc != NULL)
+ rc = netbe_lcore_setup(prm->be.lc);
+
+ if (rc != 0)
+ sig_handle(SIGQUIT);
+
+ while (force_quit == 0) {
+ netfe_lcore_udp();
+ netbe_lcore();
+ }
+
+ RTE_LOG(NOTICE, USER1, "%s(lcore=%u) finish\n",
+ __func__, lcore);
+
+ netfe_lcore_fini_udp();
+ netbe_lcore_clear();
+
+ return rc;
+}
+
+#endif /* UDP_H_ */