diff options
-rw-r--r-- | examples/udpfwd/Makefile | 2 | ||||
-rw-r--r-- | examples/udpfwd/README | 39 | ||||
-rw-r--r-- | examples/udpfwd/main.c | 924 | ||||
-rw-r--r-- | examples/udpfwd/netbe.h | 16 | ||||
-rw-r--r-- | examples/udpfwd/parse.c | 55 | ||||
-rw-r--r-- | examples/udpfwd/parse.h | 3 | ||||
-rw-r--r-- | examples/udpfwd/pkt.c | 8 | ||||
-rw-r--r-- | lib/libtle_udp/tle_udp_impl.h | 2 | ||||
-rw-r--r-- | lib/libtle_udp/udp_ctl.c | 35 |
9 files changed, 768 insertions, 316 deletions
diff --git a/examples/udpfwd/Makefile b/examples/udpfwd/Makefile index c23947a..5274d0f 100644 --- a/examples/udpfwd/Makefile +++ b/examples/udpfwd/Makefile @@ -40,5 +40,7 @@ LDLIBS += -L$(TLDK_ROOT)/$(RTE_TARGET)/lib LDLIBS += -ltle_udp EXTRA_CFLAGS += -O3 +CFLAGS_parse.o += -D_GNU_SOURCE +CFLAGS_main.o += -mcmodel=medium include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/examples/udpfwd/README b/examples/udpfwd/README index 8ab7e98..91b6e76 100644 --- a/examples/udpfwd/README +++ b/examples/udpfwd/README @@ -18,15 +18,15 @@ BE is responsible for: (via tle_udp_rx_bulk). - retrieve packets ready to be send out from UDP TLDK context(s) and TX them over destined DPDK port. -Right now only one RX/TX queue per port is used. +Multiple RX/TX queues per port are supported by RSS. Right now the number of +TX is same as the number of RX queue. Each BE lcore can serve multiple DPDK ports, TLDK UDP contexts. - Front End (FE) FE responsibility is to open configured UDP streams and perform send/recv over them. These streams can belong to different UDP contexts. -Right now each lcore can act as BE or FE (but not both simultaneously). -Master lcore can act as FE only. +Right now each lcore can act as BE and/or FE. Usage ===== @@ -40,18 +40,18 @@ udpfwd <EAL parameters> -- \ -f | --fecfg <filename> /* frontend configuration file. */ \ <port0_params> <port1_params> ... <portN_params> -port_params: port=<uint>,lcore=<uint>,\ +port_params: port=<uint>,lcore=<uint>[-<uint>],\ [rx_offload=<uint>,tx_offload=<uint>,mtu=<uint>,ipv4=<ipv4>,ipv6=<ipv6>] port_params are used to configure the particular DPDK device (rte_ethdev port), and specify BE lcore that will do RX/TX from/to the device and manage -BE part of corresponding UDP context. +BE part of corresponding UDP context. Multiple BE lcore can be specified. -port - DPDK port id (right now on each port is used just one RX, - one TX queue). +port - DPDK port id (multiple queues are supported when multiple lcore + is specified for a port). lcore - EAL lcore id to do IO over that port (rx_burst/tx_burst). - several ports can be managed by the same lcore, - but same port can't belong to more than one lcore. + several ports can be managed by the same lcore, and same port can + belong to more than one lcore. rx_offload - RX HW offload capabilities to enable/use on this port. (bitmask of DEV_RX_OFFLOAD_* values). tx_offload - TX HW offload capabilities to enable/use on this port. @@ -64,15 +64,15 @@ ipv6 - ipv6 address to assign to that port. At least one of ipv4/ipv6 values have to be specified for each port. As an example: -udpfwd --lcores='3,6' -w 01:00.0 -- \ +udpfwd --lcores='3,6,8' -w 01:00.0 -- \ --promisc --rbufs 0x1000 --sbufs 0x1000 --streams 0x100 \ --fecfg ./fe.cfg --becfg ./be.cfg \ -port=0,lcore=6,rx_offload=0xf,tx_offload=0,\ +port=0,lcore=6,lcore=8,rx_offload=0xf,tx_offload=0,\ ipv4=192.168.1.233,ipv6=2001:4860:b002::28 -Will create TLDK UDP context on lcore=6 (BE lcore) to manage DPDK port 0. -Will assign IPv4 address 192.168.1.233 and IPv6 address 2001:4860:b002::28 -to that port. +Will create TLDK UDP context on lcore=6 and lcore=8 (BE lcore) to manage +DPDK port 0. Will assign IPv4 address 192.168.1.233 and IPv6 address +2001:4860:b002::28 to that port. The following supported by DPDK RX HW offloads: DEV_RX_OFFLOAD_VLAN_STRIP, DEV_RX_OFFLOAD_IPV4_CKSUM, @@ -81,6 +81,11 @@ The following supported by DPDK RX HW offloads: will be enabled on that port. No HW TX offloads will be enabled. +If multiple lcore is specified per DPDK port, the following RSS hash will +be enabled on that port: + ETH_RSS_UDP + + Fornt-End (FE) and Back-End (BE) configuration files format: ------------------------------------------------------------ - each record on a separate line. @@ -96,9 +101,10 @@ FE config record format: lcore=<uint>,op=<"rx|tx|echo|fwd">,\ laddr=<ip>,lport=<uint16>,raddr=<ip>,rport=<uint16>,\ -[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16> +[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16>,\ +belcore=<uint>] -lcore - EAL lcore to manage that stream(s). +lcore - EAL lcore to manage that stream(s) in the FE. op - operation to perform on that stream: "rx" - do receive only on that stream. "tx" - do send only on that stream. @@ -118,6 +124,7 @@ fwraddr - remote address for the forwarding stream(s) to open ("fwd mode only). fwrport - remote port for the forwarding stream(s) to open ("fwd mode only). +belcore - EAL lcore to manage that stream(s) in the BE. Refer to fe.cfg for an example. diff --git a/examples/udpfwd/main.c b/examples/udpfwd/main.c index 2dde317..3daeb30 100644 --- a/examples/udpfwd/main.c +++ b/examples/udpfwd/main.c @@ -71,6 +71,36 @@ static const struct option long_opt[] = { {NULL, 0, 0, 0} }; +/** + * IPv4 Input size in bytes for RSS hash key calculation. + * source address, destination address, source port, and destination port. + */ +#define IPV4_TUPLE_SIZE 12 + +/** + * IPv6 Input size in bytes for RSS hash key calculation. + * source address, destination address, source port, and destination port. + */ +#define IPV6_TUPLE_SIZE 36 + +/** + * Location to be modified to create the IPv4 hash key which helps + * to distribute packets based on the destination UDP port. + */ +#define RSS_HASH_KEY_DEST_PORT_LOC_IPV4 15 + +/* + * Location to be modified to create the IPv6 hash key which helps + * to distribute packets based on the destination UDP port. + */ +#define RSS_HASH_KEY_DEST_PORT_LOC_IPV6 39 + +/** + * Size of the rte_eth_rss_reta_entry64 array to update through + * rte_eth_dev_rss_reta_update. + */ +#define RSS_RETA_CONF_ARRAY_SIZE (ETH_RSS_RETA_SIZE_512/RTE_RETA_GROUP_SIZE) + static volatile int force_quit; static struct netbe_cfg becfg; @@ -94,20 +124,145 @@ sig_handle(int signum) force_quit = 1; } +static void +prepare_hash_key(struct netbe_port *uprt, uint8_t key_size, uint16_t family) +{ + uint32_t align_nb_q; + + align_nb_q = rte_align32pow2(uprt->nb_lcore); + memset(uprt->hash_key, 0, RSS_HASH_KEY_LENGTH); + uprt->hash_key_size = key_size; + if (family == AF_INET) + uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV4] = align_nb_q; + else + uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV6] = align_nb_q; +} + +static uint32_t +qidx_from_hash_index(uint32_t hash, uint32_t align_nb_q) +{ + uint32_t i, nb_bit, q; + + nb_bit = (sizeof(uint32_t) * CHAR_BIT) - __builtin_clz(align_nb_q - 1); + q = (hash & 1); + for (i = 1; i < nb_bit; i++) { + hash >>= 1; + q <<= 1; + q |= (hash & 1); + } + + return q; +} + +static int +update_rss_conf(struct netbe_port *uprt, + const struct rte_eth_dev_info *dev_info, + struct rte_eth_conf *port_conf) +{ + uint8_t hash_key_size; + + if (uprt->nb_lcore > 1) { + if (dev_info->hash_key_size > 0) + hash_key_size = dev_info->hash_key_size; + else { + RTE_LOG(ERR, USER1, + "%s: dev_info did not provide a valid hash key size\n", + __func__); + return -EINVAL; + } + + if (uprt->ipv4 != INADDR_ANY && + memcmp(&uprt->ipv6, &in6addr_any, + sizeof(uprt->ipv6)) != 0) { + RTE_LOG(ERR, USER1, + "%s: RSS for both IPv4 and IPv6 not supported!\n", + __func__); + return -EINVAL; + } else if (uprt->ipv4 != INADDR_ANY) { + prepare_hash_key(uprt, hash_key_size, AF_INET); + } else if (memcmp(&uprt->ipv6, &in6addr_any, sizeof(uprt->ipv6)) + != 0) { + prepare_hash_key(uprt, hash_key_size, AF_INET6); + } else { + RTE_LOG(ERR, USER1, + "%s: No IPv4 or IPv6 address is found!\n", + __func__); + return -EINVAL; + } + port_conf->rxmode.mq_mode = ETH_MQ_RX_RSS; + port_conf->rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP; + port_conf->rx_adv_conf.rss_conf.rss_key_len = hash_key_size; + port_conf->rx_adv_conf.rss_conf.rss_key = uprt->hash_key; + } + + return 0; +} + +static int +update_rss_reta(struct netbe_port *uprt, + const struct rte_eth_dev_info *dev_info) +{ + struct rte_eth_rss_reta_entry64 reta_conf[RSS_RETA_CONF_ARRAY_SIZE]; + int32_t i, rc, align_nb_q; + int32_t q_index, idx, shift; + + if (uprt->nb_lcore > 1) { + if (dev_info->reta_size == 0) { + RTE_LOG(ERR, USER1, + "%s: Redirection table size 0 is invalid for RSS\n", + __func__); + return -EINVAL; + } + RTE_LOG(NOTICE, USER1, + "%s: The reta size of port %d is %u\n", + __func__, uprt->id, dev_info->reta_size); + + if (dev_info->reta_size > ETH_RSS_RETA_SIZE_512) { + RTE_LOG(ERR, USER1, + "%s: More than %u entries of Reta not supported\n", + __func__, ETH_RSS_RETA_SIZE_512); + return -EINVAL; + } + + memset(reta_conf, 0, sizeof(reta_conf)); + align_nb_q = rte_align32pow2(uprt->nb_lcore); + for (i = 0; i < align_nb_q; i++) { + q_index = qidx_from_hash_index(i, align_nb_q) % + uprt->nb_lcore; + + idx = i / RTE_RETA_GROUP_SIZE; + shift = i % RTE_RETA_GROUP_SIZE; + reta_conf[idx].mask |= (1ULL << shift); + reta_conf[idx].reta[shift] = q_index; + RTE_LOG(NOTICE, USER1, + "%s: port=%u RSS reta conf: hash=%u, q=%u\n", + __func__, uprt->id, i, q_index); + } + + rc = rte_eth_dev_rss_reta_update(uprt->id, + reta_conf, dev_info->reta_size); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s: Bad redirection table parameter, rc = %d\n", + __func__, rc); + return rc; + } + } + + return 0; +} + /* * Initilise DPDK port. - * In current version, only one queue per port is used. + * In current version, multi-queue per port is used. */ static int -port_init(struct netbe_port *uprt, struct rte_mempool *mp) +port_init(struct netbe_port *uprt) { - int32_t socket, rc; - uint16_t q; + int32_t rc; struct rte_eth_conf port_conf; struct rte_eth_dev_info dev_info; - const uint16_t rx_rings = 1, tx_rings = 1; - rte_eth_dev_info_get(uprt->id, &dev_info); if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) { RTE_LOG(ERR, USER1, @@ -130,16 +285,33 @@ port_init(struct netbe_port *uprt, struct rte_mempool *mp) __func__, uprt->id); port_conf.rxmode.hw_ip_checksum = 1; } - port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN; - rc = rte_eth_dev_configure(uprt->id, rx_rings, tx_rings, &port_conf); + rc = update_rss_conf(uprt, &dev_info, &port_conf); + if (rc != 0) + return rc; + + rc = rte_eth_dev_configure(uprt->id, uprt->nb_lcore, uprt->nb_lcore, + &port_conf); RTE_LOG(NOTICE, USER1, - "%s: rte_eth_dev_configure(%u) returns %d;\n", - __func__, uprt->id, rc); + "%s: rte_eth_dev_configure(prt_id=%u, nb_rxq=%u, nb_txq=%u) " + "returns %d;\n", __func__, uprt->id, uprt->nb_lcore, + uprt->nb_lcore, rc); if (rc != 0) return rc; + return 0; +} + +static int +queue_init(struct netbe_port *uprt, struct rte_mempool *mp) +{ + int32_t socket, rc; + uint16_t q; + struct rte_eth_dev_info dev_info; + + rte_eth_dev_info_get(uprt->id, &dev_info); + socket = rte_eth_dev_socket_id(uprt->id); dev_info.default_rxconf.rx_drop_en = 1; @@ -151,21 +323,27 @@ port_init(struct netbe_port *uprt, struct rte_mempool *mp) dev_info.default_txconf.txq_flags = 0; } - for (q = 0; q < rx_rings; q++) { + for (q = 0; q < uprt->nb_lcore; q++) { rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE, - socket, NULL, mp); - if (rc < 0) + socket, &dev_info.default_rxconf, mp); + if (rc < 0) { + RTE_LOG(ERR, USER1, + "%s: rx queue=%u setup failed with error code: %d\n", + __func__, q, rc); return rc; + } } - for (q = 0; q < tx_rings; q++) { + for (q = 0; q < uprt->nb_lcore; q++) { rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE, socket, &dev_info.default_txconf); - if (rc < 0) + if (rc < 0) { + RTE_LOG(ERR, USER1, + "%s: tx queue=%u setup failed with error code: %d\n", + __func__, q, rc); return rc; + } } - - return 0; } @@ -190,13 +368,29 @@ check_lcore(uint32_t lc) static void log_netbe_prt(const struct netbe_port *uprt) { + uint32_t i; + char corelist[2 * RTE_MAX_LCORE + 1]; + char hashkey[2 * RSS_HASH_KEY_LENGTH]; + + memset(corelist, 0, sizeof(corelist)); + memset(hashkey, 0, sizeof(hashkey)); + for (i = 0; i < uprt->nb_lcore; i++) + if (i < uprt->nb_lcore - 1) + sprintf(corelist + (2 * i), "%u,", uprt->lcore[i]); + else + sprintf(corelist + (2 * i), "%u", uprt->lcore[i]); + + for (i = 0; i < uprt->hash_key_size; i++) + sprintf(hashkey + (2 * i), "%02x", uprt->hash_key[i]); + RTE_LOG(NOTICE, USER1, - "uprt %p = <id = %u, lcore = %u, " - "mtu = %u, rx_offload = %u, tx_offload = %u,\n" + "uprt %p = <id = %u, lcore = <%s>, mtu = %u, " + "rx_offload = %u, tx_offload = %u,\n" "ipv4 = %#x, " "ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, " - "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n", - uprt, uprt->id, uprt->lcore, + "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n" + "hashkey = %s;\n", + uprt, uprt->id, corelist, uprt->mtu, uprt->rx_offload, uprt->tx_offload, uprt->ipv4, uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1], @@ -205,7 +399,8 @@ log_netbe_prt(const struct netbe_port *uprt) uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7], uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1], uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3], - uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5]); + uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5], + hashkey); } static void @@ -262,53 +457,126 @@ frag_pool_init(uint32_t sid) return 0; } +static struct netbe_lcore * +find_initilized_lcore(struct netbe_cfg *cfg, uint32_t lc_num) +{ + uint32_t i; + + for (i = 0; i < cfg->cpu_num; i++) + if (cfg->cpu[i].id == lc_num) + return &cfg->cpu[i]; + + return NULL; +} + +static int +calculate_nb_prtq(struct netbe_cfg *cfg) +{ + uint32_t i, j, rc; + struct netbe_port *prt; + struct netbe_lcore *lc; + + for (i = 0; i != cfg->prt_num; i++) { + prt = &cfg->prt[i]; + for (j = 0; j != prt->nb_lcore; j++) { + rc = check_lcore(prt->lcore[j]); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s: processing failed with err: %d\n", + __func__, rc); + return rc; + } + + lc = find_initilized_lcore(cfg, prt->lcore[j]); + if (lc == NULL) { + lc = &cfg->cpu[cfg->cpu_num]; + lc->id = prt->lcore[j]; + cfg->cpu_num++; + } + lc->prtq[lc->prtq_num].rxqid = j; + lc->prtq[lc->prtq_num].txqid = j; + lc->prtq[lc->prtq_num].port = *prt; + + lc->prtq_num++; + } + } + + return 0; +} /* * Setup all enabled ports. */ -static void +static int netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[]) { int32_t rc; - uint32_t i, n, sid; + uint32_t i, n, sid, j; + struct netbe_port *prt; n = RTE_MIN(RTE_DIM(cfg->prt), (uint32_t)argc); rc = 0; for (i = 0; i != n; i++) { rc = parse_netbe_arg(cfg->prt + i, argv[i]); - if (rc != 0) - break; + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s: processing of \"%s\" failed with error code: %d\n", + __func__, argv[i], rc); + return rc; + } + } + cfg->prt_num = i; - rc = check_lcore(cfg->prt[i].lcore); - if (rc != 0) - break; + /* calculate number of queues per lcore. */ + rc = calculate_nb_prtq(cfg); + if (rc != 0) { + RTE_LOG(ERR, USER1, "%s: processing of arguments failed" + " with error code: %d\n", __func__, rc); + return rc; + } - sid = rte_lcore_to_socket_id(cfg->prt[i].lcore) + 1; - assert(sid < RTE_DIM(mpool)); + for (i = 0; i != cfg->prt_num; i++) { + prt = cfg->prt + i; + rc = port_init(prt); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s: port=%u init failed with error code: %d\n", + __func__, prt->id, rc); + return rc; + } + rte_eth_macaddr_get(prt->id, &prt->mac); + if (cfg->promisc) + rte_eth_promiscuous_enable(prt->id); - if (mpool[sid] == NULL && (rc = pool_init(sid)) != 0) - break; + for (j = 0; j < prt->nb_lcore; j++) { + sid = rte_lcore_to_socket_id(prt->lcore[j]) + 1; + assert(sid < RTE_DIM(mpool)); - if (frag_mpool[sid] == NULL && (rc = frag_pool_init(sid)) != 0) - break; + if (mpool[sid] == NULL) { + rc = pool_init(sid); + if (rc != 0) + return rc; + } - rc = port_init(cfg->prt + i, mpool[sid]); - if (rc != 0) - break; + if (frag_mpool[sid] == NULL) { + rc = frag_pool_init(sid); + if (rc != 0) + return rc; + } - rte_eth_macaddr_get(cfg->prt[i].id, &cfg->prt[i].mac); - if (cfg->promisc) - rte_eth_promiscuous_enable(cfg->prt[i].id); + rc = queue_init(prt, mpool[sid]); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s: lcore=%u queue init failed with err: %d\n", + __func__, prt->lcore[j], rc); + return rc; + } + } } - - if (rc != 0) - rte_exit(EXIT_FAILURE, - "%s: processing of \"%s\" failed with error code: %d\n", - __func__, argv[i], rc); - - cfg->prt_num = i; log_netbe_cfg(cfg); + + return 0; } /* @@ -487,128 +755,153 @@ fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed, } } - -/* - * BE lcore setup routine. - */ static int -lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm, - const struct netbe_port prt[], uint32_t prt_num) +create_context(struct netbe_lcore *lc, + const struct tle_udp_ctx_param *ctx_prm) { - int32_t rc, sid; - uint32_t i; + uint32_t rc = 0, sid; uint64_t frag_cycles; struct tle_udp_ctx_param cprm; - struct tle_udp_dev_param dprm; - lc->id = prt[0].lcore; - lc->prt_num = prt_num; + if (lc->ctx == NULL) { + sid = rte_lcore_to_socket_id(lc->id); - sid = rte_lcore_to_socket_id(lc->id); + rc = lcore_lpm_init(lc); + if (rc != 0) + return rc; - rc = lcore_lpm_init(lc); - if (rc != 0) - return rc; + cprm = *ctx_prm; + cprm.socket_id = sid; + cprm.lookup4 = lpm4_dst_lookup; + cprm.lookup4_data = lc; + cprm.lookup6 = lpm6_dst_lookup; + cprm.lookup6_data = lc; - cprm = *ctx_prm; - cprm.socket_id = sid; - cprm.lookup4 = lpm4_dst_lookup; - cprm.lookup4_data = lc; - cprm.lookup6 = lpm6_dst_lookup; - cprm.lookup6_data = lc; + /* to facilitate both IPv4 and IPv6. */ + cprm.max_streams *= 2; - /* to facilitate both IPv4 and IPv6. */ - cprm.max_streams *= 2; + frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / + MS_PER_S * FRAG_TTL; - frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S * FRAG_TTL; - lc->ftbl = rte_ip_frag_table_create(cprm.max_streams, - FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams, frag_cycles, sid); - RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n", - __func__, lc->id, lc->ftbl); + lc->ftbl = rte_ip_frag_table_create(cprm.max_streams, + FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams, + frag_cycles, sid); - lc->ctx = tle_udp_create(&cprm); - RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n", - __func__, lc->id, lc->ctx); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n", + __func__, lc->id, lc->ftbl); - if (lc->ctx == NULL || lc->ftbl == NULL) - rc = ENOMEM; + lc->ctx = tle_udp_create(&cprm); - for (i = 0; i != prt_num && rc == 0; i++) { + RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n", + __func__, lc->id, lc->ctx); - memset(&dprm, 0, sizeof(dprm)); + if (lc->ctx == NULL || lc->ftbl == NULL) + rc = ENOMEM; + } - lc->prt[i].rxqid = 0; - lc->prt[i].txqid = 0; - lc->prt[i].port = prt[i]; + return rc; +} + +/* + * BE lcore setup routine. + */ +static int +lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm, + const uint32_t prtqid, uint16_t *bl_ports, uint32_t nb_bl_ports) +{ + int32_t rc = 0; + struct tle_udp_dev_param dprm; - dprm.rx_offload = prt[i].rx_offload; - dprm.tx_offload = prt[i].tx_offload; - dprm.local_addr4.s_addr = prt[i].ipv4; - memcpy(&dprm.local_addr6, &prt[i].ipv6, sizeof(prt[i].ipv6)); + rc = create_context(lc, ctx_prm); - lc->prt[i].dev = tle_udp_add_dev(lc->ctx, &dprm); - RTE_LOG(NOTICE, USER1, "%s(lcore=%u, port=%u), udp_dev: %p;\n", - __func__, lc->id, prt[i].id, lc->prt[i].dev); - if (lc->prt[i].dev == NULL) + if (lc->ctx != NULL) { + memset(&dprm, 0, sizeof(dprm)); + dprm.rx_offload = lc->prtq[prtqid].port.rx_offload; + dprm.tx_offload = lc->prtq[prtqid].port.tx_offload; + dprm.local_addr4.s_addr = lc->prtq[prtqid].port.ipv4; + memcpy(&dprm.local_addr6, &lc->prtq[prtqid].port.ipv6, + sizeof(lc->prtq[prtqid].port.ipv6)); + dprm.nb_bl_ports = nb_bl_ports; + dprm.bl_ports = bl_ports; + + lc->prtq[prtqid].dev = tle_udp_add_dev(lc->ctx, &dprm); + + RTE_LOG(NOTICE, USER1, + "%s(lcore=%u, port=%u, qid=%u), udp_dev: %p\n", + __func__, lc->id, lc->prtq[prtqid].port.id, + lc->prtq[prtqid].rxqid, lc->prtq[prtqid].dev); + + if (lc->prtq[prtqid].dev == NULL) rc = -rte_errno; - } - if (rc != 0) { - RTE_LOG(ERR, USER1, "%s(lcore=%u) failed with error code: %d\n", - __func__, lc->id, rc); - tle_udp_destroy(lc->ctx); - rte_ip_frag_table_destroy(lc->ftbl); - rte_lpm_free(lc->lpm4); - rte_lpm6_free(lc->lpm6); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s(lcore=%u) failed with error code: %d\n", + __func__, lc->id, rc); + tle_udp_destroy(lc->ctx); + rte_ip_frag_table_destroy(lc->ftbl); + rte_lpm_free(lc->lpm4); + rte_lpm6_free(lc->lpm6); + return rc; + } } return rc; } -static int -prt_lcore_cmp(const void *s1, const void *s2) +static uint16_t +create_blocklist(const struct netbe_port *beprt, uint16_t *bl_ports, + uint32_t q) { - const struct netbe_port *p1, *p2; + uint32_t i, j, qid, align_nb_q; - p1 = s1; - p2 = s2; - return p1->lcore - p2->lcore; + align_nb_q = rte_align32pow2(beprt->nb_lcore); + for (i = 0, j = 0; i < (UINT16_MAX + 1); i++) { + qid = (i % align_nb_q) % beprt->nb_lcore; + if (qid != q) + bl_ports[j++] = i; + } + + return j; } -static void -netbe_lcore_init(struct netbe_cfg *cfg, const struct tle_udp_ctx_param *ctx_prm) +static int +netbe_lcore_init(struct netbe_cfg *cfg, + const struct tle_udp_ctx_param *ctx_prm) { int32_t rc; - uint32_t i, k, n, num; - struct netbe_port sp[RTE_DIM(cfg->prt)]; - - num = cfg->prt_num; - memcpy(sp, cfg->prt, sizeof(sp[0]) * num); - qsort(sp, num, sizeof(sp[0]), prt_lcore_cmp); - - /* Fill ports to be used by each lcore. */ + uint32_t i, j, nb_bl_ports, sz; + struct netbe_lcore *lc; + static uint16_t *bl_ports; - k = 0; - n = 0; + /* Create the udp context and attached queue for each lcore. */ rc = 0; - for (i = 0; i != num && rc == 0; i++) { - if (sp[n].lcore != sp[i].lcore) { - rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n); - n = i; - k++; + nb_bl_ports = 0; + sz = sizeof(uint16_t) * UINT16_MAX; + bl_ports = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE); + for (i = 0; i < cfg->cpu_num; i++) { + lc = &cfg->cpu[i]; + for (j = 0; j < lc->prtq_num; j++) { + memset((uint8_t *)bl_ports, 0, sz); + /* create list of blocked ports based on q */ + nb_bl_ports = create_blocklist(&lc->prtq[j].port, + bl_ports, lc->prtq[j].rxqid); + RTE_LOG(NOTICE, USER1, "lc=%u, q=%u, nb_bl_ports4=%u\n", + lc->id, lc->prtq[j].rxqid, nb_bl_ports); + + rc = lcore_init(lc, ctx_prm, j, bl_ports, nb_bl_ports); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s: failed with error code: %d\n", + __func__, rc); + rte_free(bl_ports); + return rc; + } } } + rte_free(bl_ports); - if (rc == 0 && i != n) { - rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n); - k++; - } - - if (rc != 0) - rte_exit(EXIT_FAILURE, "%s: failed with error code: %d\n", - __func__, rc); - - cfg->cpu_num = k; + return 0; } static void @@ -659,7 +952,7 @@ netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family, rc = 0; for (i = 0; i != dnum && rc == 0; i++) { - fill_dst(dp + i, lc->prt + dev_idx, dst + i, l3_type, sid); + fill_dst(dp + i, lc->prtq + dev_idx, dst + i, l3_type, sid); if (family == AF_INET) rc = netbe_add_ipv4_route(lc, dst + i, n + i); else @@ -675,42 +968,11 @@ netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family, } static int -netbe_port2lcore(struct netbe_cfg *cfg, uint32_t port, struct netbe_lcore **plc) -{ - uint32_t i, j; - struct netbe_lcore *lc; - - for (i = 0; i != cfg->cpu_num; i++) { - lc = cfg->cpu + i; - for (j = 0; j != cfg->prt_num; j++) { - if (lc->prt[j].port.id == port) { - *plc = lc; - return j; - } - } - } - - return -ENOENT; -} - -static int -netbe_dest_cmp(const void *s1, const void *s2) -{ - const struct netbe_dest *p1, *p2; - - p1 = s1; - p2 = s2; - if (p1->port == p2->port) - return p1->family - p2->family; - else - return p1->port - p2->port; -} - -static int netbe_dest_init(const char *fname, struct netbe_cfg *cfg) { int32_t rc; - uint32_t f, i, j, p; + uint32_t f, i, p; + uint32_t k, l, cnt; struct netbe_lcore *lc; struct netbe_dest_prm prm; @@ -718,29 +980,36 @@ netbe_dest_init(const char *fname, struct netbe_cfg *cfg) if (rc != 0) return rc; - qsort(prm.dest, prm.nb_dest, sizeof(prm.dest[0]), netbe_dest_cmp); - rc = 0; - for (i = 0; i != prm.nb_dest; i = j) { + for (i = 0; i != prm.nb_dest; i++) { p = prm.dest[i].port; f = prm.dest[i].family; - for (j = i + 1; j != prm.nb_dest && p == prm.dest[j].port && - f == prm.dest[j].family; - j++) - ; - rc = netbe_port2lcore(cfg, p, &lc); - if (rc < 0) { + cnt = 0; + for (k = 0; k != cfg->cpu_num; k++) { + lc = cfg->cpu + k; + for (l = 0; l != lc->prtq_num; l++) + if (lc->prtq[l].port.id == p) { + rc = netbe_add_dest(lc, l, f, + prm.dest + i, 1); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s(lcore=%u, family=%u) could not " + "add destinations(%u);\n", + __func__, lc->id, f, i); + return -ENOSPC; + } + cnt++; + } + } + + if (cnt == 0) { RTE_LOG(ERR, USER1, "%s(%s) error at line %u: " "port %u not managed by any lcore;\n", __func__, fname, prm.dest[i].line, p); break; } - - rc = netbe_add_dest(lc, rc, f, prm.dest + i, j - i); - if (rc != 0) - break; } free(prm.dest); @@ -817,7 +1086,6 @@ netfe_stream_dump(const struct netfe_stream *fes) fes->stat.txev[TLE_SEV_UP]); } - /* * helper function: opens IPv4 and IPv6 streams for selected port. */ @@ -828,6 +1096,9 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm, int32_t rc; uint32_t sidx; struct netfe_stream *fes; + struct sockaddr_in *l4; + struct sockaddr_in6 *l6; + uint16_t errport; sidx = fe->sidx; fes = fe->fs + sidx; @@ -842,8 +1113,10 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm, if (op != FWD) sprm->send_ev = fes->txev; - RTE_LOG(ERR, USER1, "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}\n", - __func__, lcore, sidx, op, fes->rxev, fes->txev); + RTE_LOG(ERR, USER1, + "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}, be_lc=%u\n", + __func__, lcore, sidx, op, fes->rxev, fes->txev, + becfg.cpu[bidx].id); if (fes->rxev == NULL || fes->txev == NULL) { netfe_stream_close(fe, 0); rte_errno = ENOMEM; @@ -865,6 +1138,17 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm, rc = rte_errno; netfe_stream_close(fe, 0); rte_errno = rc; + + if (sprm->local_addr.ss_family == AF_INET) { + l4 = (struct sockaddr_in *) &sprm->local_addr; + errport = ntohs(l4->sin_port); + } else { + l6 = (struct sockaddr_in6 *) &sprm->local_addr; + errport = ntohs(l6->sin6_port); + } + RTE_LOG(ERR, USER1, "stream open failed for port %u with error " + "code=%u, bidx=%u, lc=%u\n", + errport, rc, bidx, becfg.cpu[bidx].id); return NULL; } @@ -873,7 +1157,6 @@ netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm, fe->sidx = sidx + 1; return fes; - } static inline int @@ -1034,7 +1317,6 @@ netfe_tx_process(uint32_t lcore, struct netfe_stream *fes) fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i]; } - static inline void netfe_fwd(uint32_t lcore, struct netfe_stream *fes) { @@ -1347,7 +1629,6 @@ netfe_lcore(void) } } - static void netfe_lcore_fini(void) { @@ -1359,7 +1640,6 @@ netfe_lcore_fini(void) return; while (fe->sidx != 0) { - i = fe->sidx - 1; netfe_stream_dump(fe->fs + i); netfe_stream_close(fe, 1); @@ -1379,26 +1659,26 @@ netbe_rx(struct netbe_lcore *lc, uint32_t pidx) struct rte_mbuf *rp[MAX_PKT_BURST]; int32_t rc[MAX_PKT_BURST]; - n = rte_eth_rx_burst(lc->prt[pidx].port.id, - lc->prt[pidx].rxqid, pkt, RTE_DIM(pkt)); + n = rte_eth_rx_burst(lc->prtq[pidx].port.id, + lc->prtq[pidx].rxqid, pkt, RTE_DIM(pkt)); if (n == 0) return; - lc->prt[pidx].rx_stat.in += n; + lc->prtq[pidx].rx_stat.in += n; NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n", - __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].rxqid, + __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].rxqid, n); - k = tle_udp_rx_bulk(lc->prt[pidx].dev, pkt, rp, rc, n); + k = tle_udp_rx_bulk(lc->prtq[pidx].dev, pkt, rp, rc, n); - lc->prt[pidx].rx_stat.up += k; - lc->prt[pidx].rx_stat.drop += n - k; + lc->prtq[pidx].rx_stat.up += k; + lc->prtq[pidx].rx_stat.drop += n - k; NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n", - __func__, lc->id, lc->prt[pidx].dev, n, k); + __func__, lc->id, lc->prtq[pidx].dev, n, k); for (j = 0; j != n - k; j++) { NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n", - __func__, __LINE__, lc->prt[pidx].port.id, + __func__, __LINE__, lc->prtq[pidx].port.id, j, rp[j], rc[j]); rte_pktmbuf_free(rp[j]); } @@ -1410,14 +1690,14 @@ netbe_tx(struct netbe_lcore *lc, uint32_t pidx) uint32_t j, k, n; struct rte_mbuf **mb; - n = lc->prt[pidx].tx_buf.num; - k = RTE_DIM(lc->prt[pidx].tx_buf.pkt) - n; - mb = lc->prt[pidx].tx_buf.pkt; + n = lc->prtq[pidx].tx_buf.num; + k = RTE_DIM(lc->prtq[pidx].tx_buf.pkt) - n; + mb = lc->prtq[pidx].tx_buf.pkt; - if (k >= RTE_DIM(lc->prt[pidx].tx_buf.pkt) / 2) { - j = tle_udp_tx_bulk(lc->prt[pidx].dev, mb + n, k); + if (k >= RTE_DIM(lc->prtq[pidx].tx_buf.pkt) / 2) { + j = tle_udp_tx_bulk(lc->prtq[pidx].dev, mb + n, k); n += j; - lc->prt[pidx].tx_stat.down += j; + lc->prtq[pidx].tx_stat.down += j; } if (n == 0) @@ -1425,21 +1705,21 @@ netbe_tx(struct netbe_lcore *lc, uint32_t pidx) NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n" "total pkts to send: %u\n", - __func__, lc->id, lc->prt[pidx].dev, j, n); + __func__, lc->id, lc->prtq[pidx].dev, j, n); for (j = 0; j != n; j++) NETBE_PKT_DUMP(mb[j]); - k = rte_eth_tx_burst(lc->prt[pidx].port.id, - lc->prt[pidx].txqid, mb, n); + k = rte_eth_tx_burst(lc->prtq[pidx].port.id, + lc->prtq[pidx].txqid, mb, n); - lc->prt[pidx].tx_stat.out += k; - lc->prt[pidx].tx_stat.drop += n - k; + lc->prtq[pidx].tx_stat.out += k; + lc->prtq[pidx].tx_stat.drop += n - k; NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n", - __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].txqid, + __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].txqid, n, k); - lc->prt[pidx].tx_buf.num = n - k; + lc->prtq[pidx].tx_buf.num = n - k; if (k != 0) for (j = k; j != n; j++) mb[j - k] = mb[j]; @@ -1458,17 +1738,19 @@ netbe_lcore_setup(struct netbe_lcore *lc) * ??????? * wait for FE lcores to start, so BE dont' drop any packets * because corresponding streams not opened yet by FE. - * usefull when used with pcap PMDS. + * useful when used with pcap PMDS. * think better way, or should this timeout be a cmdlien parameter. * ??????? */ rte_delay_ms(10); rc = 0; - for (i = 0; i != lc->prt_num && rc == 0; i++) { + for (i = 0; i != lc->prtq_num && rc == 0; i++) { RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n", - __func__, i, lc->prt[i].port.id, lc->prt[i].dev); - rc = setup_rx_cb(&lc->prt[i].port, lc); + __func__, i, lc->prtq[i].port.id, lc->prtq[i].dev); + rc = setup_rx_cb(&lc->prtq[i].port, lc, lc->prtq[i].rxqid); + if (rc < 0) + return rc; } if (rc == 0) @@ -1486,7 +1768,7 @@ netbe_lcore(void) if (lc == NULL) return; - for (i = 0; i != lc->prt_num; i++) { + for (i = 0; i != lc->prtq_num; i++) { netbe_rx(lc, i); netbe_tx(lc, i); } @@ -1504,26 +1786,26 @@ netbe_lcore_clear(void) RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n", __func__, lc->id, lc->ctx); - for (i = 0; i != lc->prt_num; i++) { - RTE_LOG(NOTICE, USER1, "%s:%u(port=%u) " + for (i = 0; i != lc->prtq_num; i++) { + RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, lcore=%u, q=%u, dev=%p) " "rx_stats={" "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, " "tx_stats={" "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n", - __func__, i, lc->prt[i].port.id, - lc->prt[i].rx_stat.in, - lc->prt[i].rx_stat.up, - lc->prt[i].rx_stat.drop, - lc->prt[i].tx_stat.down, - lc->prt[i].tx_stat.out, - lc->prt[i].tx_stat.drop); + __func__, i, lc->prtq[i].port.id, lc->id, + lc->prtq[i].rxqid, + lc->prtq[i].dev, + lc->prtq[i].rx_stat.in, + lc->prtq[i].rx_stat.up, + lc->prtq[i].rx_stat.drop, + lc->prtq[i].tx_stat.down, + lc->prtq[i].tx_stat.out, + lc->prtq[i].tx_stat.drop); } - - for (i = 0; i != lc->prt_num; i++) { - for (j = 0; j != lc->prt[i].tx_buf.num; j++) - rte_pktmbuf_free(lc->prt[i].tx_buf.pkt[j]); - } + for (i = 0; i != lc->prtq_num; i++) + for (j = 0; j != lc->prtq[i].tx_buf.num; j++) + rte_pktmbuf_free(lc->prtq[i].tx_buf.pkt[j]); RTE_PER_LCORE(_be) = NULL; } @@ -1548,9 +1830,8 @@ lcore_main(void *arg) rc = netfe_lcore_init(&prm->fe); /* lcore FE init. */ - if (rc == 0 && prm->be.lc != NULL) { + if (rc == 0 && prm->be.lc != NULL) rc = netbe_lcore_setup(prm->be.lc); - } if (rc != 0) sig_handle(SIGQUIT); @@ -1569,8 +1850,6 @@ lcore_main(void *arg) return rc; } - - static int netfe_lcore_cmp(const void *s1, const void *s2) { @@ -1582,38 +1861,82 @@ netfe_lcore_cmp(const void *s1, const void *s2) } /* + * Helper functions, verify the queue for corresponding UDP port. + */ +static uint8_t +varify_queue_for_port(const struct netbe_dev *prtq, const uint16_t lport) +{ + uint32_t align_nb_q, qid; + + align_nb_q = rte_align32pow2(prtq->port.nb_lcore); + qid = (lport % align_nb_q) % prtq->port.nb_lcore; + if (prtq->rxqid == qid) + return 1; + + return 0; +} + +/* * Helper functions, finds BE by given local and remote addresses. */ static int -netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr) +netbe_find4(const struct in_addr *laddr, const uint16_t lport, + const struct in_addr *raddr, const uint32_t be_lc) { uint32_t i, j; - int32_t rc; uint32_t idx; struct netbe_lcore *bc; - if (laddr->s_addr == INADDR_ANY) { - - /* we have exactly one BE, use it for all traffic */ - if (becfg.cpu_num == 1) - return 0; + /* we have exactly one BE, use it for all traffic */ + if (becfg.cpu_num == 1) + return 0; - /* search by remote address. */ + /* search by provided be_lcore */ + if (be_lc != LCORE_ID_ANY) { for (i = 0; i != becfg.cpu_num; i++) { bc = becfg.cpu + i; - rc = rte_lpm_lookup(bc->lpm4, - rte_be_to_cpu_32(raddr->s_addr), &idx); - if (rc == 0) + if (be_lc == bc->id) return i; } - } else { + RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n", + __func__, be_lc); + return -ENOENT; + } - /* search by local address */ + /* search by local address */ + if (laddr->s_addr != INADDR_ANY) { for (i = 0; i != becfg.cpu_num; i++) { bc = becfg.cpu + i; - for (j = 0; j != bc->prt_num; j++) - if (laddr->s_addr == bc->prt[j].port.ipv4) + /* search by queue for the local port */ + for (j = 0; j != bc->prtq_num; j++) { + if (laddr->s_addr == bc->prtq[j].port.ipv4) { + + if (lport == 0) + return i; + + if (varify_queue_for_port(bc->prtq + j, lport) != 0) + return i; + } + } + } + } + + /* search by remote address */ + if (raddr->s_addr != INADDR_ANY) { + for (i = 0; i != becfg.cpu_num; i++) { + bc = becfg.cpu + i; + if (rte_lpm_lookup(bc->lpm4, + rte_be_to_cpu_32(raddr->s_addr), + &idx) == 0) { + + if (lport == 0) return i; + + /* search by queue for the local port */ + for (j = 0; j != bc->prtq_num; j++) + if (varify_queue_for_port(bc->prtq + j, lport) != 0) + return i; + } } } @@ -1621,35 +1944,64 @@ netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr) } static int -netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr) +netbe_find6(const struct in6_addr *laddr, uint16_t lport, + const struct in6_addr *raddr, uint32_t be_lc) { uint32_t i, j; - int32_t rc; uint8_t idx; struct netbe_lcore *bc; - if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) == 0) { + /* we have exactly one BE, use it for all traffic */ + if (becfg.cpu_num == 1) + return 0; - /* we have exactly one BE, use it for all traffic */ - if (becfg.cpu_num == 1) - return 0; - - /* search by remote address. */ + /* search by provided be_lcore */ + if (be_lc != LCORE_ID_ANY) { for (i = 0; i != becfg.cpu_num; i++) { bc = becfg.cpu + i; - rc = rte_lpm6_lookup(bc->lpm6, - (uint8_t *)(uintptr_t)raddr->s6_addr, &idx); - if (rc == 0) + if (be_lc == bc->id) return i; } - } else { - /* search by local address */ + RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n", + __func__, be_lc); + return -ENOENT; + } + + /* search by local address */ + if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) != 0) { for (i = 0; i != becfg.cpu_num; i++) { bc = becfg.cpu + i; - for (j = 0; j != bc->prt_num; j++) - if (memcmp(laddr, &bc->prt[j].port.ipv6, - sizeof(*laddr)) == 0) + /* search by queue for the local port */ + for (j = 0; j != bc->prtq_num; j++) { + if (memcmp(laddr, &bc->prtq[j].port.ipv6, + sizeof(*laddr)) == 0) { + + if (lport == 0) + return i; + + if (varify_queue_for_port(bc->prtq + j, lport) != 0) + return i; + } + } + } + } + + /* search by remote address */ + if (memcmp(raddr, &in6addr_any, sizeof(*raddr)) == 0) { + for (i = 0; i != becfg.cpu_num; i++) { + bc = becfg.cpu + i; + if (rte_lpm6_lookup(bc->lpm6, + (uint8_t *)(uintptr_t)raddr->s6_addr, + &idx) == 0) { + + if (lport == 0) return i; + + /* search by queue for the local port */ + for (j = 0; j != bc->prtq_num; j++) + if (varify_queue_for_port(bc->prtq + j, lport) != 0) + return i; + } } } @@ -1657,7 +2009,7 @@ netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr) } static int -netbe_find(const struct tle_udp_stream_param *p) +netbe_find(const struct tle_udp_stream_param *p, uint32_t be_lc) { const struct sockaddr_in *l4, *r4; const struct sockaddr_in6 *l6, *r6; @@ -1665,21 +2017,23 @@ netbe_find(const struct tle_udp_stream_param *p) if (p->local_addr.ss_family == AF_INET) { l4 = (const struct sockaddr_in *)&p->local_addr; r4 = (const struct sockaddr_in *)&p->remote_addr; - return netbe_find4(&l4->sin_addr, &r4->sin_addr); + return netbe_find4(&l4->sin_addr, ntohs(l4->sin_port), + &r4->sin_addr, be_lc); } else if (p->local_addr.ss_family == AF_INET6) { l6 = (const struct sockaddr_in6 *)&p->local_addr; r6 = (const struct sockaddr_in6 *)&p->remote_addr; - return netbe_find6(&l6->sin6_addr, &r6->sin6_addr); + return netbe_find6(&l6->sin6_addr, ntohs(l6->sin6_port), + &r6->sin6_addr, be_lc); } return -EINVAL; } static int -netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line) +netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line, uint32_t be_lc) { int32_t bidx; - bidx = netbe_find(&sp->prm); + bidx = netbe_find(&sp->prm, be_lc); if (bidx < 0) { RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n", __func__, line); @@ -1694,18 +2048,19 @@ static int netfe_lcore_fill(struct lcore_prm prm[RTE_MAX_LCORE], struct netfe_lcore_prm *lprm) { + uint32_t be_lc; uint32_t i, j, lc, ln; /* determine on what BE each stream should be open. */ for (i = 0; i != lprm->nb_streams; i++) { - lc = lprm->stream[i].lcore; ln = lprm->stream[i].line; - - if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln) != 0 || + be_lc = lprm->stream[i].be_lcore; + if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln, + be_lc) != 0 || (lprm->stream[i].op == FWD && - netfe_sprm_flll_be(&lprm->stream[i].fprm, - ln) != 0)) + netfe_sprm_flll_be(&lprm->stream[i].fprm, ln, + be_lc) != 0)) return -EINVAL; } @@ -1759,6 +2114,7 @@ main(int argc, char *argv[]) char fecfg_fname[PATH_MAX + 1]; char becfg_fname[PATH_MAX + 1]; struct lcore_prm prm[RTE_MAX_LCORE]; + struct rte_eth_dev_info dev_info; fecfg_fname[0] = 0; becfg_fname[0] = 0; @@ -1824,8 +2180,15 @@ main(int argc, char *argv[]) signal(SIGINT, sig_handle); - netbe_port_init(&becfg, argc - optind, argv + optind); - netbe_lcore_init(&becfg, &ctx_prm); + rc = netbe_port_init(&becfg, argc - optind, argv + optind); + if (rc != 0) + rte_exit(EXIT_FAILURE, + "%s: netbe_port_init failed with error code: %d\n", + __func__, rc); + + rc = netbe_lcore_init(&becfg, &ctx_prm); + if (rc != 0) + sig_handle(SIGQUIT); if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0) sig_handle(SIGQUIT); @@ -1841,15 +2204,18 @@ main(int argc, char *argv[]) __func__, becfg.prt[i].id, rc); sig_handle(SIGQUIT); } + rte_eth_dev_info_get(becfg.prt[i].id, &dev_info); + rc = update_rss_reta(&becfg.prt[i], &dev_info); + if (rc != 0) + sig_handle(SIGQUIT); } feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num; if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0) sig_handle(SIGQUIT); - for (i = 0; rc == 0 && i != becfg.cpu_num; i++) { + for (i = 0; rc == 0 && i != becfg.cpu_num; i++) prm[becfg.cpu[i].id].be.lc = becfg.cpu + i; - } if (rc == 0 && (rc = netfe_lcore_fill(prm, &feprm)) != 0) sig_handle(SIGQUIT); diff --git a/examples/udpfwd/netbe.h b/examples/udpfwd/netbe.h index 0be785a..577627d 100644 --- a/examples/udpfwd/netbe.h +++ b/examples/udpfwd/netbe.h @@ -44,19 +44,25 @@ #define MAX_PKT_BURST 0x20 +/* Used to allocate the memory for hash key. */ +#define RSS_HASH_KEY_LENGTH 64 + /* * BE related structures. */ struct netbe_port { uint32_t id; - uint32_t lcore; + uint32_t nb_lcore; + uint32_t lcore[RTE_MAX_LCORE]; uint32_t mtu; uint32_t rx_offload; uint32_t tx_offload; uint32_t ipv4; struct in6_addr ipv6; struct ether_addr mac; + uint32_t hash_key_size; + uint8_t hash_key[RSS_HASH_KEY_LENGTH]; }; struct netbe_dest { @@ -109,10 +115,10 @@ struct netbe_lcore { struct rte_lpm6 *lpm6; struct rte_ip_frag_tbl *ftbl; struct tle_udp_ctx *ctx; - uint32_t prt_num; + uint32_t prtq_num; uint32_t dst4_num; uint32_t dst6_num; - struct netbe_dev prt[RTE_MAX_ETHPORTS]; + struct netbe_dev prtq[RTE_MAX_ETHPORTS * RTE_MAX_LCORE]; struct tle_udp_dest dst4[LCORE_MAX_DST]; struct tle_udp_dest dst6[LCORE_MAX_DST]; struct rte_ip_frag_death_row death_row; @@ -144,6 +150,7 @@ struct netfe_sprm { struct netfe_stream_prm { uint32_t lcore; + uint32_t be_lcore; uint32_t line; uint16_t op; uint16_t txlen; /* valid/used only for TXONLY op. */ @@ -253,6 +260,7 @@ struct lcore_prm { } \ } while (0) -int setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc); +int setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc, + uint16_t qid); #endif /* __NETBE_H__ */ diff --git a/examples/udpfwd/parse.c b/examples/udpfwd/parse.c index 78df01b..cc8c0b2 100644 --- a/examples/udpfwd/parse.c +++ b/examples/udpfwd/parse.c @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <sched.h> #include "netbe.h" #include "parse.h" @@ -61,7 +62,6 @@ parse_ip_val(__rte_unused const char *key, const char *val, void *prm) return 0; } - #define PARSE_UINT8x16(s, v, l) \ do { \ char *end; \ @@ -110,6 +110,41 @@ parse_feop_val(__rte_unused const char *key, const char *val, void *prm) } static int +parse_lcore_list_val(__rte_unused const char *key, const char *val, void *prm) +{ + union parse_val *rv; + unsigned long a, b; + uint32_t i; + char *end; + + rv = prm; + errno = 0; + a = strtoul(val, &end, 0); + if (errno != 0 || (end[0] != 0 && end[0] != '-') || a > UINT32_MAX) + return -EINVAL; + + if (end[0] == '-') { + val = end + 1; + errno = 0; + b = strtoul(val, &end, 0); + if (errno != 0 || end[0] != 0 || b > UINT32_MAX) + return -EINVAL; + } else + b = a; + + if (a <= b) { + for (i = a; i <= b; i++) + CPU_SET(i, &rv->cpuset); + } else { + RTE_LOG(ERR, USER1, + "%s: lcores not in ascending order\n", __func__); + return -EINVAL; + } + + return 0; +} + +static int parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man, const char *keys_opt[], uint32_t nb_opt, const arg_handler_t hndl[], union parse_val val[]) @@ -139,7 +174,7 @@ parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man, if (rte_kvargs_process(kvl, keys_man[j], hndl[j], val + j) != 0) { RTE_LOG(ERR, USER1, - "%s: %s invalid value for key: %s\n", + "%s: %s invalid value for man key: %s\n", __func__, arg, keys_man[j]); rte_kvargs_free(kvl); return -EINVAL; @@ -151,7 +186,7 @@ parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man, if (rte_kvargs_process(kvl, keys_opt[j], hndl[k], val + k) != 0) { RTE_LOG(ERR, USER1, - "%s: %s invalid value for key: %s\n", + "%s: %s invalid value for opt key: %s\n", __func__, arg, keys_opt[j]); rte_kvargs_free(kvl); return -EINVAL; @@ -166,6 +201,7 @@ int parse_netbe_arg(struct netbe_port *prt, const char *arg) { int32_t rc; + uint32_t i, j; static const char *keys_man[] = { "port", @@ -182,7 +218,7 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg) static const arg_handler_t hndl[] = { parse_uint_val, - parse_uint_val, + parse_lcore_list_val, parse_uint_val, parse_uint_val, parse_uint_val, @@ -201,7 +237,10 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg) return rc; prt->id = val[0].u64; - prt->lcore = val[1].u64; + for (i = 0, j = 0; i < RTE_MAX_LCORE; i++) + if (CPU_ISSET(i, &val[1].cpuset)) + prt->lcore[j++] = i; + prt->nb_lcore = j; prt->mtu = val[2].u64; prt->rx_offload = val[3].u64; prt->tx_offload = val[4].u64; @@ -210,6 +249,7 @@ parse_netbe_arg(struct netbe_port *prt, const char *arg) return 0; } + static int check_netbe_dest(const struct netbe_dest *dst) { @@ -390,6 +430,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg) "fwlport", "fwraddr", "fwrport", + "belcore", }; static const arg_handler_t hndl[] = { @@ -404,16 +445,17 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg) parse_uint_val, parse_ip_val, parse_uint_val, + parse_uint_val, }; union parse_val val[RTE_DIM(hndl)]; memset(val, 0, sizeof(val)); + val[11].u64 = LCORE_ID_ANY; rc = parse_kvargs(arg, keys_man, RTE_DIM(keys_man), keys_opt, RTE_DIM(keys_opt), hndl, val); if (rc != 0) return rc; - sp->lcore = val[0].u64; sp->op = val[1].u64; pv2saddr(&sp->sprm.prm.local_addr, val + 2, val + 3); @@ -421,6 +463,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg) sp->txlen = val[6].u64; pv2saddr(&sp->fprm.prm.local_addr, val + 7, val + 8); pv2saddr(&sp->fprm.prm.remote_addr, val + 9, val + 10); + sp->be_lcore = val[11].u64; return 0; } diff --git a/examples/udpfwd/parse.h b/examples/udpfwd/parse.h index 911c874..e25e64e 100644 --- a/examples/udpfwd/parse.h +++ b/examples/udpfwd/parse.h @@ -16,6 +16,8 @@ #ifndef __PARSE_H__ #define __PARSE_H__ +#define PARSE_LIST_DELIM "-" + union parse_val { uint64_t u64; struct { @@ -26,6 +28,7 @@ union parse_val { }; } in; struct ether_addr mac; + rte_cpuset_t cpuset; }; static int diff --git a/examples/udpfwd/pkt.c b/examples/udpfwd/pkt.c index af87b08..6832b9a 100644 --- a/examples/udpfwd/pkt.c +++ b/examples/udpfwd/pkt.c @@ -23,7 +23,6 @@ _mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso, return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49; } - static inline void fill_pkt_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t l3, uint32_t l4) { @@ -223,7 +222,7 @@ reassemble(struct rte_mbuf *m, struct netbe_lcore *lc, uint64_t tms, tbl = lc->ftbl; dr = &lc->death_row; - l3cs = lc->prt[port].port.rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM; + l3cs = lc->prtq[port].port.rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM; if (RTE_ETH_IS_IPV4_HDR(m->packet_type)) { @@ -508,7 +507,8 @@ typen_rx_callback(uint8_t port, __rte_unused uint16_t queue, } int -setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc) +setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc, + uint16_t qid) { int32_t i, rc; uint32_t smask; @@ -589,7 +589,7 @@ setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc) for (i = 0; i != RTE_DIM(ptype2cb); i++) { if ((smask & ptype2cb[i].mask) == ptype2cb[i].mask) { - cb = rte_eth_add_rx_callback(uprt->id, 0, + cb = rte_eth_add_rx_callback(uprt->id, qid, ptype2cb[i].fn, lc); rc = -rte_errno; RTE_LOG(ERR, USER1, diff --git a/lib/libtle_udp/tle_udp_impl.h b/lib/libtle_udp/tle_udp_impl.h index 8e61ea6..b46e4dd 100644 --- a/lib/libtle_udp/tle_udp_impl.h +++ b/lib/libtle_udp/tle_udp_impl.h @@ -63,6 +63,8 @@ struct tle_udp_dev_param { uint32_t tx_offload; /**< DEV_TX_OFFLOAD_* supported. */ struct in_addr local_addr4; /**< local IPv4 address assigned. */ struct in6_addr local_addr6; /**< local IPv6 address assigned. */ + uint32_t nb_bl_ports; /**< number of blocked ports. */ + uint16_t *bl_ports; /**< list of blocked ports. */ }; #define TLE_UDP_MAX_HDR 0x60 diff --git a/lib/libtle_udp/udp_ctl.c b/lib/libtle_udp/udp_ctl.c index 55c4afd..3ff9751 100644 --- a/lib/libtle_udp/udp_ctl.c +++ b/lib/libtle_udp/udp_ctl.c @@ -47,6 +47,11 @@ check_dev_prm(const struct tle_udp_dev_param *dev_prm) sizeof(tle_udp6_any)) == 0) return -EINVAL; + /* all the ports are blocked. */ + if (dev_prm->nb_bl_ports > UINT16_MAX || + (dev_prm->nb_bl_ports != 0 && dev_prm->bl_ports == NULL)) + return -EINVAL; + return 0; } @@ -241,9 +246,11 @@ tle_udp_ctx_invalidate(struct tle_udp_ctx *ctx) } static int -init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id) +init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id, + uint16_t *bl_ports, uint32_t nb_bl_ports) { size_t sz; + uint32_t i; sz = sizeof(*dev->dp[idx]); dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, @@ -257,6 +264,11 @@ init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id) } udp_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK); + + if (bl_ports != NULL) + for (i = 0; i < nb_bl_ports; i++) + udp_pbm_set(&dev->dp[idx]->use, bl_ports[i]); + return 0; } @@ -281,6 +293,7 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx, const struct tle_udp_dev_param *dev_prm) { int32_t rc; + uint32_t i; struct tle_udp_dev *dev; if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) { @@ -294,13 +307,23 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx, rc = 0; /* device can handle IPv4 traffic */ - if (dev_prm->local_addr4.s_addr != INADDR_ANY) - rc = init_dev_proto(dev, TLE_UDP_V4, ctx->prm.socket_id); + if (dev_prm->local_addr4.s_addr != INADDR_ANY) { + rc = init_dev_proto(dev, TLE_UDP_V4, ctx->prm.socket_id, + dev_prm->bl_ports, dev_prm->nb_bl_ports); + for (i = 0; i < dev_prm->nb_bl_ports; i++) + udp_pbm_set(&ctx->use[TLE_UDP_V4], + dev_prm->bl_ports[i]); + } /* device can handle IPv6 traffic */ if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_udp6_any, - sizeof(tle_udp6_any)) != 0) - rc = init_dev_proto(dev, TLE_UDP_V6, ctx->prm.socket_id); + sizeof(tle_udp6_any)) != 0) { + rc = init_dev_proto(dev, TLE_UDP_V6, ctx->prm.socket_id, + dev_prm->bl_ports, dev_prm->nb_bl_ports); + for (i = 0; i < dev_prm->nb_bl_ports; i++) + udp_pbm_set(&ctx->use[TLE_UDP_V6], + dev_prm->bl_ports[i]); + } if (rc != 0) { /* cleanup and return an error. */ @@ -483,13 +506,11 @@ stream_fill_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s) return ENFILE; /* fill socket's dst port and type */ - sp = htons(p); s->type = t; s->port.dst = sp; /* mark port as in-use */ - udp_pbm_set(&ctx->use[t], p); if (dev != NULL) { udp_pbm_set(pbm, p); |