aboutsummaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/udpfwd/Makefile2
-rw-r--r--examples/udpfwd/README39
-rw-r--r--examples/udpfwd/main.c924
-rw-r--r--examples/udpfwd/netbe.h16
-rw-r--r--examples/udpfwd/parse.c55
-rw-r--r--examples/udpfwd/parse.h3
-rw-r--r--examples/udpfwd/pkt.c8
7 files changed, 738 insertions, 309 deletions
diff --git a/examples/udpfwd/Makefile b/examples/udpfwd/Makefile
index c23947a..5274d0f 100644
--- a/examples/udpfwd/Makefile
+++ b/examples/udpfwd/Makefile
@@ -40,5 +40,7 @@ LDLIBS += -L$(TLDK_ROOT)/$(RTE_TARGET)/lib
LDLIBS += -ltle_udp
EXTRA_CFLAGS += -O3
+CFLAGS_parse.o += -D_GNU_SOURCE
+CFLAGS_main.o += -mcmodel=medium
include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/udpfwd/README b/examples/udpfwd/README
index 8ab7e98..91b6e76 100644
--- a/examples/udpfwd/README
+++ b/examples/udpfwd/README
@@ -18,15 +18,15 @@ BE is responsible for:
(via tle_udp_rx_bulk).
- retrieve packets ready to be send out from UDP TLDK context(s)
and TX them over destined DPDK port.
-Right now only one RX/TX queue per port is used.
+Multiple RX/TX queues per port are supported by RSS. Right now the number of
+TX is same as the number of RX queue.
Each BE lcore can serve multiple DPDK ports, TLDK UDP contexts.
- Front End (FE)
FE responsibility is to open configured UDP streams and perform
send/recv over them. These streams can belong to different UDP contexts.
-Right now each lcore can act as BE or FE (but not both simultaneously).
-Master lcore can act as FE only.
+Right now each lcore can act as BE and/or FE.
Usage
=====
@@ -40,18 +40,18 @@ udpfwd <EAL parameters> -- \
-f | --fecfg <filename> /* frontend configuration file. */ \
<port0_params> <port1_params> ... <portN_params>
-port_params: port=<uint>,lcore=<uint>,\
+port_params: port=<uint>,lcore=<uint>[-<uint>],\
[rx_offload=<uint>,tx_offload=<uint>,mtu=<uint>,ipv4=<ipv4>,ipv6=<ipv6>]
port_params are used to configure the particular DPDK device (rte_ethdev port),
and specify BE lcore that will do RX/TX from/to the device and manage
-BE part of corresponding UDP context.
+BE part of corresponding UDP context. Multiple BE lcore can be specified.
-port - DPDK port id (right now on each port is used just one RX,
- one TX queue).
+port - DPDK port id (multiple queues are supported when multiple lcore
+ is specified for a port).
lcore - EAL lcore id to do IO over that port (rx_burst/tx_burst).
- several ports can be managed by the same lcore,
- but same port can't belong to more than one lcore.
+ several ports can be managed by the same lcore, and same port can
+ belong to more than one lcore.
rx_offload - RX HW offload capabilities to enable/use on this port.
(bitmask of DEV_RX_OFFLOAD_* values).
tx_offload - TX HW offload capabilities to enable/use on this port.
@@ -64,15 +64,15 @@ ipv6 - ipv6 address to assign to that port.
At least one of ipv4/ipv6 values have to be specified for each port.
As an example:
-udpfwd --lcores='3,6' -w 01:00.0 -- \
+udpfwd --lcores='3,6,8' -w 01:00.0 -- \
--promisc --rbufs 0x1000 --sbufs 0x1000 --streams 0x100 \
--fecfg ./fe.cfg --becfg ./be.cfg \
-port=0,lcore=6,rx_offload=0xf,tx_offload=0,\
+port=0,lcore=6,lcore=8,rx_offload=0xf,tx_offload=0,\
ipv4=192.168.1.233,ipv6=2001:4860:b002::28
-Will create TLDK UDP context on lcore=6 (BE lcore) to manage DPDK port 0.
-Will assign IPv4 address 192.168.1.233 and IPv6 address 2001:4860:b002::28
-to that port.
+Will create TLDK UDP context on lcore=6 and lcore=8 (BE lcore) to manage
+DPDK port 0. Will assign IPv4 address 192.168.1.233 and IPv6 address
+2001:4860:b002::28 to that port.
The following supported by DPDK RX HW offloads:
DEV_RX_OFFLOAD_VLAN_STRIP,
DEV_RX_OFFLOAD_IPV4_CKSUM,
@@ -81,6 +81,11 @@ The following supported by DPDK RX HW offloads:
will be enabled on that port.
No HW TX offloads will be enabled.
+If multiple lcore is specified per DPDK port, the following RSS hash will
+be enabled on that port:
+ ETH_RSS_UDP
+
+
Fornt-End (FE) and Back-End (BE) configuration files format:
------------------------------------------------------------
- each record on a separate line.
@@ -96,9 +101,10 @@ FE config record format:
lcore=<uint>,op=<"rx|tx|echo|fwd">,\
laddr=<ip>,lport=<uint16>,raddr=<ip>,rport=<uint16>,\
-[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16>
+[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16>,\
+belcore=<uint>]
-lcore - EAL lcore to manage that stream(s).
+lcore - EAL lcore to manage that stream(s) in the FE.
op - operation to perform on that stream:
"rx" - do receive only on that stream.
"tx" - do send only on that stream.
@@ -118,6 +124,7 @@ fwraddr - remote address for the forwarding stream(s) to open
("fwd mode only).
fwrport - remote port for the forwarding stream(s) to open
("fwd mode only).
+belcore - EAL lcore to manage that stream(s) in the BE.
Refer to fe.cfg for an example.
diff --git a/examples/udpfwd/main.c b/examples/udpfwd/main.c
index 2dde317..3daeb30 100644
--- a/examples/udpfwd/main.c
+++ b/examples/udpfwd/main.c
@@ -71,6 +71,36 @@ static const struct option long_opt[] = {
{NULL, 0, 0, 0}
};
+/**
+ * IPv4 Input size in bytes for RSS hash key calculation.
+ * source address, destination address, source port, and destination port.
+ */
+#define IPV4_TUPLE_SIZE 12
+
+/**
+ * IPv6 Input size in bytes for RSS hash key calculation.
+ * source address, destination address, source port, and destination port.
+ */
+#define IPV6_TUPLE_SIZE 36
+
+/**
+ * Location to be modified to create the IPv4 hash key which helps
+ * to distribute packets based on the destination UDP port.
+ */
+#define RSS_HASH_KEY_DEST_PORT_LOC_IPV4 15
+
+/*
+ * Location to be modified to create the IPv6 hash key which helps
+ * to distribute packets based on the destination UDP port.
+ */
+#define RSS_HASH_KEY_DEST_PORT_LOC_IPV6 39
+
+/**
+ * Size of the rte_eth_rss_reta_entry64 array to update through
+ * rte_eth_dev_rss_reta_update.
+ */
+#define RSS_RETA_CONF_ARRAY_SIZE (ETH_RSS_RETA_SIZE_512/RTE_RETA_GROUP_SIZE)
+
static volatile int force_quit;
static struct netbe_cfg becfg;
@@ -94,20 +124,145 @@ sig_handle(int signum)
force_quit = 1;
}
+static void
+prepare_hash_key(struct netbe_port *uprt, uint8_t key_size, uint16_t family)
+{
+ uint32_t align_nb_q;
+
+ align_nb_q = rte_align32pow2(uprt->nb_lcore);
+ memset(uprt->hash_key, 0, RSS_HASH_KEY_LENGTH);
+ uprt->hash_key_size = key_size;
+ if (family == AF_INET)
+ uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV4] = align_nb_q;
+ else
+ uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV6] = align_nb_q;
+}
+
+static uint32_t
+qidx_from_hash_index(uint32_t hash, uint32_t align_nb_q)
+{
+ uint32_t i, nb_bit, q;
+
+ nb_bit = (sizeof(uint32_t) * CHAR_BIT) - __builtin_clz(align_nb_q - 1);
+ q = (hash & 1);
+ for (i = 1; i < nb_bit; i++) {
+ hash >>= 1;
+ q <<= 1;
+ q |= (hash & 1);
+ }
+
+ return q;
+}
+
+static int
+update_rss_conf(struct netbe_port *uprt,
+ const struct rte_eth_dev_info *dev_info,
+ struct rte_eth_conf *port_conf)
+{
+ uint8_t hash_key_size;
+
+ if (uprt->nb_lcore > 1) {
+ if (dev_info->hash_key_size > 0)
+ hash_key_size = dev_info->hash_key_size;
+ else {
+ RTE_LOG(ERR, USER1,
+ "%s: dev_info did not provide a valid hash key size\n",
+ __func__);
+ return -EINVAL;
+ }
+
+ if (uprt->ipv4 != INADDR_ANY &&
+ memcmp(&uprt->ipv6, &in6addr_any,
+ sizeof(uprt->ipv6)) != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: RSS for both IPv4 and IPv6 not supported!\n",
+ __func__);
+ return -EINVAL;
+ } else if (uprt->ipv4 != INADDR_ANY) {
+ prepare_hash_key(uprt, hash_key_size, AF_INET);
+ } else if (memcmp(&uprt->ipv6, &in6addr_any, sizeof(uprt->ipv6))
+ != 0) {
+ prepare_hash_key(uprt, hash_key_size, AF_INET6);
+ } else {
+ RTE_LOG(ERR, USER1,
+ "%s: No IPv4 or IPv6 address is found!\n",
+ __func__);
+ return -EINVAL;
+ }
+ port_conf->rxmode.mq_mode = ETH_MQ_RX_RSS;
+ port_conf->rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP;
+ port_conf->rx_adv_conf.rss_conf.rss_key_len = hash_key_size;
+ port_conf->rx_adv_conf.rss_conf.rss_key = uprt->hash_key;
+ }
+
+ return 0;
+}
+
+static int
+update_rss_reta(struct netbe_port *uprt,
+ const struct rte_eth_dev_info *dev_info)
+{
+ struct rte_eth_rss_reta_entry64 reta_conf[RSS_RETA_CONF_ARRAY_SIZE];
+ int32_t i, rc, align_nb_q;
+ int32_t q_index, idx, shift;
+
+ if (uprt->nb_lcore > 1) {
+ if (dev_info->reta_size == 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: Redirection table size 0 is invalid for RSS\n",
+ __func__);
+ return -EINVAL;
+ }
+ RTE_LOG(NOTICE, USER1,
+ "%s: The reta size of port %d is %u\n",
+ __func__, uprt->id, dev_info->reta_size);
+
+ if (dev_info->reta_size > ETH_RSS_RETA_SIZE_512) {
+ RTE_LOG(ERR, USER1,
+ "%s: More than %u entries of Reta not supported\n",
+ __func__, ETH_RSS_RETA_SIZE_512);
+ return -EINVAL;
+ }
+
+ memset(reta_conf, 0, sizeof(reta_conf));
+ align_nb_q = rte_align32pow2(uprt->nb_lcore);
+ for (i = 0; i < align_nb_q; i++) {
+ q_index = qidx_from_hash_index(i, align_nb_q) %
+ uprt->nb_lcore;
+
+ idx = i / RTE_RETA_GROUP_SIZE;
+ shift = i % RTE_RETA_GROUP_SIZE;
+ reta_conf[idx].mask |= (1ULL << shift);
+ reta_conf[idx].reta[shift] = q_index;
+ RTE_LOG(NOTICE, USER1,
+ "%s: port=%u RSS reta conf: hash=%u, q=%u\n",
+ __func__, uprt->id, i, q_index);
+ }
+
+ rc = rte_eth_dev_rss_reta_update(uprt->id,
+ reta_conf, dev_info->reta_size);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: Bad redirection table parameter, rc = %d\n",
+ __func__, rc);
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
/*
* Initilise DPDK port.
- * In current version, only one queue per port is used.
+ * In current version, multi-queue per port is used.
*/
static int
-port_init(struct netbe_port *uprt, struct rte_mempool *mp)
+port_init(struct netbe_port *uprt)
{
- int32_t socket, rc;
- uint16_t q;
+ int32_t rc;
struct rte_eth_conf port_conf;
struct rte_eth_dev_info dev_info;
- const uint16_t rx_rings = 1, tx_rings = 1;
-
rte_eth_dev_info_get(uprt->id, &dev_info);
if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) {
RTE_LOG(ERR, USER1,
@@ -130,16 +285,33 @@ port_init(struct netbe_port *uprt, struct rte_mempool *mp)
__func__, uprt->id);
port_conf.rxmode.hw_ip_checksum = 1;
}
-
port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN;
- rc = rte_eth_dev_configure(uprt->id, rx_rings, tx_rings, &port_conf);
+ rc = update_rss_conf(uprt, &dev_info, &port_conf);
+ if (rc != 0)
+ return rc;
+
+ rc = rte_eth_dev_configure(uprt->id, uprt->nb_lcore, uprt->nb_lcore,
+ &port_conf);
RTE_LOG(NOTICE, USER1,
- "%s: rte_eth_dev_configure(%u) returns %d;\n",
- __func__, uprt->id, rc);
+ "%s: rte_eth_dev_configure(prt_id=%u, nb_rxq=%u, nb_txq=%u) "
+ "returns %d;\n", __func__, uprt->id, uprt->nb_lcore,
+ uprt->nb_lcore, rc);
if (rc != 0)
return rc;
+ return 0;
+}
+
+static int
+queue_init(struct netbe_port *uprt, struct rte_mempool *mp)
+{
+ int32_t socket, rc;
+ uint16_t q;
+ struct rte_eth_dev_info dev_info;
+
+ rte_eth_dev_info_get(uprt->id, &dev_info);
+
socket = rte_eth_dev_socket_id(uprt->id);
dev_info.default_rxconf.rx_drop_en = 1;
@@ -151,21 +323,27 @@ port_init(struct netbe_port *uprt, struct rte_mempool *mp)
dev_info.default_txconf.txq_flags = 0;
}
- for (q = 0; q < rx_rings; q++) {
+ for (q = 0; q < uprt->nb_lcore; q++) {
rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE,
- socket, NULL, mp);
- if (rc < 0)
+ socket, &dev_info.default_rxconf, mp);
+ if (rc < 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: rx queue=%u setup failed with error code: %d\n",
+ __func__, q, rc);
return rc;
+ }
}
- for (q = 0; q < tx_rings; q++) {
+ for (q = 0; q < uprt->nb_lcore; q++) {
rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE,
socket, &dev_info.default_txconf);
- if (rc < 0)
+ if (rc < 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: tx queue=%u setup failed with error code: %d\n",
+ __func__, q, rc);
return rc;
+ }
}
-
-
return 0;
}
@@ -190,13 +368,29 @@ check_lcore(uint32_t lc)
static void
log_netbe_prt(const struct netbe_port *uprt)
{
+ uint32_t i;
+ char corelist[2 * RTE_MAX_LCORE + 1];
+ char hashkey[2 * RSS_HASH_KEY_LENGTH];
+
+ memset(corelist, 0, sizeof(corelist));
+ memset(hashkey, 0, sizeof(hashkey));
+ for (i = 0; i < uprt->nb_lcore; i++)
+ if (i < uprt->nb_lcore - 1)
+ sprintf(corelist + (2 * i), "%u,", uprt->lcore[i]);
+ else
+ sprintf(corelist + (2 * i), "%u", uprt->lcore[i]);
+
+ for (i = 0; i < uprt->hash_key_size; i++)
+ sprintf(hashkey + (2 * i), "%02x", uprt->hash_key[i]);
+
RTE_LOG(NOTICE, USER1,
- "uprt %p = <id = %u, lcore = %u, "
- "mtu = %u, rx_offload = %u, tx_offload = %u,\n"
+ "uprt %p = <id = %u, lcore = <%s>, mtu = %u, "
+ "rx_offload = %u, tx_offload = %u,\n"
"ipv4 = %#x, "
"ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, "
- "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n",
- uprt, uprt->id, uprt->lcore,
+ "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n"
+ "hashkey = %s;\n",
+ uprt, uprt->id, corelist,
uprt->mtu, uprt->rx_offload, uprt->tx_offload,
uprt->ipv4,
uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1],
@@ -205,7 +399,8 @@ log_netbe_prt(const struct netbe_port *uprt)
uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7],
uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1],
uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3],
- uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5]);
+ uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5],
+ hashkey);
}
static void
@@ -262,53 +457,126 @@ frag_pool_init(uint32_t sid)
return 0;
}
+static struct netbe_lcore *
+find_initilized_lcore(struct netbe_cfg *cfg, uint32_t lc_num)
+{
+ uint32_t i;
+
+ for (i = 0; i < cfg->cpu_num; i++)
+ if (cfg->cpu[i].id == lc_num)
+ return &cfg->cpu[i];
+
+ return NULL;
+}
+
+static int
+calculate_nb_prtq(struct netbe_cfg *cfg)
+{
+ uint32_t i, j, rc;
+ struct netbe_port *prt;
+ struct netbe_lcore *lc;
+
+ for (i = 0; i != cfg->prt_num; i++) {
+ prt = &cfg->prt[i];
+ for (j = 0; j != prt->nb_lcore; j++) {
+ rc = check_lcore(prt->lcore[j]);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: processing failed with err: %d\n",
+ __func__, rc);
+ return rc;
+ }
+
+ lc = find_initilized_lcore(cfg, prt->lcore[j]);
+ if (lc == NULL) {
+ lc = &cfg->cpu[cfg->cpu_num];
+ lc->id = prt->lcore[j];
+ cfg->cpu_num++;
+ }
+ lc->prtq[lc->prtq_num].rxqid = j;
+ lc->prtq[lc->prtq_num].txqid = j;
+ lc->prtq[lc->prtq_num].port = *prt;
+
+ lc->prtq_num++;
+ }
+ }
+
+ return 0;
+}
/*
* Setup all enabled ports.
*/
-static void
+static int
netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[])
{
int32_t rc;
- uint32_t i, n, sid;
+ uint32_t i, n, sid, j;
+ struct netbe_port *prt;
n = RTE_MIN(RTE_DIM(cfg->prt), (uint32_t)argc);
rc = 0;
for (i = 0; i != n; i++) {
rc = parse_netbe_arg(cfg->prt + i, argv[i]);
- if (rc != 0)
- break;
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: processing of \"%s\" failed with error code: %d\n",
+ __func__, argv[i], rc);
+ return rc;
+ }
+ }
+ cfg->prt_num = i;
- rc = check_lcore(cfg->prt[i].lcore);
- if (rc != 0)
- break;
+ /* calculate number of queues per lcore. */
+ rc = calculate_nb_prtq(cfg);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1, "%s: processing of arguments failed"
+ " with error code: %d\n", __func__, rc);
+ return rc;
+ }
- sid = rte_lcore_to_socket_id(cfg->prt[i].lcore) + 1;
- assert(sid < RTE_DIM(mpool));
+ for (i = 0; i != cfg->prt_num; i++) {
+ prt = cfg->prt + i;
+ rc = port_init(prt);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: port=%u init failed with error code: %d\n",
+ __func__, prt->id, rc);
+ return rc;
+ }
+ rte_eth_macaddr_get(prt->id, &prt->mac);
+ if (cfg->promisc)
+ rte_eth_promiscuous_enable(prt->id);
- if (mpool[sid] == NULL && (rc = pool_init(sid)) != 0)
- break;
+ for (j = 0; j < prt->nb_lcore; j++) {
+ sid = rte_lcore_to_socket_id(prt->lcore[j]) + 1;
+ assert(sid < RTE_DIM(mpool));
- if (frag_mpool[sid] == NULL && (rc = frag_pool_init(sid)) != 0)
- break;
+ if (mpool[sid] == NULL) {
+ rc = pool_init(sid);
+ if (rc != 0)
+ return rc;
+ }
- rc = port_init(cfg->prt + i, mpool[sid]);
- if (rc != 0)
- break;
+ if (frag_mpool[sid] == NULL) {
+ rc = frag_pool_init(sid);
+ if (rc != 0)
+ return rc;
+ }
- rte_eth_macaddr_get(cfg->prt[i].id, &cfg->prt[i].mac);
- if (cfg->promisc)
- rte_eth_promiscuous_enable(cfg->prt[i].id);
+ rc = queue_init(prt, mpool[sid]);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: lcore=%u queue init failed with err: %d\n",
+ __func__, prt->lcore[j], rc);
+ return rc;
+ }
+ }
}
-
- if (rc != 0)
- rte_exit(EXIT_FAILURE,
- "%s: processing of \"%s\" failed with error code: %d\n",
- __func__, argv[i], rc);
-
- cfg->prt_num = i;
log_netbe_cfg(cfg);
+
+ return 0;
}
/*
@@ -487,128 +755,153 @@ fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed,
}
}
-
-/*
- * BE lcore setup routine.
- */
static int
-lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
- const struct netbe_port prt[], uint32_t prt_num)
+create_context(struct netbe_lcore *lc,
+ const struct tle_udp_ctx_param *ctx_prm)
{
- int32_t rc, sid;
- uint32_t i;
+ uint32_t rc = 0, sid;
uint64_t frag_cycles;
struct tle_udp_ctx_param cprm;
- struct tle_udp_dev_param dprm;
- lc->id = prt[0].lcore;
- lc->prt_num = prt_num;
+ if (lc->ctx == NULL) {
+ sid = rte_lcore_to_socket_id(lc->id);
- sid = rte_lcore_to_socket_id(lc->id);
+ rc = lcore_lpm_init(lc);
+ if (rc != 0)
+ return rc;
- rc = lcore_lpm_init(lc);
- if (rc != 0)
- return rc;
+ cprm = *ctx_prm;
+ cprm.socket_id = sid;
+ cprm.lookup4 = lpm4_dst_lookup;
+ cprm.lookup4_data = lc;
+ cprm.lookup6 = lpm6_dst_lookup;
+ cprm.lookup6_data = lc;
- cprm = *ctx_prm;
- cprm.socket_id = sid;
- cprm.lookup4 = lpm4_dst_lookup;
- cprm.lookup4_data = lc;
- cprm.lookup6 = lpm6_dst_lookup;
- cprm.lookup6_data = lc;
+ /* to facilitate both IPv4 and IPv6. */
+ cprm.max_streams *= 2;
- /* to facilitate both IPv4 and IPv6. */
- cprm.max_streams *= 2;
+ frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) /
+ MS_PER_S * FRAG_TTL;
- frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S * FRAG_TTL;
- lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
- FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams, frag_cycles, sid);
- RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
- __func__, lc->id, lc->ftbl);
+ lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
+ FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams,
+ frag_cycles, sid);
- lc->ctx = tle_udp_create(&cprm);
- RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
- __func__, lc->id, lc->ctx);
+ RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
+ __func__, lc->id, lc->ftbl);
- if (lc->ctx == NULL || lc->ftbl == NULL)
- rc = ENOMEM;
+ lc->ctx = tle_udp_create(&cprm);
- for (i = 0; i != prt_num && rc == 0; i++) {
+ RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
+ __func__, lc->id, lc->ctx);
- memset(&dprm, 0, sizeof(dprm));
+ if (lc->ctx == NULL || lc->ftbl == NULL)
+ rc = ENOMEM;
+ }
- lc->prt[i].rxqid = 0;
- lc->prt[i].txqid = 0;
- lc->prt[i].port = prt[i];
+ return rc;
+}
+
+/*
+ * BE lcore setup routine.
+ */
+static int
+lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
+ const uint32_t prtqid, uint16_t *bl_ports, uint32_t nb_bl_ports)
+{
+ int32_t rc = 0;
+ struct tle_udp_dev_param dprm;
- dprm.rx_offload = prt[i].rx_offload;
- dprm.tx_offload = prt[i].tx_offload;
- dprm.local_addr4.s_addr = prt[i].ipv4;
- memcpy(&dprm.local_addr6, &prt[i].ipv6, sizeof(prt[i].ipv6));
+ rc = create_context(lc, ctx_prm);
- lc->prt[i].dev = tle_udp_add_dev(lc->ctx, &dprm);
- RTE_LOG(NOTICE, USER1, "%s(lcore=%u, port=%u), udp_dev: %p;\n",
- __func__, lc->id, prt[i].id, lc->prt[i].dev);
- if (lc->prt[i].dev == NULL)
+ if (lc->ctx != NULL) {
+ memset(&dprm, 0, sizeof(dprm));
+ dprm.rx_offload = lc->prtq[prtqid].port.rx_offload;
+ dprm.tx_offload = lc->prtq[prtqid].port.tx_offload;
+ dprm.local_addr4.s_addr = lc->prtq[prtqid].port.ipv4;
+ memcpy(&dprm.local_addr6, &lc->prtq[prtqid].port.ipv6,
+ sizeof(lc->prtq[prtqid].port.ipv6));
+ dprm.nb_bl_ports = nb_bl_ports;
+ dprm.bl_ports = bl_ports;
+
+ lc->prtq[prtqid].dev = tle_udp_add_dev(lc->ctx, &dprm);
+
+ RTE_LOG(NOTICE, USER1,
+ "%s(lcore=%u, port=%u, qid=%u), udp_dev: %p\n",
+ __func__, lc->id, lc->prtq[prtqid].port.id,
+ lc->prtq[prtqid].rxqid, lc->prtq[prtqid].dev);
+
+ if (lc->prtq[prtqid].dev == NULL)
rc = -rte_errno;
- }
- if (rc != 0) {
- RTE_LOG(ERR, USER1, "%s(lcore=%u) failed with error code: %d\n",
- __func__, lc->id, rc);
- tle_udp_destroy(lc->ctx);
- rte_ip_frag_table_destroy(lc->ftbl);
- rte_lpm_free(lc->lpm4);
- rte_lpm6_free(lc->lpm6);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s(lcore=%u) failed with error code: %d\n",
+ __func__, lc->id, rc);
+ tle_udp_destroy(lc->ctx);
+ rte_ip_frag_table_destroy(lc->ftbl);
+ rte_lpm_free(lc->lpm4);
+ rte_lpm6_free(lc->lpm6);
+ return rc;
+ }
}
return rc;
}
-static int
-prt_lcore_cmp(const void *s1, const void *s2)
+static uint16_t
+create_blocklist(const struct netbe_port *beprt, uint16_t *bl_ports,
+ uint32_t q)
{
- const struct netbe_port *p1, *p2;
+ uint32_t i, j, qid, align_nb_q;
- p1 = s1;
- p2 = s2;
- return p1->lcore - p2->lcore;
+ align_nb_q = rte_align32pow2(beprt->nb_lcore);
+ for (i = 0, j = 0; i < (UINT16_MAX + 1); i++) {
+ qid = (i % align_nb_q) % beprt->nb_lcore;
+ if (qid != q)
+ bl_ports[j++] = i;
+ }
+
+ return j;
}
-static void
-netbe_lcore_init(struct netbe_cfg *cfg, const struct tle_udp_ctx_param *ctx_prm)
+static int
+netbe_lcore_init(struct netbe_cfg *cfg,
+ const struct tle_udp_ctx_param *ctx_prm)
{
int32_t rc;
- uint32_t i, k, n, num;
- struct netbe_port sp[RTE_DIM(cfg->prt)];
-
- num = cfg->prt_num;
- memcpy(sp, cfg->prt, sizeof(sp[0]) * num);
- qsort(sp, num, sizeof(sp[0]), prt_lcore_cmp);
-
- /* Fill ports to be used by each lcore. */
+ uint32_t i, j, nb_bl_ports, sz;
+ struct netbe_lcore *lc;
+ static uint16_t *bl_ports;
- k = 0;
- n = 0;
+ /* Create the udp context and attached queue for each lcore. */
rc = 0;
- for (i = 0; i != num && rc == 0; i++) {
- if (sp[n].lcore != sp[i].lcore) {
- rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
- n = i;
- k++;
+ nb_bl_ports = 0;
+ sz = sizeof(uint16_t) * UINT16_MAX;
+ bl_ports = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+ for (i = 0; i < cfg->cpu_num; i++) {
+ lc = &cfg->cpu[i];
+ for (j = 0; j < lc->prtq_num; j++) {
+ memset((uint8_t *)bl_ports, 0, sz);
+ /* create list of blocked ports based on q */
+ nb_bl_ports = create_blocklist(&lc->prtq[j].port,
+ bl_ports, lc->prtq[j].rxqid);
+ RTE_LOG(NOTICE, USER1, "lc=%u, q=%u, nb_bl_ports4=%u\n",
+ lc->id, lc->prtq[j].rxqid, nb_bl_ports);
+
+ rc = lcore_init(lc, ctx_prm, j, bl_ports, nb_bl_ports);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: failed with error code: %d\n",
+ __func__, rc);
+ rte_free(bl_ports);
+ return rc;
+ }
}
}
+ rte_free(bl_ports);
- if (rc == 0 && i != n) {
- rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
- k++;
- }
-
- if (rc != 0)
- rte_exit(EXIT_FAILURE, "%s: failed with error code: %d\n",
- __func__, rc);
-
- cfg->cpu_num = k;
+ return 0;
}
static void
@@ -659,7 +952,7 @@ netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
rc = 0;
for (i = 0; i != dnum && rc == 0; i++) {
- fill_dst(dp + i, lc->prt + dev_idx, dst + i, l3_type, sid);
+ fill_dst(dp + i, lc->prtq + dev_idx, dst + i, l3_type, sid);
if (family == AF_INET)
rc = netbe_add_ipv4_route(lc, dst + i, n + i);
else
@@ -675,42 +968,11 @@ netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
}
static int
-netbe_port2lcore(struct netbe_cfg *cfg, uint32_t port, struct netbe_lcore **plc)
-{
- uint32_t i, j;
- struct netbe_lcore *lc;
-
- for (i = 0; i != cfg->cpu_num; i++) {
- lc = cfg->cpu + i;
- for (j = 0; j != cfg->prt_num; j++) {
- if (lc->prt[j].port.id == port) {
- *plc = lc;
- return j;
- }
- }
- }
-
- return -ENOENT;
-}
-
-static int
-netbe_dest_cmp(const void *s1, const void *s2)
-{
- const struct netbe_dest *p1, *p2;
-
- p1 = s1;
- p2 = s2;
- if (p1->port == p2->port)
- return p1->family - p2->family;
- else
- return p1->port - p2->port;
-}
-
-static int
netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
{
int32_t rc;
- uint32_t f, i, j, p;
+ uint32_t f, i, p;
+ uint32_t k, l, cnt;
struct netbe_lcore *lc;
struct netbe_dest_prm prm;
@@ -718,29 +980,36 @@ netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
if (rc != 0)
return rc;
- qsort(prm.dest, prm.nb_dest, sizeof(prm.dest[0]), netbe_dest_cmp);
-
rc = 0;
- for (i = 0; i != prm.nb_dest; i = j) {
+ for (i = 0; i != prm.nb_dest; i++) {
p = prm.dest[i].port;
f = prm.dest[i].family;
- for (j = i + 1; j != prm.nb_dest && p == prm.dest[j].port &&
- f == prm.dest[j].family;
- j++)
- ;
- rc = netbe_port2lcore(cfg, p, &lc);
- if (rc < 0) {
+ cnt = 0;
+ for (k = 0; k != cfg->cpu_num; k++) {
+ lc = cfg->cpu + k;
+ for (l = 0; l != lc->prtq_num; l++)
+ if (lc->prtq[l].port.id == p) {
+ rc = netbe_add_dest(lc, l, f,
+ prm.dest + i, 1);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s(lcore=%u, family=%u) could not "
+ "add destinations(%u);\n",
+ __func__, lc->id, f, i);
+ return -ENOSPC;
+ }
+ cnt++;
+ }
+ }
+
+ if (cnt == 0) {
RTE_LOG(ERR, USER1, "%s(%s) error at line %u: "
"port %u not managed by any lcore;\n",
__func__, fname, prm.dest[i].line, p);
break;
}
-
- rc = netbe_add_dest(lc, rc, f, prm.dest + i, j - i);
- if (rc != 0)
- break;
}
free(prm.dest);
@@ -817,7 +1086,6 @@ netfe_stream_dump(const struct netfe_stream *fes)
fes->stat.txev[TLE_SEV_UP]);
}
-
/*
* helper function: opens IPv4 and IPv6 streams for selected port.
*/
@@ -828,6 +1096,9 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
int32_t rc;
uint32_t sidx;
struct netfe_stream *fes;
+ struct sockaddr_in *l4;
+ struct sockaddr_in6 *l6;
+ uint16_t errport;
sidx = fe->sidx;
fes = fe->fs + sidx;
@@ -842,8 +1113,10 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
if (op != FWD)
sprm->send_ev = fes->txev;
- RTE_LOG(ERR, USER1, "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}\n",
- __func__, lcore, sidx, op, fes->rxev, fes->txev);
+ RTE_LOG(ERR, USER1,
+ "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}, be_lc=%u\n",
+ __func__, lcore, sidx, op, fes->rxev, fes->txev,
+ becfg.cpu[bidx].id);
if (fes->rxev == NULL || fes->txev == NULL) {
netfe_stream_close(fe, 0);
rte_errno = ENOMEM;
@@ -865,6 +1138,17 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
rc = rte_errno;
netfe_stream_close(fe, 0);
rte_errno = rc;
+
+ if (sprm->local_addr.ss_family == AF_INET) {
+ l4 = (struct sockaddr_in *) &sprm->local_addr;
+ errport = ntohs(l4->sin_port);
+ } else {
+ l6 = (struct sockaddr_in6 *) &sprm->local_addr;
+ errport = ntohs(l6->sin6_port);
+ }
+ RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
+ "code=%u, bidx=%u, lc=%u\n",
+ errport, rc, bidx, becfg.cpu[bidx].id);
return NULL;
}
@@ -873,7 +1157,6 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
fe->sidx = sidx + 1;
return fes;
-
}
static inline int
@@ -1034,7 +1317,6 @@ netfe_tx_process(uint32_t lcore, struct netfe_stream *fes)
fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
}
-
static inline void
netfe_fwd(uint32_t lcore, struct netfe_stream *fes)
{
@@ -1347,7 +1629,6 @@ netfe_lcore(void)
}
}
-
static void
netfe_lcore_fini(void)
{
@@ -1359,7 +1640,6 @@ netfe_lcore_fini(void)
return;
while (fe->sidx != 0) {
-
i = fe->sidx - 1;
netfe_stream_dump(fe->fs + i);
netfe_stream_close(fe, 1);
@@ -1379,26 +1659,26 @@ netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
struct rte_mbuf *rp[MAX_PKT_BURST];
int32_t rc[MAX_PKT_BURST];
- n = rte_eth_rx_burst(lc->prt[pidx].port.id,
- lc->prt[pidx].rxqid, pkt, RTE_DIM(pkt));
+ n = rte_eth_rx_burst(lc->prtq[pidx].port.id,
+ lc->prtq[pidx].rxqid, pkt, RTE_DIM(pkt));
if (n == 0)
return;
- lc->prt[pidx].rx_stat.in += n;
+ lc->prtq[pidx].rx_stat.in += n;
NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
- __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].rxqid,
+ __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].rxqid,
n);
- k = tle_udp_rx_bulk(lc->prt[pidx].dev, pkt, rp, rc, n);
+ k = tle_udp_rx_bulk(lc->prtq[pidx].dev, pkt, rp, rc, n);
- lc->prt[pidx].rx_stat.up += k;
- lc->prt[pidx].rx_stat.drop += n - k;
+ lc->prtq[pidx].rx_stat.up += k;
+ lc->prtq[pidx].rx_stat.drop += n - k;
NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n",
- __func__, lc->id, lc->prt[pidx].dev, n, k);
+ __func__, lc->id, lc->prtq[pidx].dev, n, k);
for (j = 0; j != n - k; j++) {
NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
- __func__, __LINE__, lc->prt[pidx].port.id,
+ __func__, __LINE__, lc->prtq[pidx].port.id,
j, rp[j], rc[j]);
rte_pktmbuf_free(rp[j]);
}
@@ -1410,14 +1690,14 @@ netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
uint32_t j, k, n;
struct rte_mbuf **mb;
- n = lc->prt[pidx].tx_buf.num;
- k = RTE_DIM(lc->prt[pidx].tx_buf.pkt) - n;
- mb = lc->prt[pidx].tx_buf.pkt;
+ n = lc->prtq[pidx].tx_buf.num;
+ k = RTE_DIM(lc->prtq[pidx].tx_buf.pkt) - n;
+ mb = lc->prtq[pidx].tx_buf.pkt;
- if (k >= RTE_DIM(lc->prt[pidx].tx_buf.pkt) / 2) {
- j = tle_udp_tx_bulk(lc->prt[pidx].dev, mb + n, k);
+ if (k >= RTE_DIM(lc->prtq[pidx].tx_buf.pkt) / 2) {
+ j = tle_udp_tx_bulk(lc->prtq[pidx].dev, mb + n, k);
n += j;
- lc->prt[pidx].tx_stat.down += j;
+ lc->prtq[pidx].tx_stat.down += j;
}
if (n == 0)
@@ -1425,21 +1705,21 @@ netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n"
"total pkts to send: %u\n",
- __func__, lc->id, lc->prt[pidx].dev, j, n);
+ __func__, lc->id, lc->prtq[pidx].dev, j, n);
for (j = 0; j != n; j++)
NETBE_PKT_DUMP(mb[j]);
- k = rte_eth_tx_burst(lc->prt[pidx].port.id,
- lc->prt[pidx].txqid, mb, n);
+ k = rte_eth_tx_burst(lc->prtq[pidx].port.id,
+ lc->prtq[pidx].txqid, mb, n);
- lc->prt[pidx].tx_stat.out += k;
- lc->prt[pidx].tx_stat.drop += n - k;
+ lc->prtq[pidx].tx_stat.out += k;
+ lc->prtq[pidx].tx_stat.drop += n - k;
NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
- __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].txqid,
+ __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].txqid,
n, k);
- lc->prt[pidx].tx_buf.num = n - k;
+ lc->prtq[pidx].tx_buf.num = n - k;
if (k != 0)
for (j = k; j != n; j++)
mb[j - k] = mb[j];
@@ -1458,17 +1738,19 @@ netbe_lcore_setup(struct netbe_lcore *lc)
* ???????
* wait for FE lcores to start, so BE dont' drop any packets
* because corresponding streams not opened yet by FE.
- * usefull when used with pcap PMDS.
+ * useful when used with pcap PMDS.
* think better way, or should this timeout be a cmdlien parameter.
* ???????
*/
rte_delay_ms(10);
rc = 0;
- for (i = 0; i != lc->prt_num && rc == 0; i++) {
+ for (i = 0; i != lc->prtq_num && rc == 0; i++) {
RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n",
- __func__, i, lc->prt[i].port.id, lc->prt[i].dev);
- rc = setup_rx_cb(&lc->prt[i].port, lc);
+ __func__, i, lc->prtq[i].port.id, lc->prtq[i].dev);
+ rc = setup_rx_cb(&lc->prtq[i].port, lc, lc->prtq[i].rxqid);
+ if (rc < 0)
+ return rc;
}
if (rc == 0)
@@ -1486,7 +1768,7 @@ netbe_lcore(void)
if (lc == NULL)
return;
- for (i = 0; i != lc->prt_num; i++) {
+ for (i = 0; i != lc->prtq_num; i++) {
netbe_rx(lc, i);
netbe_tx(lc, i);
}
@@ -1504,26 +1786,26 @@ netbe_lcore_clear(void)
RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n",
__func__, lc->id, lc->ctx);
- for (i = 0; i != lc->prt_num; i++) {
- RTE_LOG(NOTICE, USER1, "%s:%u(port=%u) "
+ for (i = 0; i != lc->prtq_num; i++) {
+ RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, lcore=%u, q=%u, dev=%p) "
"rx_stats={"
"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
"tx_stats={"
"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
- __func__, i, lc->prt[i].port.id,
- lc->prt[i].rx_stat.in,
- lc->prt[i].rx_stat.up,
- lc->prt[i].rx_stat.drop,
- lc->prt[i].tx_stat.down,
- lc->prt[i].tx_stat.out,
- lc->prt[i].tx_stat.drop);
+ __func__, i, lc->prtq[i].port.id, lc->id,
+ lc->prtq[i].rxqid,
+ lc->prtq[i].dev,
+ lc->prtq[i].rx_stat.in,
+ lc->prtq[i].rx_stat.up,
+ lc->prtq[i].rx_stat.drop,
+ lc->prtq[i].tx_stat.down,
+ lc->prtq[i].tx_stat.out,
+ lc->prtq[i].tx_stat.drop);
}
-
- for (i = 0; i != lc->prt_num; i++) {
- for (j = 0; j != lc->prt[i].tx_buf.num; j++)
- rte_pktmbuf_free(lc->prt[i].tx_buf.pkt[j]);
- }
+ for (i = 0; i != lc->prtq_num; i++)
+ for (j = 0; j != lc->prtq[i].tx_buf.num; j++)
+ rte_pktmbuf_free(lc->prtq[i].tx_buf.pkt[j]);
RTE_PER_LCORE(_be) = NULL;
}
@@ -1548,9 +1830,8 @@ lcore_main(void *arg)
rc = netfe_lcore_init(&prm->fe);
/* lcore FE init. */
- if (rc == 0 && prm->be.lc != NULL) {
+ if (rc == 0 && prm->be.lc != NULL)
rc = netbe_lcore_setup(prm->be.lc);
- }
if (rc != 0)
sig_handle(SIGQUIT);
@@ -1569,8 +1850,6 @@ lcore_main(void *arg)
return rc;
}
-
-
static int
netfe_lcore_cmp(const void *s1, const void *s2)
{
@@ -1582,38 +1861,82 @@ netfe_lcore_cmp(const void *s1, const void *s2)
}
/*
+ * Helper functions, verify the queue for corresponding UDP port.
+ */
+static uint8_t
+varify_queue_for_port(const struct netbe_dev *prtq, const uint16_t lport)
+{
+ uint32_t align_nb_q, qid;
+
+ align_nb_q = rte_align32pow2(prtq->port.nb_lcore);
+ qid = (lport % align_nb_q) % prtq->port.nb_lcore;
+ if (prtq->rxqid == qid)
+ return 1;
+
+ return 0;
+}
+
+/*
* Helper functions, finds BE by given local and remote addresses.
*/
static int
-netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr)
+netbe_find4(const struct in_addr *laddr, const uint16_t lport,
+ const struct in_addr *raddr, const uint32_t be_lc)
{
uint32_t i, j;
- int32_t rc;
uint32_t idx;
struct netbe_lcore *bc;
- if (laddr->s_addr == INADDR_ANY) {
-
- /* we have exactly one BE, use it for all traffic */
- if (becfg.cpu_num == 1)
- return 0;
+ /* we have exactly one BE, use it for all traffic */
+ if (becfg.cpu_num == 1)
+ return 0;
- /* search by remote address. */
+ /* search by provided be_lcore */
+ if (be_lc != LCORE_ID_ANY) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- rc = rte_lpm_lookup(bc->lpm4,
- rte_be_to_cpu_32(raddr->s_addr), &idx);
- if (rc == 0)
+ if (be_lc == bc->id)
return i;
}
- } else {
+ RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n",
+ __func__, be_lc);
+ return -ENOENT;
+ }
- /* search by local address */
+ /* search by local address */
+ if (laddr->s_addr != INADDR_ANY) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- for (j = 0; j != bc->prt_num; j++)
- if (laddr->s_addr == bc->prt[j].port.ipv4)
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++) {
+ if (laddr->s_addr == bc->prtq[j].port.ipv4) {
+
+ if (lport == 0)
+ return i;
+
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
+ }
+ }
+ }
+
+ /* search by remote address */
+ if (raddr->s_addr != INADDR_ANY) {
+ for (i = 0; i != becfg.cpu_num; i++) {
+ bc = becfg.cpu + i;
+ if (rte_lpm_lookup(bc->lpm4,
+ rte_be_to_cpu_32(raddr->s_addr),
+ &idx) == 0) {
+
+ if (lport == 0)
return i;
+
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++)
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
}
}
@@ -1621,35 +1944,64 @@ netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr)
}
static int
-netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr)
+netbe_find6(const struct in6_addr *laddr, uint16_t lport,
+ const struct in6_addr *raddr, uint32_t be_lc)
{
uint32_t i, j;
- int32_t rc;
uint8_t idx;
struct netbe_lcore *bc;
- if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) == 0) {
+ /* we have exactly one BE, use it for all traffic */
+ if (becfg.cpu_num == 1)
+ return 0;
- /* we have exactly one BE, use it for all traffic */
- if (becfg.cpu_num == 1)
- return 0;
-
- /* search by remote address. */
+ /* search by provided be_lcore */
+ if (be_lc != LCORE_ID_ANY) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- rc = rte_lpm6_lookup(bc->lpm6,
- (uint8_t *)(uintptr_t)raddr->s6_addr, &idx);
- if (rc == 0)
+ if (be_lc == bc->id)
return i;
}
- } else {
- /* search by local address */
+ RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n",
+ __func__, be_lc);
+ return -ENOENT;
+ }
+
+ /* search by local address */
+ if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) != 0) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- for (j = 0; j != bc->prt_num; j++)
- if (memcmp(laddr, &bc->prt[j].port.ipv6,
- sizeof(*laddr)) == 0)
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++) {
+ if (memcmp(laddr, &bc->prtq[j].port.ipv6,
+ sizeof(*laddr)) == 0) {
+
+ if (lport == 0)
+ return i;
+
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
+ }
+ }
+ }
+
+ /* search by remote address */
+ if (memcmp(raddr, &in6addr_any, sizeof(*raddr)) == 0) {
+ for (i = 0; i != becfg.cpu_num; i++) {
+ bc = becfg.cpu + i;
+ if (rte_lpm6_lookup(bc->lpm6,
+ (uint8_t *)(uintptr_t)raddr->s6_addr,
+ &idx) == 0) {
+
+ if (lport == 0)
return i;
+
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++)
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
}
}
@@ -1657,7 +2009,7 @@ netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr)
}
static int
-netbe_find(const struct tle_udp_stream_param *p)
+netbe_find(const struct tle_udp_stream_param *p, uint32_t be_lc)
{
const struct sockaddr_in *l4, *r4;
const struct sockaddr_in6 *l6, *r6;
@@ -1665,21 +2017,23 @@ netbe_find(const struct tle_udp_stream_param *p)
if (p->local_addr.ss_family == AF_INET) {
l4 = (const struct sockaddr_in *)&p->local_addr;
r4 = (const struct sockaddr_in *)&p->remote_addr;
- return netbe_find4(&l4->sin_addr, &r4->sin_addr);
+ return netbe_find4(&l4->sin_addr, ntohs(l4->sin_port),
+ &r4->sin_addr, be_lc);
} else if (p->local_addr.ss_family == AF_INET6) {
l6 = (const struct sockaddr_in6 *)&p->local_addr;
r6 = (const struct sockaddr_in6 *)&p->remote_addr;
- return netbe_find6(&l6->sin6_addr, &r6->sin6_addr);
+ return netbe_find6(&l6->sin6_addr, ntohs(l6->sin6_port),
+ &r6->sin6_addr, be_lc);
}
return -EINVAL;
}
static int
-netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line)
+netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line, uint32_t be_lc)
{
int32_t bidx;
- bidx = netbe_find(&sp->prm);
+ bidx = netbe_find(&sp->prm, be_lc);
if (bidx < 0) {
RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n",
__func__, line);
@@ -1694,18 +2048,19 @@ static int
netfe_lcore_fill(struct lcore_prm prm[RTE_MAX_LCORE],
struct netfe_lcore_prm *lprm)
{
+ uint32_t be_lc;
uint32_t i, j, lc, ln;
/* determine on what BE each stream should be open. */
for (i = 0; i != lprm->nb_streams; i++) {
-
lc = lprm->stream[i].lcore;
ln = lprm->stream[i].line;
-
- if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln) != 0 ||
+ be_lc = lprm->stream[i].be_lcore;
+ if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln,
+ be_lc) != 0 ||
(lprm->stream[i].op == FWD &&
- netfe_sprm_flll_be(&lprm->stream[i].fprm,
- ln) != 0))
+ netfe_sprm_flll_be(&lprm->stream[i].fprm, ln,
+ be_lc) != 0))
return -EINVAL;
}
@@ -1759,6 +2114,7 @@ main(int argc, char *argv[])
char fecfg_fname[PATH_MAX + 1];
char becfg_fname[PATH_MAX + 1];
struct lcore_prm prm[RTE_MAX_LCORE];
+ struct rte_eth_dev_info dev_info;
fecfg_fname[0] = 0;
becfg_fname[0] = 0;
@@ -1824,8 +2180,15 @@ main(int argc, char *argv[])
signal(SIGINT, sig_handle);
- netbe_port_init(&becfg, argc - optind, argv + optind);
- netbe_lcore_init(&becfg, &ctx_prm);
+ rc = netbe_port_init(&becfg, argc - optind, argv + optind);
+ if (rc != 0)
+ rte_exit(EXIT_FAILURE,
+ "%s: netbe_port_init failed with error code: %d\n",
+ __func__, rc);
+
+ rc = netbe_lcore_init(&becfg, &ctx_prm);
+ if (rc != 0)
+ sig_handle(SIGQUIT);
if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0)
sig_handle(SIGQUIT);
@@ -1841,15 +2204,18 @@ main(int argc, char *argv[])
__func__, becfg.prt[i].id, rc);
sig_handle(SIGQUIT);
}
+ rte_eth_dev_info_get(becfg.prt[i].id, &dev_info);
+ rc = update_rss_reta(&becfg.prt[i], &dev_info);
+ if (rc != 0)
+ sig_handle(SIGQUIT);
}
feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num;
if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0)
sig_handle(SIGQUIT);
- for (i = 0; rc == 0 && i != becfg.cpu_num; i++) {
+ for (i = 0; rc == 0 && i != becfg.cpu_num; i++)
prm[becfg.cpu[i].id].be.lc = becfg.cpu + i;
- }
if (rc == 0 && (rc = netfe_lcore_fill(prm, &feprm)) != 0)
sig_handle(SIGQUIT);
diff --git a/examples/udpfwd/netbe.h b/examples/udpfwd/netbe.h
index 0be785a..577627d 100644
--- a/examples/udpfwd/netbe.h
+++ b/examples/udpfwd/netbe.h
@@ -44,19 +44,25 @@
#define MAX_PKT_BURST 0x20
+/* Used to allocate the memory for hash key. */
+#define RSS_HASH_KEY_LENGTH 64
+
/*
* BE related structures.
*/
struct netbe_port {
uint32_t id;
- uint32_t lcore;
+ uint32_t nb_lcore;
+ uint32_t lcore[RTE_MAX_LCORE];
uint32_t mtu;
uint32_t rx_offload;
uint32_t tx_offload;
uint32_t ipv4;
struct in6_addr ipv6;
struct ether_addr mac;
+ uint32_t hash_key_size;
+ uint8_t hash_key[RSS_HASH_KEY_LENGTH];
};
struct netbe_dest {
@@ -109,10 +115,10 @@ struct netbe_lcore {
struct rte_lpm6 *lpm6;
struct rte_ip_frag_tbl *ftbl;
struct tle_udp_ctx *ctx;
- uint32_t prt_num;
+ uint32_t prtq_num;
uint32_t dst4_num;
uint32_t dst6_num;
- struct netbe_dev prt[RTE_MAX_ETHPORTS];
+ struct netbe_dev prtq[RTE_MAX_ETHPORTS * RTE_MAX_LCORE];
struct tle_udp_dest dst4[LCORE_MAX_DST];
struct tle_udp_dest dst6[LCORE_MAX_DST];
struct rte_ip_frag_death_row death_row;
@@ -144,6 +150,7 @@ struct netfe_sprm {
struct netfe_stream_prm {
uint32_t lcore;
+ uint32_t be_lcore;
uint32_t line;
uint16_t op;
uint16_t txlen; /* valid/used only for TXONLY op. */
@@ -253,6 +260,7 @@ struct lcore_prm {
} \
} while (0)
-int setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc);
+int setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc,
+ uint16_t qid);
#endif /* __NETBE_H__ */
diff --git a/examples/udpfwd/parse.c b/examples/udpfwd/parse.c
index 78df01b..cc8c0b2 100644
--- a/examples/udpfwd/parse.c
+++ b/examples/udpfwd/parse.c
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <sched.h>
#include "netbe.h"
#include "parse.h"
@@ -61,7 +62,6 @@ parse_ip_val(__rte_unused const char *key, const char *val, void *prm)
return 0;
}
-
#define PARSE_UINT8x16(s, v, l) \
do { \
char *end; \
@@ -110,6 +110,41 @@ parse_feop_val(__rte_unused const char *key, const char *val, void *prm)
}
static int
+parse_lcore_list_val(__rte_unused const char *key, const char *val, void *prm)
+{
+ union parse_val *rv;
+ unsigned long a, b;
+ uint32_t i;
+ char *end;
+
+ rv = prm;
+ errno = 0;
+ a = strtoul(val, &end, 0);
+ if (errno != 0 || (end[0] != 0 && end[0] != '-') || a > UINT32_MAX)
+ return -EINVAL;
+
+ if (end[0] == '-') {
+ val = end + 1;
+ errno = 0;
+ b = strtoul(val, &end, 0);
+ if (errno != 0 || end[0] != 0 || b > UINT32_MAX)
+ return -EINVAL;
+ } else
+ b = a;
+
+ if (a <= b) {
+ for (i = a; i <= b; i++)
+ CPU_SET(i, &rv->cpuset);
+ } else {
+ RTE_LOG(ERR, USER1,
+ "%s: lcores not in ascending order\n", __func__);
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int
parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man,
const char *keys_opt[], uint32_t nb_opt,
const arg_handler_t hndl[], union parse_val val[])
@@ -139,7 +174,7 @@ parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man,
if (rte_kvargs_process(kvl, keys_man[j], hndl[j],
val + j) != 0) {
RTE_LOG(ERR, USER1,
- "%s: %s invalid value for key: %s\n",
+ "%s: %s invalid value for man key: %s\n",
__func__, arg, keys_man[j]);
rte_kvargs_free(kvl);
return -EINVAL;
@@ -151,7 +186,7 @@ parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man,
if (rte_kvargs_process(kvl, keys_opt[j], hndl[k],
val + k) != 0) {
RTE_LOG(ERR, USER1,
- "%s: %s invalid value for key: %s\n",
+ "%s: %s invalid value for opt key: %s\n",
__func__, arg, keys_opt[j]);
rte_kvargs_free(kvl);
return -EINVAL;
@@ -166,6 +201,7 @@ int
parse_netbe_arg(struct netbe_port *prt, const char *arg)
{
int32_t rc;
+ uint32_t i, j;
static const char *keys_man[] = {
"port",
@@ -182,7 +218,7 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg)
static const arg_handler_t hndl[] = {
parse_uint_val,
- parse_uint_val,
+ parse_lcore_list_val,
parse_uint_val,
parse_uint_val,
parse_uint_val,
@@ -201,7 +237,10 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg)
return rc;
prt->id = val[0].u64;
- prt->lcore = val[1].u64;
+ for (i = 0, j = 0; i < RTE_MAX_LCORE; i++)
+ if (CPU_ISSET(i, &val[1].cpuset))
+ prt->lcore[j++] = i;
+ prt->nb_lcore = j;
prt->mtu = val[2].u64;
prt->rx_offload = val[3].u64;
prt->tx_offload = val[4].u64;
@@ -210,6 +249,7 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg)
return 0;
}
+
static int
check_netbe_dest(const struct netbe_dest *dst)
{
@@ -390,6 +430,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
"fwlport",
"fwraddr",
"fwrport",
+ "belcore",
};
static const arg_handler_t hndl[] = {
@@ -404,16 +445,17 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
parse_uint_val,
parse_ip_val,
parse_uint_val,
+ parse_uint_val,
};
union parse_val val[RTE_DIM(hndl)];
memset(val, 0, sizeof(val));
+ val[11].u64 = LCORE_ID_ANY;
rc = parse_kvargs(arg, keys_man, RTE_DIM(keys_man),
keys_opt, RTE_DIM(keys_opt), hndl, val);
if (rc != 0)
return rc;
-
sp->lcore = val[0].u64;
sp->op = val[1].u64;
pv2saddr(&sp->sprm.prm.local_addr, val + 2, val + 3);
@@ -421,6 +463,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
sp->txlen = val[6].u64;
pv2saddr(&sp->fprm.prm.local_addr, val + 7, val + 8);
pv2saddr(&sp->fprm.prm.remote_addr, val + 9, val + 10);
+ sp->be_lcore = val[11].u64;
return 0;
}
diff --git a/examples/udpfwd/parse.h b/examples/udpfwd/parse.h
index 911c874..e25e64e 100644
--- a/examples/udpfwd/parse.h
+++ b/examples/udpfwd/parse.h
@@ -16,6 +16,8 @@
#ifndef __PARSE_H__
#define __PARSE_H__
+#define PARSE_LIST_DELIM "-"
+
union parse_val {
uint64_t u64;
struct {
@@ -26,6 +28,7 @@ union parse_val {
};
} in;
struct ether_addr mac;
+ rte_cpuset_t cpuset;
};
static int
diff --git a/examples/udpfwd/pkt.c b/examples/udpfwd/pkt.c
index af87b08..6832b9a 100644
--- a/examples/udpfwd/pkt.c
+++ b/examples/udpfwd/pkt.c
@@ -23,7 +23,6 @@ _mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso,
return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49;
}
-
static inline void
fill_pkt_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t l3, uint32_t l4)
{
@@ -223,7 +222,7 @@ reassemble(struct rte_mbuf *m, struct netbe_lcore *lc, uint64_t tms,
tbl = lc->ftbl;
dr = &lc->death_row;
- l3cs = lc->prt[port].port.rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM;
+ l3cs = lc->prtq[port].port.rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM;
if (RTE_ETH_IS_IPV4_HDR(m->packet_type)) {
@@ -508,7 +507,8 @@ typen_rx_callback(uint8_t port, __rte_unused uint16_t queue,
}
int
-setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc)
+setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc,
+ uint16_t qid)
{
int32_t i, rc;
uint32_t smask;
@@ -589,7 +589,7 @@ setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc)
for (i = 0; i != RTE_DIM(ptype2cb); i++) {
if ((smask & ptype2cb[i].mask) == ptype2cb[i].mask) {
- cb = rte_eth_add_rx_callback(uprt->id, 0,
+ cb = rte_eth_add_rx_callback(uprt->id, qid,
ptype2cb[i].fn, lc);
rc = -rte_errno;
RTE_LOG(ERR, USER1,