aboutsummaryrefslogtreecommitdiffstats
path: root/examples/eventdev_pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'examples/eventdev_pipeline')
-rw-r--r--examples/eventdev_pipeline/main.c88
-rw-r--r--examples/eventdev_pipeline/pipeline_common.h31
-rw-r--r--examples/eventdev_pipeline/pipeline_worker_generic.c268
-rw-r--r--examples/eventdev_pipeline/pipeline_worker_tx.c156
4 files changed, 207 insertions, 336 deletions
diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
index 700bc696..92e08bc0 100644
--- a/examples/eventdev_pipeline/main.c
+++ b/examples/eventdev_pipeline/main.c
@@ -26,20 +26,6 @@ core_in_use(unsigned int lcore_id) {
fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
}
-static void
-eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
- void *userdata)
-{
- int port_id = (uintptr_t) userdata;
- unsigned int _sent = 0;
-
- do {
- /* Note: hard-coded TX queue */
- _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
- unsent - _sent);
- } while (_sent != unsent);
-}
-
/*
* Parse the coremask given as argument (hexadecimal string) and fill
* the global configuration (core role and core count) with the parsed
@@ -263,6 +249,7 @@ parse_app_args(int argc, char **argv)
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
+ struct rte_eth_rxconf rx_conf;
static const struct rte_eth_conf port_conf_default = {
.rxmode = {
.mq_mode = ETH_MQ_RX_RSS,
@@ -291,6 +278,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
port_conf.txmode.offloads |=
DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+ rx_conf = dev_info.default_rxconf;
+ rx_conf.offloads = port_conf.rxmode.offloads;
port_conf.rx_adv_conf.rss_conf.rss_hf &=
dev_info.flow_type_rss_offloads;
@@ -311,7 +300,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
/* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
- rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+ rte_eth_dev_socket_id(port), &rx_conf,
+ mbuf_pool);
if (retval < 0)
return retval;
}
@@ -350,7 +340,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
static int
init_ports(uint16_t num_ports)
{
- uint16_t portid, i;
+ uint16_t portid;
if (!cdata.num_mbuf)
cdata.num_mbuf = 16384 * num_ports;
@@ -367,36 +357,26 @@ init_ports(uint16_t num_ports)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
portid);
- RTE_ETH_FOREACH_DEV(i) {
- void *userdata = (void *)(uintptr_t) i;
- fdata->tx_buf[i] =
- rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
- if (fdata->tx_buf[i] == NULL)
- rte_panic("Out of memory\n");
- rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
- rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
- eth_tx_buffer_retry,
- userdata);
- }
-
return 0;
}
static void
do_capability_setup(uint8_t eventdev_id)
{
+ int ret;
uint16_t i;
- uint8_t mt_unsafe = 0;
+ uint8_t generic_pipeline = 0;
uint8_t burst = 0;
RTE_ETH_FOREACH_DEV(i) {
- struct rte_eth_dev_info dev_info;
- memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
-
- rte_eth_dev_info_get(i, &dev_info);
- /* Check if it is safe ask worker to tx. */
- mt_unsafe |= !(dev_info.tx_offload_capa &
- DEV_TX_OFFLOAD_MT_LOCKFREE);
+ uint32_t caps = 0;
+
+ ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Invalid capability for Tx adptr port %d\n", i);
+ generic_pipeline |= !(caps &
+ RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
}
struct rte_event_dev_info eventdev_info;
@@ -406,21 +386,42 @@ do_capability_setup(uint8_t eventdev_id)
burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
0;
- if (mt_unsafe)
+ if (generic_pipeline)
set_worker_generic_setup_data(&fdata->cap, burst);
else
- set_worker_tx_setup_data(&fdata->cap, burst);
+ set_worker_tx_enq_setup_data(&fdata->cap, burst);
}
static void
signal_handler(int signum)
{
+ static uint8_t once;
+ uint16_t portid;
+
if (fdata->done)
rte_exit(1, "Exiting on signal %d\n", signum);
- if (signum == SIGINT || signum == SIGTERM) {
+ if ((signum == SIGINT || signum == SIGTERM) && !once) {
printf("\n\nSignal %d received, preparing to exit...\n",
signum);
+ if (cdata.dump_dev)
+ rte_event_dev_dump(0, stdout);
+ once = 1;
fdata->done = 1;
+ rte_smp_wmb();
+
+ RTE_ETH_FOREACH_DEV(portid) {
+ rte_event_eth_rx_adapter_stop(portid);
+ rte_event_eth_tx_adapter_stop(portid);
+ rte_eth_dev_stop(portid);
+ }
+
+ rte_eal_mp_wait_lcore();
+
+ RTE_ETH_FOREACH_DEV(portid) {
+ rte_eth_dev_close(portid);
+ }
+
+ rte_event_dev_close(0);
}
if (signum == SIGTSTP)
rte_event_dev_dump(0, stdout);
@@ -499,7 +500,7 @@ main(int argc, char **argv)
if (worker_data == NULL)
rte_panic("rte_calloc failed\n");
- int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
+ int dev_id = fdata->cap.evdev_setup(worker_data);
if (dev_id < 0)
rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
@@ -524,8 +525,8 @@ main(int argc, char **argv)
if (fdata->tx_core[lcore_id])
printf(
- "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
- __func__, lcore_id, cons_data.port_id);
+ "[%s()] lcore %d executing NIC Tx\n",
+ __func__, lcore_id);
if (fdata->sched_core[lcore_id])
printf("[%s()] lcore %d executing scheduler\n",
@@ -555,9 +556,6 @@ main(int argc, char **argv)
rte_eal_mp_wait_lcore();
- if (cdata.dump_dev)
- rte_event_dev_dump(dev_id, stdout);
-
if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
(uint64_t)-ENOTSUP)) {
printf("\nPort Workload distribution:\n");
diff --git a/examples/eventdev_pipeline/pipeline_common.h b/examples/eventdev_pipeline/pipeline_common.h
index 9703396f..a6cc912f 100644
--- a/examples/eventdev_pipeline/pipeline_common.h
+++ b/examples/eventdev_pipeline/pipeline_common.h
@@ -16,6 +16,7 @@
#include <rte_ethdev.h>
#include <rte_eventdev.h>
#include <rte_event_eth_rx_adapter.h>
+#include <rte_event_eth_tx_adapter.h>
#include <rte_service.h>
#include <rte_service_component.h>
@@ -23,38 +24,30 @@
#define BATCH_SIZE 16
#define MAX_NUM_CORE 64
-struct cons_data {
- uint8_t dev_id;
- uint8_t port_id;
- uint8_t release;
-} __rte_cache_aligned;
-
struct worker_data {
uint8_t dev_id;
uint8_t port_id;
} __rte_cache_aligned;
typedef int (*worker_loop)(void *);
-typedef int (*consumer_loop)(void);
typedef void (*schedule_loop)(unsigned int);
-typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
-typedef void (*rx_adapter_setup)(uint16_t nb_ports);
+typedef int (*eventdev_setup)(struct worker_data *);
+typedef void (*adapter_setup)(uint16_t nb_ports);
typedef void (*opt_check)(void);
struct setup_data {
worker_loop worker;
- consumer_loop consumer;
schedule_loop scheduler;
eventdev_setup evdev_setup;
- rx_adapter_setup adptr_setup;
+ adapter_setup adptr_setup;
opt_check check_opt;
};
struct fastpath_data {
volatile int done;
- uint32_t tx_lock;
uint32_t evdev_service_id;
uint32_t rxadptr_service_id;
+ uint32_t txadptr_service_id;
bool rx_single;
bool tx_single;
bool sched_single;
@@ -62,7 +55,6 @@ struct fastpath_data {
unsigned int tx_core[MAX_NUM_CORE];
unsigned int sched_core[MAX_NUM_CORE];
unsigned int worker_core[MAX_NUM_CORE];
- struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
struct setup_data cap;
} __rte_cache_aligned;
@@ -88,6 +80,8 @@ struct config_data {
int16_t next_qid[MAX_NUM_STAGES+2];
int16_t qid[MAX_NUM_STAGES];
uint8_t rx_adapter_id;
+ uint8_t tx_adapter_id;
+ uint8_t tx_queue_id;
uint64_t worker_lcore_mask;
uint64_t rx_lcore_mask;
uint64_t tx_lcore_mask;
@@ -99,8 +93,6 @@ struct port_link {
uint8_t priority;
};
-struct cons_data cons_data;
-
struct fastpath_data *fdata;
struct config_data cdata;
@@ -142,12 +134,11 @@ schedule_devices(unsigned int lcore_id)
}
}
- if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
- rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
- fdata->cap.consumer();
- rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
+ if (fdata->tx_core[lcore_id]) {
+ rte_service_run_iter_on_app_lcore(fdata->txadptr_service_id,
+ !fdata->tx_single);
}
}
void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
-void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
+void set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c
index 2215e9eb..16906494 100644
--- a/examples/eventdev_pipeline/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
@@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
return 0;
}
-static __rte_always_inline int
-consumer(void)
-{
- const uint64_t freq_khz = rte_get_timer_hz() / 1000;
- struct rte_event packet;
-
- static uint64_t received;
- static uint64_t last_pkts;
- static uint64_t last_time;
- static uint64_t start_time;
- int i;
- uint8_t dev_id = cons_data.dev_id;
- uint8_t port_id = cons_data.port_id;
-
- do {
- uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
- &packet, 1, 0);
-
- if (n == 0) {
- RTE_ETH_FOREACH_DEV(i)
- rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
- return 0;
- }
- if (start_time == 0)
- last_time = start_time = rte_get_timer_cycles();
-
- received++;
- uint8_t outport = packet.mbuf->port;
-
- exchange_mac(packet.mbuf);
- rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
- packet.mbuf);
-
- if (cons_data.release)
- rte_event_enqueue_burst(dev_id, port_id,
- &packet, n);
-
- /* Print out mpps every 1<22 packets */
- if (!cdata.quiet && received >= last_pkts + (1<<22)) {
- const uint64_t now = rte_get_timer_cycles();
- const uint64_t total_ms = (now - start_time) / freq_khz;
- const uint64_t delta_ms = (now - last_time) / freq_khz;
- uint64_t delta_pkts = received - last_pkts;
-
- printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
- "avg %.3f mpps [current %.3f mpps]\n",
- __func__,
- received,
- total_ms,
- received / (total_ms * 1000.0),
- delta_pkts / (delta_ms * 1000.0));
- last_pkts = received;
- last_time = now;
- }
-
- cdata.num_packets--;
- if (cdata.num_packets <= 0)
- fdata->done = 1;
- /* Be stuck in this loop if single. */
- } while (!fdata->done && fdata->tx_single);
-
- return 0;
-}
-
-static __rte_always_inline int
-consumer_burst(void)
-{
- const uint64_t freq_khz = rte_get_timer_hz() / 1000;
- struct rte_event packets[BATCH_SIZE];
-
- static uint64_t received;
- static uint64_t last_pkts;
- static uint64_t last_time;
- static uint64_t start_time;
- unsigned int i, j;
- uint8_t dev_id = cons_data.dev_id;
- uint8_t port_id = cons_data.port_id;
-
- do {
- uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
- packets, RTE_DIM(packets), 0);
-
- if (n == 0) {
- RTE_ETH_FOREACH_DEV(j)
- rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
- return 0;
- }
- if (start_time == 0)
- last_time = start_time = rte_get_timer_cycles();
-
- received += n;
- for (i = 0; i < n; i++) {
- uint8_t outport = packets[i].mbuf->port;
-
- exchange_mac(packets[i].mbuf);
- rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
- packets[i].mbuf);
-
- packets[i].op = RTE_EVENT_OP_RELEASE;
- }
-
- if (cons_data.release) {
- uint16_t nb_tx;
-
- nb_tx = rte_event_enqueue_burst(dev_id, port_id,
- packets, n);
- while (nb_tx < n)
- nb_tx += rte_event_enqueue_burst(dev_id,
- port_id, packets + nb_tx,
- n - nb_tx);
- }
-
- /* Print out mpps every 1<22 packets */
- if (!cdata.quiet && received >= last_pkts + (1<<22)) {
- const uint64_t now = rte_get_timer_cycles();
- const uint64_t total_ms = (now - start_time) / freq_khz;
- const uint64_t delta_ms = (now - last_time) / freq_khz;
- uint64_t delta_pkts = received - last_pkts;
-
- printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
- "avg %.3f mpps [current %.3f mpps]\n",
- received,
- total_ms,
- received / (total_ms * 1000.0),
- delta_pkts / (delta_ms * 1000.0));
- last_pkts = received;
- last_time = now;
- }
-
- cdata.num_packets -= n;
- if (cdata.num_packets <= 0)
- fdata->done = 1;
- /* Be stuck in this loop if single. */
- } while (!fdata->done && fdata->tx_single);
-
- return 0;
-}
-
static int
-setup_eventdev_generic(struct cons_data *cons_data,
- struct worker_data *worker_data)
+setup_eventdev_generic(struct worker_data *worker_data)
{
const uint8_t dev_id = 0;
/* +1 stages is for a SINGLE_LINK TX stage */
const uint8_t nb_queues = cdata.num_stages + 1;
- /* + 1 is one port for consumer */
- const uint8_t nb_ports = cdata.num_workers + 1;
+ const uint8_t nb_ports = cdata.num_workers;
struct rte_event_dev_config config = {
.nb_event_queues = nb_queues,
.nb_event_ports = nb_ports,
@@ -285,11 +145,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
.nb_atomic_flows = 1024,
.nb_atomic_order_sequences = 1024,
};
- struct rte_event_port_conf tx_p_conf = {
- .dequeue_depth = 128,
- .enqueue_depth = 128,
- .new_event_threshold = 4096,
- };
struct rte_event_queue_conf tx_q_conf = {
.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
@@ -297,7 +152,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
struct port_link worker_queues[MAX_NUM_STAGES];
uint8_t disable_implicit_release;
- struct port_link tx_queue;
unsigned int i;
int ret, ndev = rte_event_dev_count();
@@ -314,7 +168,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
wkr_p_conf.disable_implicit_release = disable_implicit_release;
- tx_p_conf.disable_implicit_release = disable_implicit_release;
if (dev_info.max_event_port_dequeue_depth <
config.nb_event_port_dequeue_depth)
@@ -372,8 +225,7 @@ setup_eventdev_generic(struct cons_data *cons_data,
printf("%d: error creating qid %d\n", __LINE__, i);
return -1;
}
- tx_queue.queue_id = i;
- tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+ cdata.tx_queue_id = i;
if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
@@ -403,26 +255,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
w->port_id = i;
}
- if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
- tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
- if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
- tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
- /* port for consumer, linked to TX queue */
- if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
- printf("Error setting up port %d\n", i);
- return -1;
- }
- if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
- &tx_queue.priority, 1) != 1) {
- printf("%d: error creating link for port %d\n",
- __LINE__, i);
- return -1;
- }
- *cons_data = (struct cons_data){.dev_id = dev_id,
- .port_id = i,
- .release = disable_implicit_release };
-
ret = rte_event_dev_service_id_get(dev_id,
&fdata->evdev_service_id);
if (ret != -ESRCH && ret != 0) {
@@ -431,76 +263,107 @@ setup_eventdev_generic(struct cons_data *cons_data,
}
rte_service_runstate_set(fdata->evdev_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
- if (rte_event_dev_start(dev_id) < 0) {
- printf("Error starting eventdev\n");
- return -1;
- }
return dev_id;
}
static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
{
int i;
int ret;
+ uint8_t tx_port_id = 0;
uint8_t evdev_id = 0;
struct rte_event_dev_info dev_info;
ret = rte_event_dev_info_get(evdev_id, &dev_info);
- struct rte_event_port_conf rx_p_conf = {
- .dequeue_depth = 8,
- .enqueue_depth = 8,
- .new_event_threshold = 1200,
+ struct rte_event_port_conf adptr_p_conf = {
+ .dequeue_depth = cdata.worker_cq_depth,
+ .enqueue_depth = 64,
+ .new_event_threshold = 4096,
};
- if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
- rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
- if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
- rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+ if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+ adptr_p_conf.dequeue_depth =
+ dev_info.max_event_port_dequeue_depth;
+ if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+ adptr_p_conf.enqueue_depth =
+ dev_info.max_event_port_enqueue_depth;
/* Create one adapter for all the ethernet ports. */
ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
- &rx_p_conf);
+ &adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
cdata.rx_adapter_id);
+ ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+ &adptr_p_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+ cdata.tx_adapter_id);
+
struct rte_event_eth_rx_adapter_queue_conf queue_conf;
memset(&queue_conf, 0, sizeof(queue_conf));
queue_conf.ev.sched_type = cdata.queue_type;
queue_conf.ev.queue_id = cdata.qid[0];
for (i = 0; i < nb_ports; i++) {
- uint32_t cap;
-
- ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
- if (ret)
- rte_exit(EXIT_FAILURE,
- "failed to get event rx adapter "
- "capabilities");
-
ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
-1, &queue_conf);
if (ret)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Rx adapter");
+
+ ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+ -1);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to add queues to Tx adapter");
}
+ ret = rte_event_eth_tx_adapter_event_port_get(cdata.tx_adapter_id,
+ &tx_port_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to get Tx adapter port id");
+ ret = rte_event_port_link(evdev_id, tx_port_id, &cdata.tx_queue_id,
+ NULL, 1);
+ if (ret != 1)
+ rte_exit(EXIT_FAILURE,
+ "Unable to link Tx adapter port to Tx queue");
+
ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
&fdata->rxadptr_service_id);
if (ret != -ESRCH && ret != 0) {
rte_exit(EXIT_FAILURE,
- "Error getting the service ID for sw eventdev\n");
+ "Error getting the service ID for Rx adapter\n");
}
rte_service_runstate_set(fdata->rxadptr_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
+ ret = rte_event_eth_tx_adapter_service_id_get(cdata.tx_adapter_id,
+ &fdata->txadptr_service_id);
+ if (ret != -ESRCH && ret != 0) {
+ rte_exit(EXIT_FAILURE,
+ "Error getting the service ID for Tx adapter\n");
+ }
+ rte_service_runstate_set(fdata->txadptr_service_id, 1);
+ rte_service_set_runstate_mapped_check(fdata->txadptr_service_id, 0);
+
ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
if (ret)
rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
cdata.rx_adapter_id);
+
+ ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+ cdata.tx_adapter_id);
+
+ if (rte_event_dev_start(evdev_id) < 0)
+ rte_exit(EXIT_FAILURE, "Error starting eventdev");
}
static void
@@ -510,6 +373,7 @@ generic_opt_check(void)
int ret;
uint32_t cap = 0;
uint8_t rx_needed = 0;
+ uint8_t sched_needed = 0;
struct rte_event_dev_info eventdev_info;
memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -519,6 +383,8 @@ generic_opt_check(void)
RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
rte_exit(EXIT_FAILURE,
"Event dev doesn't support all type queues\n");
+ sched_needed = !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);
RTE_ETH_FOREACH_DEV(i) {
ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
@@ -531,9 +397,8 @@ generic_opt_check(void)
if (cdata.worker_lcore_mask == 0 ||
(rx_needed && cdata.rx_lcore_mask == 0) ||
- cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
- && !(eventdev_info.event_dev_cap &
- RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+ (cdata.tx_lcore_mask == 0) ||
+ (sched_needed && cdata.sched_lcore_mask == 0)) {
printf("Core part of pipeline was not assigned any cores. "
"This will stall the pipeline, please check core masks "
"(use -h for details on setting core masks):\n"
@@ -545,23 +410,24 @@ generic_opt_check(void)
rte_exit(-1, "Fix core masks\n");
}
- if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+ if (!sched_needed)
memset(fdata->sched_core, 0,
sizeof(unsigned int) * MAX_NUM_CORE);
+ if (!rx_needed)
+ memset(fdata->rx_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
}
void
set_worker_generic_setup_data(struct setup_data *caps, bool burst)
{
if (burst) {
- caps->consumer = consumer_burst;
caps->worker = worker_generic_burst;
} else {
- caps->consumer = consumer;
caps->worker = worker_generic;
}
- caps->adptr_setup = init_rx_adapter;
+ caps->adptr_setup = init_adapters;
caps->scheduler = schedule_devices;
caps->evdev_setup = setup_eventdev_generic;
caps->check_opt = generic_opt_check;
diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c
index 3dbde92d..85eb075f 100644
--- a/examples/eventdev_pipeline/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline/pipeline_worker_tx.c
@@ -36,10 +36,11 @@ worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
}
static __rte_always_inline void
-worker_tx_pkt(struct rte_mbuf *mbuf)
+worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
- exchange_mac(mbuf);
- while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+ exchange_mac(ev->mbuf);
+ rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
+ while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
rte_pause();
}
@@ -64,15 +65,15 @@ worker_do_tx_single(void *arg)
received++;
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
- continue;
+ } else {
+ work();
+ ev.queue_id++;
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
}
- work();
- ev.queue_id++;
- worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- worker_event_enqueue(dev, port, &ev);
- fwd++;
}
if (!cdata.quiet)
@@ -100,14 +101,14 @@ worker_do_tx_single_atq(void *arg)
received++;
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
- continue;
+ } else {
+ work();
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
}
- work();
- worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- worker_event_enqueue(dev, port, &ev);
- fwd++;
}
if (!cdata.quiet)
@@ -141,7 +142,7 @@ worker_do_tx_single_burst(void *arg)
rte_prefetch0(ev[i + 1].mbuf);
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
tx++;
@@ -188,7 +189,7 @@ worker_do_tx_single_burst_atq(void *arg)
rte_prefetch0(ev[i + 1].mbuf);
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
tx++;
} else
@@ -232,7 +233,7 @@ worker_do_tx(void *arg)
if (cq_id >= lst_qid) {
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
continue;
}
@@ -280,7 +281,7 @@ worker_do_tx_atq(void *arg)
if (cq_id == lst_qid) {
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
continue;
}
@@ -330,7 +331,7 @@ worker_do_tx_burst(void *arg)
if (cq_id >= lst_qid) {
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
tx++;
ev[i].op = RTE_EVENT_OP_RELEASE;
continue;
@@ -387,7 +388,7 @@ worker_do_tx_burst_atq(void *arg)
if (cq_id == lst_qid) {
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
tx++;
ev[i].op = RTE_EVENT_OP_RELEASE;
continue;
@@ -413,10 +414,8 @@ worker_do_tx_burst_atq(void *arg)
}
static int
-setup_eventdev_worker_tx(struct cons_data *cons_data,
- struct worker_data *worker_data)
+setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
{
- RTE_SET_USED(cons_data);
uint8_t i;
const uint8_t atq = cdata.all_type_queues ? 1 : 0;
const uint8_t dev_id = 0;
@@ -575,10 +574,9 @@ setup_eventdev_worker_tx(struct cons_data *cons_data,
}
rte_service_runstate_set(fdata->evdev_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
- if (rte_event_dev_start(dev_id) < 0) {
- printf("Error starting eventdev\n");
- return -1;
- }
+
+ if (rte_event_dev_start(dev_id) < 0)
+ rte_exit(EXIT_FAILURE, "Error starting eventdev");
return dev_id;
}
@@ -602,7 +600,7 @@ service_rx_adapter(void *arg)
}
static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
{
int i;
int ret;
@@ -613,17 +611,18 @@ init_rx_adapter(uint16_t nb_ports)
ret = rte_event_dev_info_get(evdev_id, &dev_info);
adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
- struct rte_event_port_conf rx_p_conf = {
- .dequeue_depth = 8,
- .enqueue_depth = 8,
- .new_event_threshold = 1200,
+ struct rte_event_port_conf adptr_p_conf = {
+ .dequeue_depth = cdata.worker_cq_depth,
+ .enqueue_depth = 64,
+ .new_event_threshold = 4096,
};
- if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
- rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
- if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
- rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
+ if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+ adptr_p_conf.dequeue_depth =
+ dev_info.max_event_port_dequeue_depth;
+ if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+ adptr_p_conf.enqueue_depth =
+ dev_info.max_event_port_enqueue_depth;
struct rte_event_eth_rx_adapter_queue_conf queue_conf;
memset(&queue_conf, 0, sizeof(queue_conf));
@@ -633,11 +632,11 @@ init_rx_adapter(uint16_t nb_ports)
uint32_t cap;
uint32_t service_id;
- ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+ ret = rte_event_eth_rx_adapter_create(i, evdev_id,
+ &adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE,
- "failed to create rx adapter[%d]",
- cdata.rx_adapter_id);
+ "failed to create rx adapter[%d]", i);
ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
if (ret)
@@ -654,7 +653,6 @@ init_rx_adapter(uint16_t nb_ports)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Rx adapter");
-
/* Producer needs to be scheduled. */
if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
ret = rte_event_eth_rx_adapter_service_id_get(i,
@@ -680,9 +678,29 @@ init_rx_adapter(uint16_t nb_ports)
ret = rte_event_eth_rx_adapter_start(i);
if (ret)
rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
- cdata.rx_adapter_id);
+ i);
}
+ /* We already know that Tx adapter has INTERNAL port cap*/
+ ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+ &adptr_p_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+ cdata.tx_adapter_id);
+
+ for (i = 0; i < nb_ports; i++) {
+ ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+ -1);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to add queues to Tx adapter");
+ }
+
+ ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+ cdata.tx_adapter_id);
+
if (adptr_services->nb_rx_adptrs) {
struct rte_service_spec service;
@@ -695,8 +713,7 @@ init_rx_adapter(uint16_t nb_ports)
&fdata->rxadptr_service_id);
if (ret)
rte_exit(EXIT_FAILURE,
- "Rx adapter[%d] service register failed",
- cdata.rx_adapter_id);
+ "Rx adapter service register failed");
rte_service_runstate_set(fdata->rxadptr_service_id, 1);
rte_service_component_runstate_set(fdata->rxadptr_service_id,
@@ -708,23 +725,19 @@ init_rx_adapter(uint16_t nb_ports)
rte_free(adptr_services);
}
- if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
- (dev_info.event_dev_cap &
+ if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
fdata->cap.scheduler = NULL;
-
- if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
- memset(fdata->sched_core, 0,
- sizeof(unsigned int) * MAX_NUM_CORE);
}
static void
-worker_tx_opt_check(void)
+worker_tx_enq_opt_check(void)
{
int i;
int ret;
uint32_t cap = 0;
uint8_t rx_needed = 0;
+ uint8_t sched_needed = 0;
struct rte_event_dev_info eventdev_info;
memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -734,32 +747,38 @@ worker_tx_opt_check(void)
RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
rte_exit(EXIT_FAILURE,
"Event dev doesn't support all type queues\n");
+ sched_needed = !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);
RTE_ETH_FOREACH_DEV(i) {
ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
if (ret)
rte_exit(EXIT_FAILURE,
- "failed to get event rx adapter "
- "capabilities");
+ "failed to get event rx adapter capabilities");
rx_needed |=
!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
}
if (cdata.worker_lcore_mask == 0 ||
(rx_needed && cdata.rx_lcore_mask == 0) ||
- (cdata.sched_lcore_mask == 0 &&
- !(eventdev_info.event_dev_cap &
- RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+ (sched_needed && cdata.sched_lcore_mask == 0)) {
printf("Core part of pipeline was not assigned any cores. "
"This will stall the pipeline, please check core masks "
"(use -h for details on setting core masks):\n"
- "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
- "\n\tworkers: %"PRIu64"\n",
- cdata.rx_lcore_mask, cdata.tx_lcore_mask,
- cdata.sched_lcore_mask,
- cdata.worker_lcore_mask);
+ "\trx: %"PRIu64"\n\tsched: %"PRIu64
+ "\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
+ cdata.sched_lcore_mask, cdata.worker_lcore_mask);
rte_exit(-1, "Fix core masks\n");
}
+
+ if (!sched_needed)
+ memset(fdata->sched_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
+ if (!rx_needed)
+ memset(fdata->rx_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
+
+ memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
}
static worker_loop
@@ -821,18 +840,15 @@ get_worker_multi_stage(bool burst)
}
void
-set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
{
if (cdata.num_stages == 1)
caps->worker = get_worker_single_stage(burst);
else
caps->worker = get_worker_multi_stage(burst);
- memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-
- caps->check_opt = worker_tx_opt_check;
- caps->consumer = NULL;
+ caps->check_opt = worker_tx_enq_opt_check;
caps->scheduler = schedule_devices;
- caps->evdev_setup = setup_eventdev_worker_tx;
- caps->adptr_setup = init_rx_adapter;
+ caps->evdev_setup = setup_eventdev_worker_tx_enq;
+ caps->adptr_setup = init_adapters;
}