aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohammad Abdul Awal <mohammad.abdul.awal@intel.com>2016-06-15 09:21:55 +0100
committerMohammad Abdul Awal <mohammad.abdul.awal@intel.com>2016-09-06 13:16:52 +0100
commit8ae38dfb285ab9843312964b3081b2c52ba857dc (patch)
tree87dd74f1ca5668df4b859fdb7633ac7f83bbf75e
parent58a3674671da166ff3e3ec4dd552eb60adf3652b (diff)
Initial working version of RSS
The patch enables RSS support for TLDK udpfwd application. The number of rx queues and tx queues are equal to the number of lcore enabled for backend operation. NICs calculate the RSS hash based on the UDP dest port only. The RSS hash key is calculated at runtime based on the number of queues enebaled. Change-Id: I06006a4606f8faad8f16241348b2ce19b70335e5 Signed-off-by: Mohammad Abdul Awal <mohammad.abdul.awal@intel.com>
-rw-r--r--examples/udpfwd/Makefile2
-rw-r--r--examples/udpfwd/README39
-rw-r--r--examples/udpfwd/main.c924
-rw-r--r--examples/udpfwd/netbe.h16
-rw-r--r--examples/udpfwd/parse.c55
-rw-r--r--examples/udpfwd/parse.h3
-rw-r--r--examples/udpfwd/pkt.c8
-rw-r--r--lib/libtle_udp/tle_udp_impl.h2
-rw-r--r--lib/libtle_udp/udp_ctl.c35
9 files changed, 768 insertions, 316 deletions
diff --git a/examples/udpfwd/Makefile b/examples/udpfwd/Makefile
index c23947a..5274d0f 100644
--- a/examples/udpfwd/Makefile
+++ b/examples/udpfwd/Makefile
@@ -40,5 +40,7 @@ LDLIBS += -L$(TLDK_ROOT)/$(RTE_TARGET)/lib
LDLIBS += -ltle_udp
EXTRA_CFLAGS += -O3
+CFLAGS_parse.o += -D_GNU_SOURCE
+CFLAGS_main.o += -mcmodel=medium
include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/udpfwd/README b/examples/udpfwd/README
index 8ab7e98..91b6e76 100644
--- a/examples/udpfwd/README
+++ b/examples/udpfwd/README
@@ -18,15 +18,15 @@ BE is responsible for:
(via tle_udp_rx_bulk).
- retrieve packets ready to be send out from UDP TLDK context(s)
and TX them over destined DPDK port.
-Right now only one RX/TX queue per port is used.
+Multiple RX/TX queues per port are supported by RSS. Right now the number of
+TX is same as the number of RX queue.
Each BE lcore can serve multiple DPDK ports, TLDK UDP contexts.
- Front End (FE)
FE responsibility is to open configured UDP streams and perform
send/recv over them. These streams can belong to different UDP contexts.
-Right now each lcore can act as BE or FE (but not both simultaneously).
-Master lcore can act as FE only.
+Right now each lcore can act as BE and/or FE.
Usage
=====
@@ -40,18 +40,18 @@ udpfwd <EAL parameters> -- \
-f | --fecfg <filename> /* frontend configuration file. */ \
<port0_params> <port1_params> ... <portN_params>
-port_params: port=<uint>,lcore=<uint>,\
+port_params: port=<uint>,lcore=<uint>[-<uint>],\
[rx_offload=<uint>,tx_offload=<uint>,mtu=<uint>,ipv4=<ipv4>,ipv6=<ipv6>]
port_params are used to configure the particular DPDK device (rte_ethdev port),
and specify BE lcore that will do RX/TX from/to the device and manage
-BE part of corresponding UDP context.
+BE part of corresponding UDP context. Multiple BE lcore can be specified.
-port - DPDK port id (right now on each port is used just one RX,
- one TX queue).
+port - DPDK port id (multiple queues are supported when multiple lcore
+ is specified for a port).
lcore - EAL lcore id to do IO over that port (rx_burst/tx_burst).
- several ports can be managed by the same lcore,
- but same port can't belong to more than one lcore.
+ several ports can be managed by the same lcore, and same port can
+ belong to more than one lcore.
rx_offload - RX HW offload capabilities to enable/use on this port.
(bitmask of DEV_RX_OFFLOAD_* values).
tx_offload - TX HW offload capabilities to enable/use on this port.
@@ -64,15 +64,15 @@ ipv6 - ipv6 address to assign to that port.
At least one of ipv4/ipv6 values have to be specified for each port.
As an example:
-udpfwd --lcores='3,6' -w 01:00.0 -- \
+udpfwd --lcores='3,6,8' -w 01:00.0 -- \
--promisc --rbufs 0x1000 --sbufs 0x1000 --streams 0x100 \
--fecfg ./fe.cfg --becfg ./be.cfg \
-port=0,lcore=6,rx_offload=0xf,tx_offload=0,\
+port=0,lcore=6,lcore=8,rx_offload=0xf,tx_offload=0,\
ipv4=192.168.1.233,ipv6=2001:4860:b002::28
-Will create TLDK UDP context on lcore=6 (BE lcore) to manage DPDK port 0.
-Will assign IPv4 address 192.168.1.233 and IPv6 address 2001:4860:b002::28
-to that port.
+Will create TLDK UDP context on lcore=6 and lcore=8 (BE lcore) to manage
+DPDK port 0. Will assign IPv4 address 192.168.1.233 and IPv6 address
+2001:4860:b002::28 to that port.
The following supported by DPDK RX HW offloads:
DEV_RX_OFFLOAD_VLAN_STRIP,
DEV_RX_OFFLOAD_IPV4_CKSUM,
@@ -81,6 +81,11 @@ The following supported by DPDK RX HW offloads:
will be enabled on that port.
No HW TX offloads will be enabled.
+If multiple lcore is specified per DPDK port, the following RSS hash will
+be enabled on that port:
+ ETH_RSS_UDP
+
+
Fornt-End (FE) and Back-End (BE) configuration files format:
------------------------------------------------------------
- each record on a separate line.
@@ -96,9 +101,10 @@ FE config record format:
lcore=<uint>,op=<"rx|tx|echo|fwd">,\
laddr=<ip>,lport=<uint16>,raddr=<ip>,rport=<uint16>,\
-[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16>
+[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16>,\
+belcore=<uint>]
-lcore - EAL lcore to manage that stream(s).
+lcore - EAL lcore to manage that stream(s) in the FE.
op - operation to perform on that stream:
"rx" - do receive only on that stream.
"tx" - do send only on that stream.
@@ -118,6 +124,7 @@ fwraddr - remote address for the forwarding stream(s) to open
("fwd mode only).
fwrport - remote port for the forwarding stream(s) to open
("fwd mode only).
+belcore - EAL lcore to manage that stream(s) in the BE.
Refer to fe.cfg for an example.
diff --git a/examples/udpfwd/main.c b/examples/udpfwd/main.c
index 2dde317..3daeb30 100644
--- a/examples/udpfwd/main.c
+++ b/examples/udpfwd/main.c
@@ -71,6 +71,36 @@ static const struct option long_opt[] = {
{NULL, 0, 0, 0}
};
+/**
+ * IPv4 Input size in bytes for RSS hash key calculation.
+ * source address, destination address, source port, and destination port.
+ */
+#define IPV4_TUPLE_SIZE 12
+
+/**
+ * IPv6 Input size in bytes for RSS hash key calculation.
+ * source address, destination address, source port, and destination port.
+ */
+#define IPV6_TUPLE_SIZE 36
+
+/**
+ * Location to be modified to create the IPv4 hash key which helps
+ * to distribute packets based on the destination UDP port.
+ */
+#define RSS_HASH_KEY_DEST_PORT_LOC_IPV4 15
+
+/*
+ * Location to be modified to create the IPv6 hash key which helps
+ * to distribute packets based on the destination UDP port.
+ */
+#define RSS_HASH_KEY_DEST_PORT_LOC_IPV6 39
+
+/**
+ * Size of the rte_eth_rss_reta_entry64 array to update through
+ * rte_eth_dev_rss_reta_update.
+ */
+#define RSS_RETA_CONF_ARRAY_SIZE (ETH_RSS_RETA_SIZE_512/RTE_RETA_GROUP_SIZE)
+
static volatile int force_quit;
static struct netbe_cfg becfg;
@@ -94,20 +124,145 @@ sig_handle(int signum)
force_quit = 1;
}
+static void
+prepare_hash_key(struct netbe_port *uprt, uint8_t key_size, uint16_t family)
+{
+ uint32_t align_nb_q;
+
+ align_nb_q = rte_align32pow2(uprt->nb_lcore);
+ memset(uprt->hash_key, 0, RSS_HASH_KEY_LENGTH);
+ uprt->hash_key_size = key_size;
+ if (family == AF_INET)
+ uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV4] = align_nb_q;
+ else
+ uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV6] = align_nb_q;
+}
+
+static uint32_t
+qidx_from_hash_index(uint32_t hash, uint32_t align_nb_q)
+{
+ uint32_t i, nb_bit, q;
+
+ nb_bit = (sizeof(uint32_t) * CHAR_BIT) - __builtin_clz(align_nb_q - 1);
+ q = (hash & 1);
+ for (i = 1; i < nb_bit; i++) {
+ hash >>= 1;
+ q <<= 1;
+ q |= (hash & 1);
+ }
+
+ return q;
+}
+
+static int
+update_rss_conf(struct netbe_port *uprt,
+ const struct rte_eth_dev_info *dev_info,
+ struct rte_eth_conf *port_conf)
+{
+ uint8_t hash_key_size;
+
+ if (uprt->nb_lcore > 1) {
+ if (dev_info->hash_key_size > 0)
+ hash_key_size = dev_info->hash_key_size;
+ else {
+ RTE_LOG(ERR, USER1,
+ "%s: dev_info did not provide a valid hash key size\n",
+ __func__);
+ return -EINVAL;
+ }
+
+ if (uprt->ipv4 != INADDR_ANY &&
+ memcmp(&uprt->ipv6, &in6addr_any,
+ sizeof(uprt->ipv6)) != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: RSS for both IPv4 and IPv6 not supported!\n",
+ __func__);
+ return -EINVAL;
+ } else if (uprt->ipv4 != INADDR_ANY) {
+ prepare_hash_key(uprt, hash_key_size, AF_INET);
+ } else if (memcmp(&uprt->ipv6, &in6addr_any, sizeof(uprt->ipv6))
+ != 0) {
+ prepare_hash_key(uprt, hash_key_size, AF_INET6);
+ } else {
+ RTE_LOG(ERR, USER1,
+ "%s: No IPv4 or IPv6 address is found!\n",
+ __func__);
+ return -EINVAL;
+ }
+ port_conf->rxmode.mq_mode = ETH_MQ_RX_RSS;
+ port_conf->rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP;
+ port_conf->rx_adv_conf.rss_conf.rss_key_len = hash_key_size;
+ port_conf->rx_adv_conf.rss_conf.rss_key = uprt->hash_key;
+ }
+
+ return 0;
+}
+
+static int
+update_rss_reta(struct netbe_port *uprt,
+ const struct rte_eth_dev_info *dev_info)
+{
+ struct rte_eth_rss_reta_entry64 reta_conf[RSS_RETA_CONF_ARRAY_SIZE];
+ int32_t i, rc, align_nb_q;
+ int32_t q_index, idx, shift;
+
+ if (uprt->nb_lcore > 1) {
+ if (dev_info->reta_size == 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: Redirection table size 0 is invalid for RSS\n",
+ __func__);
+ return -EINVAL;
+ }
+ RTE_LOG(NOTICE, USER1,
+ "%s: The reta size of port %d is %u\n",
+ __func__, uprt->id, dev_info->reta_size);
+
+ if (dev_info->reta_size > ETH_RSS_RETA_SIZE_512) {
+ RTE_LOG(ERR, USER1,
+ "%s: More than %u entries of Reta not supported\n",
+ __func__, ETH_RSS_RETA_SIZE_512);
+ return -EINVAL;
+ }
+
+ memset(reta_conf, 0, sizeof(reta_conf));
+ align_nb_q = rte_align32pow2(uprt->nb_lcore);
+ for (i = 0; i < align_nb_q; i++) {
+ q_index = qidx_from_hash_index(i, align_nb_q) %
+ uprt->nb_lcore;
+
+ idx = i / RTE_RETA_GROUP_SIZE;
+ shift = i % RTE_RETA_GROUP_SIZE;
+ reta_conf[idx].mask |= (1ULL << shift);
+ reta_conf[idx].reta[shift] = q_index;
+ RTE_LOG(NOTICE, USER1,
+ "%s: port=%u RSS reta conf: hash=%u, q=%u\n",
+ __func__, uprt->id, i, q_index);
+ }
+
+ rc = rte_eth_dev_rss_reta_update(uprt->id,
+ reta_conf, dev_info->reta_size);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: Bad redirection table parameter, rc = %d\n",
+ __func__, rc);
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
/*
* Initilise DPDK port.
- * In current version, only one queue per port is used.
+ * In current version, multi-queue per port is used.
*/
static int
-port_init(struct netbe_port *uprt, struct rte_mempool *mp)
+port_init(struct netbe_port *uprt)
{
- int32_t socket, rc;
- uint16_t q;
+ int32_t rc;
struct rte_eth_conf port_conf;
struct rte_eth_dev_info dev_info;
- const uint16_t rx_rings = 1, tx_rings = 1;
-
rte_eth_dev_info_get(uprt->id, &dev_info);
if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) {
RTE_LOG(ERR, USER1,
@@ -130,16 +285,33 @@ port_init(struct netbe_port *uprt, struct rte_mempool *mp)
__func__, uprt->id);
port_conf.rxmode.hw_ip_checksum = 1;
}
-
port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN;
- rc = rte_eth_dev_configure(uprt->id, rx_rings, tx_rings, &port_conf);
+ rc = update_rss_conf(uprt, &dev_info, &port_conf);
+ if (rc != 0)
+ return rc;
+
+ rc = rte_eth_dev_configure(uprt->id, uprt->nb_lcore, uprt->nb_lcore,
+ &port_conf);
RTE_LOG(NOTICE, USER1,
- "%s: rte_eth_dev_configure(%u) returns %d;\n",
- __func__, uprt->id, rc);
+ "%s: rte_eth_dev_configure(prt_id=%u, nb_rxq=%u, nb_txq=%u) "
+ "returns %d;\n", __func__, uprt->id, uprt->nb_lcore,
+ uprt->nb_lcore, rc);
if (rc != 0)
return rc;
+ return 0;
+}
+
+static int
+queue_init(struct netbe_port *uprt, struct rte_mempool *mp)
+{
+ int32_t socket, rc;
+ uint16_t q;
+ struct rte_eth_dev_info dev_info;
+
+ rte_eth_dev_info_get(uprt->id, &dev_info);
+
socket = rte_eth_dev_socket_id(uprt->id);
dev_info.default_rxconf.rx_drop_en = 1;
@@ -151,21 +323,27 @@ port_init(struct netbe_port *uprt, struct rte_mempool *mp)
dev_info.default_txconf.txq_flags = 0;
}
- for (q = 0; q < rx_rings; q++) {
+ for (q = 0; q < uprt->nb_lcore; q++) {
rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE,
- socket, NULL, mp);
- if (rc < 0)
+ socket, &dev_info.default_rxconf, mp);
+ if (rc < 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: rx queue=%u setup failed with error code: %d\n",
+ __func__, q, rc);
return rc;
+ }
}
- for (q = 0; q < tx_rings; q++) {
+ for (q = 0; q < uprt->nb_lcore; q++) {
rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE,
socket, &dev_info.default_txconf);
- if (rc < 0)
+ if (rc < 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: tx queue=%u setup failed with error code: %d\n",
+ __func__, q, rc);
return rc;
+ }
}
-
-
return 0;
}
@@ -190,13 +368,29 @@ check_lcore(uint32_t lc)
static void
log_netbe_prt(const struct netbe_port *uprt)
{
+ uint32_t i;
+ char corelist[2 * RTE_MAX_LCORE + 1];
+ char hashkey[2 * RSS_HASH_KEY_LENGTH];
+
+ memset(corelist, 0, sizeof(corelist));
+ memset(hashkey, 0, sizeof(hashkey));
+ for (i = 0; i < uprt->nb_lcore; i++)
+ if (i < uprt->nb_lcore - 1)
+ sprintf(corelist + (2 * i), "%u,", uprt->lcore[i]);
+ else
+ sprintf(corelist + (2 * i), "%u", uprt->lcore[i]);
+
+ for (i = 0; i < uprt->hash_key_size; i++)
+ sprintf(hashkey + (2 * i), "%02x", uprt->hash_key[i]);
+
RTE_LOG(NOTICE, USER1,
- "uprt %p = <id = %u, lcore = %u, "
- "mtu = %u, rx_offload = %u, tx_offload = %u,\n"
+ "uprt %p = <id = %u, lcore = <%s>, mtu = %u, "
+ "rx_offload = %u, tx_offload = %u,\n"
"ipv4 = %#x, "
"ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, "
- "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n",
- uprt, uprt->id, uprt->lcore,
+ "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n"
+ "hashkey = %s;\n",
+ uprt, uprt->id, corelist,
uprt->mtu, uprt->rx_offload, uprt->tx_offload,
uprt->ipv4,
uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1],
@@ -205,7 +399,8 @@ log_netbe_prt(const struct netbe_port *uprt)
uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7],
uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1],
uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3],
- uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5]);
+ uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5],
+ hashkey);
}
static void
@@ -262,53 +457,126 @@ frag_pool_init(uint32_t sid)
return 0;
}
+static struct netbe_lcore *
+find_initilized_lcore(struct netbe_cfg *cfg, uint32_t lc_num)
+{
+ uint32_t i;
+
+ for (i = 0; i < cfg->cpu_num; i++)
+ if (cfg->cpu[i].id == lc_num)
+ return &cfg->cpu[i];
+
+ return NULL;
+}
+
+static int
+calculate_nb_prtq(struct netbe_cfg *cfg)
+{
+ uint32_t i, j, rc;
+ struct netbe_port *prt;
+ struct netbe_lcore *lc;
+
+ for (i = 0; i != cfg->prt_num; i++) {
+ prt = &cfg->prt[i];
+ for (j = 0; j != prt->nb_lcore; j++) {
+ rc = check_lcore(prt->lcore[j]);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: processing failed with err: %d\n",
+ __func__, rc);
+ return rc;
+ }
+
+ lc = find_initilized_lcore(cfg, prt->lcore[j]);
+ if (lc == NULL) {
+ lc = &cfg->cpu[cfg->cpu_num];
+ lc->id = prt->lcore[j];
+ cfg->cpu_num++;
+ }
+ lc->prtq[lc->prtq_num].rxqid = j;
+ lc->prtq[lc->prtq_num].txqid = j;
+ lc->prtq[lc->prtq_num].port = *prt;
+
+ lc->prtq_num++;
+ }
+ }
+
+ return 0;
+}
/*
* Setup all enabled ports.
*/
-static void
+static int
netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[])
{
int32_t rc;
- uint32_t i, n, sid;
+ uint32_t i, n, sid, j;
+ struct netbe_port *prt;
n = RTE_MIN(RTE_DIM(cfg->prt), (uint32_t)argc);
rc = 0;
for (i = 0; i != n; i++) {
rc = parse_netbe_arg(cfg->prt + i, argv[i]);
- if (rc != 0)
- break;
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: processing of \"%s\" failed with error code: %d\n",
+ __func__, argv[i], rc);
+ return rc;
+ }
+ }
+ cfg->prt_num = i;
- rc = check_lcore(cfg->prt[i].lcore);
- if (rc != 0)
- break;
+ /* calculate number of queues per lcore. */
+ rc = calculate_nb_prtq(cfg);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1, "%s: processing of arguments failed"
+ " with error code: %d\n", __func__, rc);
+ return rc;
+ }
- sid = rte_lcore_to_socket_id(cfg->prt[i].lcore) + 1;
- assert(sid < RTE_DIM(mpool));
+ for (i = 0; i != cfg->prt_num; i++) {
+ prt = cfg->prt + i;
+ rc = port_init(prt);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: port=%u init failed with error code: %d\n",
+ __func__, prt->id, rc);
+ return rc;
+ }
+ rte_eth_macaddr_get(prt->id, &prt->mac);
+ if (cfg->promisc)
+ rte_eth_promiscuous_enable(prt->id);
- if (mpool[sid] == NULL && (rc = pool_init(sid)) != 0)
- break;
+ for (j = 0; j < prt->nb_lcore; j++) {
+ sid = rte_lcore_to_socket_id(prt->lcore[j]) + 1;
+ assert(sid < RTE_DIM(mpool));
- if (frag_mpool[sid] == NULL && (rc = frag_pool_init(sid)) != 0)
- break;
+ if (mpool[sid] == NULL) {
+ rc = pool_init(sid);
+ if (rc != 0)
+ return rc;
+ }
- rc = port_init(cfg->prt + i, mpool[sid]);
- if (rc != 0)
- break;
+ if (frag_mpool[sid] == NULL) {
+ rc = frag_pool_init(sid);
+ if (rc != 0)
+ return rc;
+ }
- rte_eth_macaddr_get(cfg->prt[i].id, &cfg->prt[i].mac);
- if (cfg->promisc)
- rte_eth_promiscuous_enable(cfg->prt[i].id);
+ rc = queue_init(prt, mpool[sid]);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: lcore=%u queue init failed with err: %d\n",
+ __func__, prt->lcore[j], rc);
+ return rc;
+ }
+ }
}
-
- if (rc != 0)
- rte_exit(EXIT_FAILURE,
- "%s: processing of \"%s\" failed with error code: %d\n",
- __func__, argv[i], rc);
-
- cfg->prt_num = i;
log_netbe_cfg(cfg);
+
+ return 0;
}
/*
@@ -487,128 +755,153 @@ fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed,
}
}
-
-/*
- * BE lcore setup routine.
- */
static int
-lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
- const struct netbe_port prt[], uint32_t prt_num)
+create_context(struct netbe_lcore *lc,
+ const struct tle_udp_ctx_param *ctx_prm)
{
- int32_t rc, sid;
- uint32_t i;
+ uint32_t rc = 0, sid;
uint64_t frag_cycles;
struct tle_udp_ctx_param cprm;
- struct tle_udp_dev_param dprm;
- lc->id = prt[0].lcore;
- lc->prt_num = prt_num;
+ if (lc->ctx == NULL) {
+ sid = rte_lcore_to_socket_id(lc->id);
- sid = rte_lcore_to_socket_id(lc->id);
+ rc = lcore_lpm_init(lc);
+ if (rc != 0)
+ return rc;
- rc = lcore_lpm_init(lc);
- if (rc != 0)
- return rc;
+ cprm = *ctx_prm;
+ cprm.socket_id = sid;
+ cprm.lookup4 = lpm4_dst_lookup;
+ cprm.lookup4_data = lc;
+ cprm.lookup6 = lpm6_dst_lookup;
+ cprm.lookup6_data = lc;
- cprm = *ctx_prm;
- cprm.socket_id = sid;
- cprm.lookup4 = lpm4_dst_lookup;
- cprm.lookup4_data = lc;
- cprm.lookup6 = lpm6_dst_lookup;
- cprm.lookup6_data = lc;
+ /* to facilitate both IPv4 and IPv6. */
+ cprm.max_streams *= 2;
- /* to facilitate both IPv4 and IPv6. */
- cprm.max_streams *= 2;
+ frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) /
+ MS_PER_S * FRAG_TTL;
- frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S * FRAG_TTL;
- lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
- FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams, frag_cycles, sid);
- RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
- __func__, lc->id, lc->ftbl);
+ lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
+ FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams,
+ frag_cycles, sid);
- lc->ctx = tle_udp_create(&cprm);
- RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
- __func__, lc->id, lc->ctx);
+ RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
+ __func__, lc->id, lc->ftbl);
- if (lc->ctx == NULL || lc->ftbl == NULL)
- rc = ENOMEM;
+ lc->ctx = tle_udp_create(&cprm);
- for (i = 0; i != prt_num && rc == 0; i++) {
+ RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
+ __func__, lc->id, lc->ctx);
- memset(&dprm, 0, sizeof(dprm));
+ if (lc->ctx == NULL || lc->ftbl == NULL)
+ rc = ENOMEM;
+ }
- lc->prt[i].rxqid = 0;
- lc->prt[i].txqid = 0;
- lc->prt[i].port = prt[i];
+ return rc;
+}
+
+/*
+ * BE lcore setup routine.
+ */
+static int
+lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
+ const uint32_t prtqid, uint16_t *bl_ports, uint32_t nb_bl_ports)
+{
+ int32_t rc = 0;
+ struct tle_udp_dev_param dprm;
- dprm.rx_offload = prt[i].rx_offload;
- dprm.tx_offload = prt[i].tx_offload;
- dprm.local_addr4.s_addr = prt[i].ipv4;
- memcpy(&dprm.local_addr6, &prt[i].ipv6, sizeof(prt[i].ipv6));
+ rc = create_context(lc, ctx_prm);
- lc->prt[i].dev = tle_udp_add_dev(lc->ctx, &dprm);
- RTE_LOG(NOTICE, USER1, "%s(lcore=%u, port=%u), udp_dev: %p;\n",
- __func__, lc->id, prt[i].id, lc->prt[i].dev);
- if (lc->prt[i].dev == NULL)
+ if (lc->ctx != NULL) {
+ memset(&dprm, 0, sizeof(dprm));
+ dprm.rx_offload = lc->prtq[prtqid].port.rx_offload;
+ dprm.tx_offload = lc->prtq[prtqid].port.tx_offload;
+ dprm.local_addr4.s_addr = lc->prtq[prtqid].port.ipv4;
+ memcpy(&dprm.local_addr6, &lc->prtq[prtqid].port.ipv6,
+ sizeof(lc->prtq[prtqid].port.ipv6));
+ dprm.nb_bl_ports = nb_bl_ports;
+ dprm.bl_ports = bl_ports;
+
+ lc->prtq[prtqid].dev = tle_udp_add_dev(lc->ctx, &dprm);
+
+ RTE_LOG(NOTICE, USER1,
+ "%s(lcore=%u, port=%u, qid=%u), udp_dev: %p\n",
+ __func__, lc->id, lc->prtq[prtqid].port.id,
+ lc->prtq[prtqid].rxqid, lc->prtq[prtqid].dev);
+
+ if (lc->prtq[prtqid].dev == NULL)
rc = -rte_errno;
- }
- if (rc != 0) {
- RTE_LOG(ERR, USER1, "%s(lcore=%u) failed with error code: %d\n",
- __func__, lc->id, rc);
- tle_udp_destroy(lc->ctx);
- rte_ip_frag_table_destroy(lc->ftbl);
- rte_lpm_free(lc->lpm4);
- rte_lpm6_free(lc->lpm6);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s(lcore=%u) failed with error code: %d\n",
+ __func__, lc->id, rc);
+ tle_udp_destroy(lc->ctx);
+ rte_ip_frag_table_destroy(lc->ftbl);
+ rte_lpm_free(lc->lpm4);
+ rte_lpm6_free(lc->lpm6);
+ return rc;
+ }
}
return rc;
}
-static int
-prt_lcore_cmp(const void *s1, const void *s2)
+static uint16_t
+create_blocklist(const struct netbe_port *beprt, uint16_t *bl_ports,
+ uint32_t q)
{
- const struct netbe_port *p1, *p2;
+ uint32_t i, j, qid, align_nb_q;
- p1 = s1;
- p2 = s2;
- return p1->lcore - p2->lcore;
+ align_nb_q = rte_align32pow2(beprt->nb_lcore);
+ for (i = 0, j = 0; i < (UINT16_MAX + 1); i++) {
+ qid = (i % align_nb_q) % beprt->nb_lcore;
+ if (qid != q)
+ bl_ports[j++] = i;
+ }
+
+ return j;
}
-static void
-netbe_lcore_init(struct netbe_cfg *cfg, const struct tle_udp_ctx_param *ctx_prm)
+static int
+netbe_lcore_init(struct netbe_cfg *cfg,
+ const struct tle_udp_ctx_param *ctx_prm)
{
int32_t rc;
- uint32_t i, k, n, num;
- struct netbe_port sp[RTE_DIM(cfg->prt)];
-
- num = cfg->prt_num;
- memcpy(sp, cfg->prt, sizeof(sp[0]) * num);
- qsort(sp, num, sizeof(sp[0]), prt_lcore_cmp);
-
- /* Fill ports to be used by each lcore. */
+ uint32_t i, j, nb_bl_ports, sz;
+ struct netbe_lcore *lc;
+ static uint16_t *bl_ports;
- k = 0;
- n = 0;
+ /* Create the udp context and attached queue for each lcore. */
rc = 0;
- for (i = 0; i != num && rc == 0; i++) {
- if (sp[n].lcore != sp[i].lcore) {
- rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
- n = i;
- k++;
+ nb_bl_ports = 0;
+ sz = sizeof(uint16_t) * UINT16_MAX;
+ bl_ports = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+ for (i = 0; i < cfg->cpu_num; i++) {
+ lc = &cfg->cpu[i];
+ for (j = 0; j < lc->prtq_num; j++) {
+ memset((uint8_t *)bl_ports, 0, sz);
+ /* create list of blocked ports based on q */
+ nb_bl_ports = create_blocklist(&lc->prtq[j].port,
+ bl_ports, lc->prtq[j].rxqid);
+ RTE_LOG(NOTICE, USER1, "lc=%u, q=%u, nb_bl_ports4=%u\n",
+ lc->id, lc->prtq[j].rxqid, nb_bl_ports);
+
+ rc = lcore_init(lc, ctx_prm, j, bl_ports, nb_bl_ports);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s: failed with error code: %d\n",
+ __func__, rc);
+ rte_free(bl_ports);
+ return rc;
+ }
}
}
+ rte_free(bl_ports);
- if (rc == 0 && i != n) {
- rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
- k++;
- }
-
- if (rc != 0)
- rte_exit(EXIT_FAILURE, "%s: failed with error code: %d\n",
- __func__, rc);
-
- cfg->cpu_num = k;
+ return 0;
}
static void
@@ -659,7 +952,7 @@ netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
rc = 0;
for (i = 0; i != dnum && rc == 0; i++) {
- fill_dst(dp + i, lc->prt + dev_idx, dst + i, l3_type, sid);
+ fill_dst(dp + i, lc->prtq + dev_idx, dst + i, l3_type, sid);
if (family == AF_INET)
rc = netbe_add_ipv4_route(lc, dst + i, n + i);
else
@@ -675,42 +968,11 @@ netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
}
static int
-netbe_port2lcore(struct netbe_cfg *cfg, uint32_t port, struct netbe_lcore **plc)
-{
- uint32_t i, j;
- struct netbe_lcore *lc;
-
- for (i = 0; i != cfg->cpu_num; i++) {
- lc = cfg->cpu + i;
- for (j = 0; j != cfg->prt_num; j++) {
- if (lc->prt[j].port.id == port) {
- *plc = lc;
- return j;
- }
- }
- }
-
- return -ENOENT;
-}
-
-static int
-netbe_dest_cmp(const void *s1, const void *s2)
-{
- const struct netbe_dest *p1, *p2;
-
- p1 = s1;
- p2 = s2;
- if (p1->port == p2->port)
- return p1->family - p2->family;
- else
- return p1->port - p2->port;
-}
-
-static int
netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
{
int32_t rc;
- uint32_t f, i, j, p;
+ uint32_t f, i, p;
+ uint32_t k, l, cnt;
struct netbe_lcore *lc;
struct netbe_dest_prm prm;
@@ -718,29 +980,36 @@ netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
if (rc != 0)
return rc;
- qsort(prm.dest, prm.nb_dest, sizeof(prm.dest[0]), netbe_dest_cmp);
-
rc = 0;
- for (i = 0; i != prm.nb_dest; i = j) {
+ for (i = 0; i != prm.nb_dest; i++) {
p = prm.dest[i].port;
f = prm.dest[i].family;
- for (j = i + 1; j != prm.nb_dest && p == prm.dest[j].port &&
- f == prm.dest[j].family;
- j++)
- ;
- rc = netbe_port2lcore(cfg, p, &lc);
- if (rc < 0) {
+ cnt = 0;
+ for (k = 0; k != cfg->cpu_num; k++) {
+ lc = cfg->cpu + k;
+ for (l = 0; l != lc->prtq_num; l++)
+ if (lc->prtq[l].port.id == p) {
+ rc = netbe_add_dest(lc, l, f,
+ prm.dest + i, 1);
+ if (rc != 0) {
+ RTE_LOG(ERR, USER1,
+ "%s(lcore=%u, family=%u) could not "
+ "add destinations(%u);\n",
+ __func__, lc->id, f, i);
+ return -ENOSPC;
+ }
+ cnt++;
+ }
+ }
+
+ if (cnt == 0) {
RTE_LOG(ERR, USER1, "%s(%s) error at line %u: "
"port %u not managed by any lcore;\n",
__func__, fname, prm.dest[i].line, p);
break;
}
-
- rc = netbe_add_dest(lc, rc, f, prm.dest + i, j - i);
- if (rc != 0)
- break;
}
free(prm.dest);
@@ -817,7 +1086,6 @@ netfe_stream_dump(const struct netfe_stream *fes)
fes->stat.txev[TLE_SEV_UP]);
}
-
/*
* helper function: opens IPv4 and IPv6 streams for selected port.
*/
@@ -828,6 +1096,9 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
int32_t rc;
uint32_t sidx;
struct netfe_stream *fes;
+ struct sockaddr_in *l4;
+ struct sockaddr_in6 *l6;
+ uint16_t errport;
sidx = fe->sidx;
fes = fe->fs + sidx;
@@ -842,8 +1113,10 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
if (op != FWD)
sprm->send_ev = fes->txev;
- RTE_LOG(ERR, USER1, "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}\n",
- __func__, lcore, sidx, op, fes->rxev, fes->txev);
+ RTE_LOG(ERR, USER1,
+ "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}, be_lc=%u\n",
+ __func__, lcore, sidx, op, fes->rxev, fes->txev,
+ becfg.cpu[bidx].id);
if (fes->rxev == NULL || fes->txev == NULL) {
netfe_stream_close(fe, 0);
rte_errno = ENOMEM;
@@ -865,6 +1138,17 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
rc = rte_errno;
netfe_stream_close(fe, 0);
rte_errno = rc;
+
+ if (sprm->local_addr.ss_family == AF_INET) {
+ l4 = (struct sockaddr_in *) &sprm->local_addr;
+ errport = ntohs(l4->sin_port);
+ } else {
+ l6 = (struct sockaddr_in6 *) &sprm->local_addr;
+ errport = ntohs(l6->sin6_port);
+ }
+ RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
+ "code=%u, bidx=%u, lc=%u\n",
+ errport, rc, bidx, becfg.cpu[bidx].id);
return NULL;
}
@@ -873,7 +1157,6 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
fe->sidx = sidx + 1;
return fes;
-
}
static inline int
@@ -1034,7 +1317,6 @@ netfe_tx_process(uint32_t lcore, struct netfe_stream *fes)
fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
}
-
static inline void
netfe_fwd(uint32_t lcore, struct netfe_stream *fes)
{
@@ -1347,7 +1629,6 @@ netfe_lcore(void)
}
}
-
static void
netfe_lcore_fini(void)
{
@@ -1359,7 +1640,6 @@ netfe_lcore_fini(void)
return;
while (fe->sidx != 0) {
-
i = fe->sidx - 1;
netfe_stream_dump(fe->fs + i);
netfe_stream_close(fe, 1);
@@ -1379,26 +1659,26 @@ netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
struct rte_mbuf *rp[MAX_PKT_BURST];
int32_t rc[MAX_PKT_BURST];
- n = rte_eth_rx_burst(lc->prt[pidx].port.id,
- lc->prt[pidx].rxqid, pkt, RTE_DIM(pkt));
+ n = rte_eth_rx_burst(lc->prtq[pidx].port.id,
+ lc->prtq[pidx].rxqid, pkt, RTE_DIM(pkt));
if (n == 0)
return;
- lc->prt[pidx].rx_stat.in += n;
+ lc->prtq[pidx].rx_stat.in += n;
NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
- __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].rxqid,
+ __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].rxqid,
n);
- k = tle_udp_rx_bulk(lc->prt[pidx].dev, pkt, rp, rc, n);
+ k = tle_udp_rx_bulk(lc->prtq[pidx].dev, pkt, rp, rc, n);
- lc->prt[pidx].rx_stat.up += k;
- lc->prt[pidx].rx_stat.drop += n - k;
+ lc->prtq[pidx].rx_stat.up += k;
+ lc->prtq[pidx].rx_stat.drop += n - k;
NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n",
- __func__, lc->id, lc->prt[pidx].dev, n, k);
+ __func__, lc->id, lc->prtq[pidx].dev, n, k);
for (j = 0; j != n - k; j++) {
NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
- __func__, __LINE__, lc->prt[pidx].port.id,
+ __func__, __LINE__, lc->prtq[pidx].port.id,
j, rp[j], rc[j]);
rte_pktmbuf_free(rp[j]);
}
@@ -1410,14 +1690,14 @@ netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
uint32_t j, k, n;
struct rte_mbuf **mb;
- n = lc->prt[pidx].tx_buf.num;
- k = RTE_DIM(lc->prt[pidx].tx_buf.pkt) - n;
- mb = lc->prt[pidx].tx_buf.pkt;
+ n = lc->prtq[pidx].tx_buf.num;
+ k = RTE_DIM(lc->prtq[pidx].tx_buf.pkt) - n;
+ mb = lc->prtq[pidx].tx_buf.pkt;
- if (k >= RTE_DIM(lc->prt[pidx].tx_buf.pkt) / 2) {
- j = tle_udp_tx_bulk(lc->prt[pidx].dev, mb + n, k);
+ if (k >= RTE_DIM(lc->prtq[pidx].tx_buf.pkt) / 2) {
+ j = tle_udp_tx_bulk(lc->prtq[pidx].dev, mb + n, k);
n += j;
- lc->prt[pidx].tx_stat.down += j;
+ lc->prtq[pidx].tx_stat.down += j;
}
if (n == 0)
@@ -1425,21 +1705,21 @@ netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n"
"total pkts to send: %u\n",
- __func__, lc->id, lc->prt[pidx].dev, j, n);
+ __func__, lc->id, lc->prtq[pidx].dev, j, n);
for (j = 0; j != n; j++)
NETBE_PKT_DUMP(mb[j]);
- k = rte_eth_tx_burst(lc->prt[pidx].port.id,
- lc->prt[pidx].txqid, mb, n);
+ k = rte_eth_tx_burst(lc->prtq[pidx].port.id,
+ lc->prtq[pidx].txqid, mb, n);
- lc->prt[pidx].tx_stat.out += k;
- lc->prt[pidx].tx_stat.drop += n - k;
+ lc->prtq[pidx].tx_stat.out += k;
+ lc->prtq[pidx].tx_stat.drop += n - k;
NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
- __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].txqid,
+ __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].txqid,
n, k);
- lc->prt[pidx].tx_buf.num = n - k;
+ lc->prtq[pidx].tx_buf.num = n - k;
if (k != 0)
for (j = k; j != n; j++)
mb[j - k] = mb[j];
@@ -1458,17 +1738,19 @@ netbe_lcore_setup(struct netbe_lcore *lc)
* ???????
* wait for FE lcores to start, so BE dont' drop any packets
* because corresponding streams not opened yet by FE.
- * usefull when used with pcap PMDS.
+ * useful when used with pcap PMDS.
* think better way, or should this timeout be a cmdlien parameter.
* ???????
*/
rte_delay_ms(10);
rc = 0;
- for (i = 0; i != lc->prt_num && rc == 0; i++) {
+ for (i = 0; i != lc->prtq_num && rc == 0; i++) {
RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n",
- __func__, i, lc->prt[i].port.id, lc->prt[i].dev);
- rc = setup_rx_cb(&lc->prt[i].port, lc);
+ __func__, i, lc->prtq[i].port.id, lc->prtq[i].dev);
+ rc = setup_rx_cb(&lc->prtq[i].port, lc, lc->prtq[i].rxqid);
+ if (rc < 0)
+ return rc;
}
if (rc == 0)
@@ -1486,7 +1768,7 @@ netbe_lcore(void)
if (lc == NULL)
return;
- for (i = 0; i != lc->prt_num; i++) {
+ for (i = 0; i != lc->prtq_num; i++) {
netbe_rx(lc, i);
netbe_tx(lc, i);
}
@@ -1504,26 +1786,26 @@ netbe_lcore_clear(void)
RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n",
__func__, lc->id, lc->ctx);
- for (i = 0; i != lc->prt_num; i++) {
- RTE_LOG(NOTICE, USER1, "%s:%u(port=%u) "
+ for (i = 0; i != lc->prtq_num; i++) {
+ RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, lcore=%u, q=%u, dev=%p) "
"rx_stats={"
"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
"tx_stats={"
"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
- __func__, i, lc->prt[i].port.id,
- lc->prt[i].rx_stat.in,
- lc->prt[i].rx_stat.up,
- lc->prt[i].rx_stat.drop,
- lc->prt[i].tx_stat.down,
- lc->prt[i].tx_stat.out,
- lc->prt[i].tx_stat.drop);
+ __func__, i, lc->prtq[i].port.id, lc->id,
+ lc->prtq[i].rxqid,
+ lc->prtq[i].dev,
+ lc->prtq[i].rx_stat.in,
+ lc->prtq[i].rx_stat.up,
+ lc->prtq[i].rx_stat.drop,
+ lc->prtq[i].tx_stat.down,
+ lc->prtq[i].tx_stat.out,
+ lc->prtq[i].tx_stat.drop);
}
-
- for (i = 0; i != lc->prt_num; i++) {
- for (j = 0; j != lc->prt[i].tx_buf.num; j++)
- rte_pktmbuf_free(lc->prt[i].tx_buf.pkt[j]);
- }
+ for (i = 0; i != lc->prtq_num; i++)
+ for (j = 0; j != lc->prtq[i].tx_buf.num; j++)
+ rte_pktmbuf_free(lc->prtq[i].tx_buf.pkt[j]);
RTE_PER_LCORE(_be) = NULL;
}
@@ -1548,9 +1830,8 @@ lcore_main(void *arg)
rc = netfe_lcore_init(&prm->fe);
/* lcore FE init. */
- if (rc == 0 && prm->be.lc != NULL) {
+ if (rc == 0 && prm->be.lc != NULL)
rc = netbe_lcore_setup(prm->be.lc);
- }
if (rc != 0)
sig_handle(SIGQUIT);
@@ -1569,8 +1850,6 @@ lcore_main(void *arg)
return rc;
}
-
-
static int
netfe_lcore_cmp(const void *s1, const void *s2)
{
@@ -1582,38 +1861,82 @@ netfe_lcore_cmp(const void *s1, const void *s2)
}
/*
+ * Helper functions, verify the queue for corresponding UDP port.
+ */
+static uint8_t
+varify_queue_for_port(const struct netbe_dev *prtq, const uint16_t lport)
+{
+ uint32_t align_nb_q, qid;
+
+ align_nb_q = rte_align32pow2(prtq->port.nb_lcore);
+ qid = (lport % align_nb_q) % prtq->port.nb_lcore;
+ if (prtq->rxqid == qid)
+ return 1;
+
+ return 0;
+}
+
+/*
* Helper functions, finds BE by given local and remote addresses.
*/
static int
-netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr)
+netbe_find4(const struct in_addr *laddr, const uint16_t lport,
+ const struct in_addr *raddr, const uint32_t be_lc)
{
uint32_t i, j;
- int32_t rc;
uint32_t idx;
struct netbe_lcore *bc;
- if (laddr->s_addr == INADDR_ANY) {
-
- /* we have exactly one BE, use it for all traffic */
- if (becfg.cpu_num == 1)
- return 0;
+ /* we have exactly one BE, use it for all traffic */
+ if (becfg.cpu_num == 1)
+ return 0;
- /* search by remote address. */
+ /* search by provided be_lcore */
+ if (be_lc != LCORE_ID_ANY) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- rc = rte_lpm_lookup(bc->lpm4,
- rte_be_to_cpu_32(raddr->s_addr), &idx);
- if (rc == 0)
+ if (be_lc == bc->id)
return i;
}
- } else {
+ RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n",
+ __func__, be_lc);
+ return -ENOENT;
+ }
- /* search by local address */
+ /* search by local address */
+ if (laddr->s_addr != INADDR_ANY) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- for (j = 0; j != bc->prt_num; j++)
- if (laddr->s_addr == bc->prt[j].port.ipv4)
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++) {
+ if (laddr->s_addr == bc->prtq[j].port.ipv4) {
+
+ if (lport == 0)
+ return i;
+
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
+ }
+ }
+ }
+
+ /* search by remote address */
+ if (raddr->s_addr != INADDR_ANY) {
+ for (i = 0; i != becfg.cpu_num; i++) {
+ bc = becfg.cpu + i;
+ if (rte_lpm_lookup(bc->lpm4,
+ rte_be_to_cpu_32(raddr->s_addr),
+ &idx) == 0) {
+
+ if (lport == 0)
return i;
+
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++)
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
}
}
@@ -1621,35 +1944,64 @@ netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr)
}
static int
-netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr)
+netbe_find6(const struct in6_addr *laddr, uint16_t lport,
+ const struct in6_addr *raddr, uint32_t be_lc)
{
uint32_t i, j;
- int32_t rc;
uint8_t idx;
struct netbe_lcore *bc;
- if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) == 0) {
+ /* we have exactly one BE, use it for all traffic */
+ if (becfg.cpu_num == 1)
+ return 0;
- /* we have exactly one BE, use it for all traffic */
- if (becfg.cpu_num == 1)
- return 0;
-
- /* search by remote address. */
+ /* search by provided be_lcore */
+ if (be_lc != LCORE_ID_ANY) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- rc = rte_lpm6_lookup(bc->lpm6,
- (uint8_t *)(uintptr_t)raddr->s6_addr, &idx);
- if (rc == 0)
+ if (be_lc == bc->id)
return i;
}
- } else {
- /* search by local address */
+ RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n",
+ __func__, be_lc);
+ return -ENOENT;
+ }
+
+ /* search by local address */
+ if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) != 0) {
for (i = 0; i != becfg.cpu_num; i++) {
bc = becfg.cpu + i;
- for (j = 0; j != bc->prt_num; j++)
- if (memcmp(laddr, &bc->prt[j].port.ipv6,
- sizeof(*laddr)) == 0)
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++) {
+ if (memcmp(laddr, &bc->prtq[j].port.ipv6,
+ sizeof(*laddr)) == 0) {
+
+ if (lport == 0)
+ return i;
+
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
+ }
+ }
+ }
+
+ /* search by remote address */
+ if (memcmp(raddr, &in6addr_any, sizeof(*raddr)) == 0) {
+ for (i = 0; i != becfg.cpu_num; i++) {
+ bc = becfg.cpu + i;
+ if (rte_lpm6_lookup(bc->lpm6,
+ (uint8_t *)(uintptr_t)raddr->s6_addr,
+ &idx) == 0) {
+
+ if (lport == 0)
return i;
+
+ /* search by queue for the local port */
+ for (j = 0; j != bc->prtq_num; j++)
+ if (varify_queue_for_port(bc->prtq + j, lport) != 0)
+ return i;
+ }
}
}
@@ -1657,7 +2009,7 @@ netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr)
}
static int
-netbe_find(const struct tle_udp_stream_param *p)
+netbe_find(const struct tle_udp_stream_param *p, uint32_t be_lc)
{
const struct sockaddr_in *l4, *r4;
const struct sockaddr_in6 *l6, *r6;
@@ -1665,21 +2017,23 @@ netbe_find(const struct tle_udp_stream_param *p)
if (p->local_addr.ss_family == AF_INET) {
l4 = (const struct sockaddr_in *)&p->local_addr;
r4 = (const struct sockaddr_in *)&p->remote_addr;
- return netbe_find4(&l4->sin_addr, &r4->sin_addr);
+ return netbe_find4(&l4->sin_addr, ntohs(l4->sin_port),
+ &r4->sin_addr, be_lc);
} else if (p->local_addr.ss_family == AF_INET6) {
l6 = (const struct sockaddr_in6 *)&p->local_addr;
r6 = (const struct sockaddr_in6 *)&p->remote_addr;
- return netbe_find6(&l6->sin6_addr, &r6->sin6_addr);
+ return netbe_find6(&l6->sin6_addr, ntohs(l6->sin6_port),
+ &r6->sin6_addr, be_lc);
}
return -EINVAL;
}
static int
-netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line)
+netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line, uint32_t be_lc)
{
int32_t bidx;
- bidx = netbe_find(&sp->prm);
+ bidx = netbe_find(&sp->prm, be_lc);
if (bidx < 0) {
RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n",
__func__, line);
@@ -1694,18 +2048,19 @@ static int
netfe_lcore_fill(struct lcore_prm prm[RTE_MAX_LCORE],
struct netfe_lcore_prm *lprm)
{
+ uint32_t be_lc;
uint32_t i, j, lc, ln;
/* determine on what BE each stream should be open. */
for (i = 0; i != lprm->nb_streams; i++) {
-
lc = lprm->stream[i].lcore;
ln = lprm->stream[i].line;
-
- if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln) != 0 ||
+ be_lc = lprm->stream[i].be_lcore;
+ if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln,
+ be_lc) != 0 ||
(lprm->stream[i].op == FWD &&
- netfe_sprm_flll_be(&lprm->stream[i].fprm,
- ln) != 0))
+ netfe_sprm_flll_be(&lprm->stream[i].fprm, ln,
+ be_lc) != 0))
return -EINVAL;
}
@@ -1759,6 +2114,7 @@ main(int argc, char *argv[])
char fecfg_fname[PATH_MAX + 1];
char becfg_fname[PATH_MAX + 1];
struct lcore_prm prm[RTE_MAX_LCORE];
+ struct rte_eth_dev_info dev_info;
fecfg_fname[0] = 0;
becfg_fname[0] = 0;
@@ -1824,8 +2180,15 @@ main(int argc, char *argv[])
signal(SIGINT, sig_handle);
- netbe_port_init(&becfg, argc - optind, argv + optind);
- netbe_lcore_init(&becfg, &ctx_prm);
+ rc = netbe_port_init(&becfg, argc - optind, argv + optind);
+ if (rc != 0)
+ rte_exit(EXIT_FAILURE,
+ "%s: netbe_port_init failed with error code: %d\n",
+ __func__, rc);
+
+ rc = netbe_lcore_init(&becfg, &ctx_prm);
+ if (rc != 0)
+ sig_handle(SIGQUIT);
if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0)
sig_handle(SIGQUIT);
@@ -1841,15 +2204,18 @@ main(int argc, char *argv[])
__func__, becfg.prt[i].id, rc);
sig_handle(SIGQUIT);
}
+ rte_eth_dev_info_get(becfg.prt[i].id, &dev_info);
+ rc = update_rss_reta(&becfg.prt[i], &dev_info);
+ if (rc != 0)
+ sig_handle(SIGQUIT);
}
feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num;
if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0)
sig_handle(SIGQUIT);
- for (i = 0; rc == 0 && i != becfg.cpu_num; i++) {
+ for (i = 0; rc == 0 && i != becfg.cpu_num; i++)
prm[becfg.cpu[i].id].be.lc = becfg.cpu + i;
- }
if (rc == 0 && (rc = netfe_lcore_fill(prm, &feprm)) != 0)
sig_handle(SIGQUIT);
diff --git a/examples/udpfwd/netbe.h b/examples/udpfwd/netbe.h
index 0be785a..577627d 100644
--- a/examples/udpfwd/netbe.h
+++ b/examples/udpfwd/netbe.h
@@ -44,19 +44,25 @@
#define MAX_PKT_BURST 0x20
+/* Used to allocate the memory for hash key. */
+#define RSS_HASH_KEY_LENGTH 64
+
/*
* BE related structures.
*/
struct netbe_port {
uint32_t id;
- uint32_t lcore;
+ uint32_t nb_lcore;
+ uint32_t lcore[RTE_MAX_LCORE];
uint32_t mtu;
uint32_t rx_offload;
uint32_t tx_offload;
uint32_t ipv4;
struct in6_addr ipv6;
struct ether_addr mac;
+ uint32_t hash_key_size;
+ uint8_t hash_key[RSS_HASH_KEY_LENGTH];
};
struct netbe_dest {
@@ -109,10 +115,10 @@ struct netbe_lcore {
struct rte_lpm6 *lpm6;
struct rte_ip_frag_tbl *ftbl;
struct tle_udp_ctx *ctx;
- uint32_t prt_num;
+ uint32_t prtq_num;
uint32_t dst4_num;
uint32_t dst6_num;
- struct netbe_dev prt[RTE_MAX_ETHPORTS];
+ struct netbe_dev prtq[RTE_MAX_ETHPORTS * RTE_MAX_LCORE];
struct tle_udp_dest dst4[LCORE_MAX_DST];
struct tle_udp_dest dst6[LCORE_MAX_DST];
struct rte_ip_frag_death_row death_row;
@@ -144,6 +150,7 @@ struct netfe_sprm {
struct netfe_stream_prm {
uint32_t lcore;
+ uint32_t be_lcore;
uint32_t line;
uint16_t op;
uint16_t txlen; /* valid/used only for TXONLY op. */
@@ -253,6 +260,7 @@ struct lcore_prm {
} \
} while (0)
-int setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc);
+int setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc,
+ uint16_t qid);
#endif /* __NETBE_H__ */
diff --git a/examples/udpfwd/parse.c b/examples/udpfwd/parse.c
index 78df01b..cc8c0b2 100644
--- a/examples/udpfwd/parse.c
+++ b/examples/udpfwd/parse.c
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <sched.h>
#include "netbe.h"
#include "parse.h"
@@ -61,7 +62,6 @@ parse_ip_val(__rte_unused const char *key, const char *val, void *prm)
return 0;
}
-
#define PARSE_UINT8x16(s, v, l) \
do { \
char *end; \
@@ -110,6 +110,41 @@ parse_feop_val(__rte_unused const char *key, const char *val, void *prm)
}
static int
+parse_lcore_list_val(__rte_unused const char *key, const char *val, void *prm)
+{
+ union parse_val *rv;
+ unsigned long a, b;
+ uint32_t i;
+ char *end;
+
+ rv = prm;
+ errno = 0;
+ a = strtoul(val, &end, 0);
+ if (errno != 0 || (end[0] != 0 && end[0] != '-') || a > UINT32_MAX)
+ return -EINVAL;
+
+ if (end[0] == '-') {
+ val = end + 1;
+ errno = 0;
+ b = strtoul(val, &end, 0);
+ if (errno != 0 || end[0] != 0 || b > UINT32_MAX)
+ return -EINVAL;
+ } else
+ b = a;
+
+ if (a <= b) {
+ for (i = a; i <= b; i++)
+ CPU_SET(i, &rv->cpuset);
+ } else {
+ RTE_LOG(ERR, USER1,
+ "%s: lcores not in ascending order\n", __func__);
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int
parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man,
const char *keys_opt[], uint32_t nb_opt,
const arg_handler_t hndl[], union parse_val val[])
@@ -139,7 +174,7 @@ parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man,
if (rte_kvargs_process(kvl, keys_man[j], hndl[j],
val + j) != 0) {
RTE_LOG(ERR, USER1,
- "%s: %s invalid value for key: %s\n",
+ "%s: %s invalid value for man key: %s\n",
__func__, arg, keys_man[j]);
rte_kvargs_free(kvl);
return -EINVAL;
@@ -151,7 +186,7 @@ parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man,
if (rte_kvargs_process(kvl, keys_opt[j], hndl[k],
val + k) != 0) {
RTE_LOG(ERR, USER1,
- "%s: %s invalid value for key: %s\n",
+ "%s: %s invalid value for opt key: %s\n",
__func__, arg, keys_opt[j]);
rte_kvargs_free(kvl);
return -EINVAL;
@@ -166,6 +201,7 @@ int
parse_netbe_arg(struct netbe_port *prt, const char *arg)
{
int32_t rc;
+ uint32_t i, j;
static const char *keys_man[] = {
"port",
@@ -182,7 +218,7 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg)
static const arg_handler_t hndl[] = {
parse_uint_val,
- parse_uint_val,
+ parse_lcore_list_val,
parse_uint_val,
parse_uint_val,
parse_uint_val,
@@ -201,7 +237,10 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg)
return rc;
prt->id = val[0].u64;
- prt->lcore = val[1].u64;
+ for (i = 0, j = 0; i < RTE_MAX_LCORE; i++)
+ if (CPU_ISSET(i, &val[1].cpuset))
+ prt->lcore[j++] = i;
+ prt->nb_lcore = j;
prt->mtu = val[2].u64;
prt->rx_offload = val[3].u64;
prt->tx_offload = val[4].u64;
@@ -210,6 +249,7 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg)
return 0;
}
+
static int
check_netbe_dest(const struct netbe_dest *dst)
{
@@ -390,6 +430,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
"fwlport",
"fwraddr",
"fwrport",
+ "belcore",
};
static const arg_handler_t hndl[] = {
@@ -404,16 +445,17 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
parse_uint_val,
parse_ip_val,
parse_uint_val,
+ parse_uint_val,
};
union parse_val val[RTE_DIM(hndl)];
memset(val, 0, sizeof(val));
+ val[11].u64 = LCORE_ID_ANY;
rc = parse_kvargs(arg, keys_man, RTE_DIM(keys_man),
keys_opt, RTE_DIM(keys_opt), hndl, val);
if (rc != 0)
return rc;
-
sp->lcore = val[0].u64;
sp->op = val[1].u64;
pv2saddr(&sp->sprm.prm.local_addr, val + 2, val + 3);
@@ -421,6 +463,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
sp->txlen = val[6].u64;
pv2saddr(&sp->fprm.prm.local_addr, val + 7, val + 8);
pv2saddr(&sp->fprm.prm.remote_addr, val + 9, val + 10);
+ sp->be_lcore = val[11].u64;
return 0;
}
diff --git a/examples/udpfwd/parse.h b/examples/udpfwd/parse.h
index 911c874..e25e64e 100644
--- a/examples/udpfwd/parse.h
+++ b/examples/udpfwd/parse.h
@@ -16,6 +16,8 @@
#ifndef __PARSE_H__
#define __PARSE_H__
+#define PARSE_LIST_DELIM "-"
+
union parse_val {
uint64_t u64;
struct {
@@ -26,6 +28,7 @@ union parse_val {
};
} in;
struct ether_addr mac;
+ rte_cpuset_t cpuset;
};
static int
diff --git a/examples/udpfwd/pkt.c b/examples/udpfwd/pkt.c
index af87b08..6832b9a 100644
--- a/examples/udpfwd/pkt.c
+++ b/examples/udpfwd/pkt.c
@@ -23,7 +23,6 @@ _mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso,
return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49;
}
-
static inline void
fill_pkt_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t l3, uint32_t l4)
{
@@ -223,7 +222,7 @@ reassemble(struct rte_mbuf *m, struct netbe_lcore *lc, uint64_t tms,
tbl = lc->ftbl;
dr = &lc->death_row;
- l3cs = lc->prt[port].port.rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM;
+ l3cs = lc->prtq[port].port.rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM;
if (RTE_ETH_IS_IPV4_HDR(m->packet_type)) {
@@ -508,7 +507,8 @@ typen_rx_callback(uint8_t port, __rte_unused uint16_t queue,
}
int
-setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc)
+setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc,
+ uint16_t qid)
{
int32_t i, rc;
uint32_t smask;
@@ -589,7 +589,7 @@ setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc)
for (i = 0; i != RTE_DIM(ptype2cb); i++) {
if ((smask & ptype2cb[i].mask) == ptype2cb[i].mask) {
- cb = rte_eth_add_rx_callback(uprt->id, 0,
+ cb = rte_eth_add_rx_callback(uprt->id, qid,
ptype2cb[i].fn, lc);
rc = -rte_errno;
RTE_LOG(ERR, USER1,
diff --git a/lib/libtle_udp/tle_udp_impl.h b/lib/libtle_udp/tle_udp_impl.h
index 8e61ea6..b46e4dd 100644
--- a/lib/libtle_udp/tle_udp_impl.h
+++ b/lib/libtle_udp/tle_udp_impl.h
@@ -63,6 +63,8 @@ struct tle_udp_dev_param {
uint32_t tx_offload; /**< DEV_TX_OFFLOAD_* supported. */
struct in_addr local_addr4; /**< local IPv4 address assigned. */
struct in6_addr local_addr6; /**< local IPv6 address assigned. */
+ uint32_t nb_bl_ports; /**< number of blocked ports. */
+ uint16_t *bl_ports; /**< list of blocked ports. */
};
#define TLE_UDP_MAX_HDR 0x60
diff --git a/lib/libtle_udp/udp_ctl.c b/lib/libtle_udp/udp_ctl.c
index 55c4afd..3ff9751 100644
--- a/lib/libtle_udp/udp_ctl.c
+++ b/lib/libtle_udp/udp_ctl.c
@@ -47,6 +47,11 @@ check_dev_prm(const struct tle_udp_dev_param *dev_prm)
sizeof(tle_udp6_any)) == 0)
return -EINVAL;
+ /* all the ports are blocked. */
+ if (dev_prm->nb_bl_ports > UINT16_MAX ||
+ (dev_prm->nb_bl_ports != 0 && dev_prm->bl_ports == NULL))
+ return -EINVAL;
+
return 0;
}
@@ -241,9 +246,11 @@ tle_udp_ctx_invalidate(struct tle_udp_ctx *ctx)
}
static int
-init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id)
+init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id,
+ uint16_t *bl_ports, uint32_t nb_bl_ports)
{
size_t sz;
+ uint32_t i;
sz = sizeof(*dev->dp[idx]);
dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
@@ -257,6 +264,11 @@ init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id)
}
udp_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK);
+
+ if (bl_ports != NULL)
+ for (i = 0; i < nb_bl_ports; i++)
+ udp_pbm_set(&dev->dp[idx]->use, bl_ports[i]);
+
return 0;
}
@@ -281,6 +293,7 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
const struct tle_udp_dev_param *dev_prm)
{
int32_t rc;
+ uint32_t i;
struct tle_udp_dev *dev;
if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) {
@@ -294,13 +307,23 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
rc = 0;
/* device can handle IPv4 traffic */
- if (dev_prm->local_addr4.s_addr != INADDR_ANY)
- rc = init_dev_proto(dev, TLE_UDP_V4, ctx->prm.socket_id);
+ if (dev_prm->local_addr4.s_addr != INADDR_ANY) {
+ rc = init_dev_proto(dev, TLE_UDP_V4, ctx->prm.socket_id,
+ dev_prm->bl_ports, dev_prm->nb_bl_ports);
+ for (i = 0; i < dev_prm->nb_bl_ports; i++)
+ udp_pbm_set(&ctx->use[TLE_UDP_V4],
+ dev_prm->bl_ports[i]);
+ }
/* device can handle IPv6 traffic */
if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_udp6_any,
- sizeof(tle_udp6_any)) != 0)
- rc = init_dev_proto(dev, TLE_UDP_V6, ctx->prm.socket_id);
+ sizeof(tle_udp6_any)) != 0) {
+ rc = init_dev_proto(dev, TLE_UDP_V6, ctx->prm.socket_id,
+ dev_prm->bl_ports, dev_prm->nb_bl_ports);
+ for (i = 0; i < dev_prm->nb_bl_ports; i++)
+ udp_pbm_set(&ctx->use[TLE_UDP_V6],
+ dev_prm->bl_ports[i]);
+ }
if (rc != 0) {
/* cleanup and return an error. */
@@ -483,13 +506,11 @@ stream_fill_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
return ENFILE;
/* fill socket's dst port and type */
-
sp = htons(p);
s->type = t;
s->port.dst = sp;
/* mark port as in-use */
-
udp_pbm_set(&ctx->use[t], p);
if (dev != NULL) {
udp_pbm_set(pbm, p);