aboutsummaryrefslogtreecommitdiffstats
path: root/examples/eventdev_pipeline/pipeline_worker_generic.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/eventdev_pipeline/pipeline_worker_generic.c')
-rw-r--r--examples/eventdev_pipeline/pipeline_worker_generic.c268
1 files changed, 67 insertions, 201 deletions
diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c
index 2215e9eb..16906494 100644
--- a/examples/eventdev_pipeline/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline/pipeline_worker_generic.c
@@ -119,153 +119,13 @@ worker_generic_burst(void *arg)
return 0;
}
-static __rte_always_inline int
-consumer(void)
-{
- const uint64_t freq_khz = rte_get_timer_hz() / 1000;
- struct rte_event packet;
-
- static uint64_t received;
- static uint64_t last_pkts;
- static uint64_t last_time;
- static uint64_t start_time;
- int i;
- uint8_t dev_id = cons_data.dev_id;
- uint8_t port_id = cons_data.port_id;
-
- do {
- uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
- &packet, 1, 0);
-
- if (n == 0) {
- RTE_ETH_FOREACH_DEV(i)
- rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
- return 0;
- }
- if (start_time == 0)
- last_time = start_time = rte_get_timer_cycles();
-
- received++;
- uint8_t outport = packet.mbuf->port;
-
- exchange_mac(packet.mbuf);
- rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
- packet.mbuf);
-
- if (cons_data.release)
- rte_event_enqueue_burst(dev_id, port_id,
- &packet, n);
-
- /* Print out mpps every 1<22 packets */
- if (!cdata.quiet && received >= last_pkts + (1<<22)) {
- const uint64_t now = rte_get_timer_cycles();
- const uint64_t total_ms = (now - start_time) / freq_khz;
- const uint64_t delta_ms = (now - last_time) / freq_khz;
- uint64_t delta_pkts = received - last_pkts;
-
- printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
- "avg %.3f mpps [current %.3f mpps]\n",
- __func__,
- received,
- total_ms,
- received / (total_ms * 1000.0),
- delta_pkts / (delta_ms * 1000.0));
- last_pkts = received;
- last_time = now;
- }
-
- cdata.num_packets--;
- if (cdata.num_packets <= 0)
- fdata->done = 1;
- /* Be stuck in this loop if single. */
- } while (!fdata->done && fdata->tx_single);
-
- return 0;
-}
-
-static __rte_always_inline int
-consumer_burst(void)
-{
- const uint64_t freq_khz = rte_get_timer_hz() / 1000;
- struct rte_event packets[BATCH_SIZE];
-
- static uint64_t received;
- static uint64_t last_pkts;
- static uint64_t last_time;
- static uint64_t start_time;
- unsigned int i, j;
- uint8_t dev_id = cons_data.dev_id;
- uint8_t port_id = cons_data.port_id;
-
- do {
- uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
- packets, RTE_DIM(packets), 0);
-
- if (n == 0) {
- RTE_ETH_FOREACH_DEV(j)
- rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
- return 0;
- }
- if (start_time == 0)
- last_time = start_time = rte_get_timer_cycles();
-
- received += n;
- for (i = 0; i < n; i++) {
- uint8_t outport = packets[i].mbuf->port;
-
- exchange_mac(packets[i].mbuf);
- rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
- packets[i].mbuf);
-
- packets[i].op = RTE_EVENT_OP_RELEASE;
- }
-
- if (cons_data.release) {
- uint16_t nb_tx;
-
- nb_tx = rte_event_enqueue_burst(dev_id, port_id,
- packets, n);
- while (nb_tx < n)
- nb_tx += rte_event_enqueue_burst(dev_id,
- port_id, packets + nb_tx,
- n - nb_tx);
- }
-
- /* Print out mpps every 1<22 packets */
- if (!cdata.quiet && received >= last_pkts + (1<<22)) {
- const uint64_t now = rte_get_timer_cycles();
- const uint64_t total_ms = (now - start_time) / freq_khz;
- const uint64_t delta_ms = (now - last_time) / freq_khz;
- uint64_t delta_pkts = received - last_pkts;
-
- printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
- "avg %.3f mpps [current %.3f mpps]\n",
- received,
- total_ms,
- received / (total_ms * 1000.0),
- delta_pkts / (delta_ms * 1000.0));
- last_pkts = received;
- last_time = now;
- }
-
- cdata.num_packets -= n;
- if (cdata.num_packets <= 0)
- fdata->done = 1;
- /* Be stuck in this loop if single. */
- } while (!fdata->done && fdata->tx_single);
-
- return 0;
-}
-
static int
-setup_eventdev_generic(struct cons_data *cons_data,
- struct worker_data *worker_data)
+setup_eventdev_generic(struct worker_data *worker_data)
{
const uint8_t dev_id = 0;
/* +1 stages is for a SINGLE_LINK TX stage */
const uint8_t nb_queues = cdata.num_stages + 1;
- /* + 1 is one port for consumer */
- const uint8_t nb_ports = cdata.num_workers + 1;
+ const uint8_t nb_ports = cdata.num_workers;
struct rte_event_dev_config config = {
.nb_event_queues = nb_queues,
.nb_event_ports = nb_ports,
@@ -285,11 +145,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
.nb_atomic_flows = 1024,
.nb_atomic_order_sequences = 1024,
};
- struct rte_event_port_conf tx_p_conf = {
- .dequeue_depth = 128,
- .enqueue_depth = 128,
- .new_event_threshold = 4096,
- };
struct rte_event_queue_conf tx_q_conf = {
.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
@@ -297,7 +152,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
struct port_link worker_queues[MAX_NUM_STAGES];
uint8_t disable_implicit_release;
- struct port_link tx_queue;
unsigned int i;
int ret, ndev = rte_event_dev_count();
@@ -314,7 +168,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
wkr_p_conf.disable_implicit_release = disable_implicit_release;
- tx_p_conf.disable_implicit_release = disable_implicit_release;
if (dev_info.max_event_port_dequeue_depth <
config.nb_event_port_dequeue_depth)
@@ -372,8 +225,7 @@ setup_eventdev_generic(struct cons_data *cons_data,
printf("%d: error creating qid %d\n", __LINE__, i);
return -1;
}
- tx_queue.queue_id = i;
- tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+ cdata.tx_queue_id = i;
if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
@@ -403,26 +255,6 @@ setup_eventdev_generic(struct cons_data *cons_data,
w->port_id = i;
}
- if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
- tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
- if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
- tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
- /* port for consumer, linked to TX queue */
- if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
- printf("Error setting up port %d\n", i);
- return -1;
- }
- if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
- &tx_queue.priority, 1) != 1) {
- printf("%d: error creating link for port %d\n",
- __LINE__, i);
- return -1;
- }
- *cons_data = (struct cons_data){.dev_id = dev_id,
- .port_id = i,
- .release = disable_implicit_release };
-
ret = rte_event_dev_service_id_get(dev_id,
&fdata->evdev_service_id);
if (ret != -ESRCH && ret != 0) {
@@ -431,76 +263,107 @@ setup_eventdev_generic(struct cons_data *cons_data,
}
rte_service_runstate_set(fdata->evdev_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
- if (rte_event_dev_start(dev_id) < 0) {
- printf("Error starting eventdev\n");
- return -1;
- }
return dev_id;
}
static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
{
int i;
int ret;
+ uint8_t tx_port_id = 0;
uint8_t evdev_id = 0;
struct rte_event_dev_info dev_info;
ret = rte_event_dev_info_get(evdev_id, &dev_info);
- struct rte_event_port_conf rx_p_conf = {
- .dequeue_depth = 8,
- .enqueue_depth = 8,
- .new_event_threshold = 1200,
+ struct rte_event_port_conf adptr_p_conf = {
+ .dequeue_depth = cdata.worker_cq_depth,
+ .enqueue_depth = 64,
+ .new_event_threshold = 4096,
};
- if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
- rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
- if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
- rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+ if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+ adptr_p_conf.dequeue_depth =
+ dev_info.max_event_port_dequeue_depth;
+ if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+ adptr_p_conf.enqueue_depth =
+ dev_info.max_event_port_enqueue_depth;
/* Create one adapter for all the ethernet ports. */
ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
- &rx_p_conf);
+ &adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
cdata.rx_adapter_id);
+ ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+ &adptr_p_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+ cdata.tx_adapter_id);
+
struct rte_event_eth_rx_adapter_queue_conf queue_conf;
memset(&queue_conf, 0, sizeof(queue_conf));
queue_conf.ev.sched_type = cdata.queue_type;
queue_conf.ev.queue_id = cdata.qid[0];
for (i = 0; i < nb_ports; i++) {
- uint32_t cap;
-
- ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
- if (ret)
- rte_exit(EXIT_FAILURE,
- "failed to get event rx adapter "
- "capabilities");
-
ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
-1, &queue_conf);
if (ret)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Rx adapter");
+
+ ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+ -1);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to add queues to Tx adapter");
}
+ ret = rte_event_eth_tx_adapter_event_port_get(cdata.tx_adapter_id,
+ &tx_port_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to get Tx adapter port id");
+ ret = rte_event_port_link(evdev_id, tx_port_id, &cdata.tx_queue_id,
+ NULL, 1);
+ if (ret != 1)
+ rte_exit(EXIT_FAILURE,
+ "Unable to link Tx adapter port to Tx queue");
+
ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
&fdata->rxadptr_service_id);
if (ret != -ESRCH && ret != 0) {
rte_exit(EXIT_FAILURE,
- "Error getting the service ID for sw eventdev\n");
+ "Error getting the service ID for Rx adapter\n");
}
rte_service_runstate_set(fdata->rxadptr_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
+ ret = rte_event_eth_tx_adapter_service_id_get(cdata.tx_adapter_id,
+ &fdata->txadptr_service_id);
+ if (ret != -ESRCH && ret != 0) {
+ rte_exit(EXIT_FAILURE,
+ "Error getting the service ID for Tx adapter\n");
+ }
+ rte_service_runstate_set(fdata->txadptr_service_id, 1);
+ rte_service_set_runstate_mapped_check(fdata->txadptr_service_id, 0);
+
ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
if (ret)
rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
cdata.rx_adapter_id);
+
+ ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+ cdata.tx_adapter_id);
+
+ if (rte_event_dev_start(evdev_id) < 0)
+ rte_exit(EXIT_FAILURE, "Error starting eventdev");
}
static void
@@ -510,6 +373,7 @@ generic_opt_check(void)
int ret;
uint32_t cap = 0;
uint8_t rx_needed = 0;
+ uint8_t sched_needed = 0;
struct rte_event_dev_info eventdev_info;
memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -519,6 +383,8 @@ generic_opt_check(void)
RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
rte_exit(EXIT_FAILURE,
"Event dev doesn't support all type queues\n");
+ sched_needed = !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);
RTE_ETH_FOREACH_DEV(i) {
ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
@@ -531,9 +397,8 @@ generic_opt_check(void)
if (cdata.worker_lcore_mask == 0 ||
(rx_needed && cdata.rx_lcore_mask == 0) ||
- cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
- && !(eventdev_info.event_dev_cap &
- RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+ (cdata.tx_lcore_mask == 0) ||
+ (sched_needed && cdata.sched_lcore_mask == 0)) {
printf("Core part of pipeline was not assigned any cores. "
"This will stall the pipeline, please check core masks "
"(use -h for details on setting core masks):\n"
@@ -545,23 +410,24 @@ generic_opt_check(void)
rte_exit(-1, "Fix core masks\n");
}
- if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+ if (!sched_needed)
memset(fdata->sched_core, 0,
sizeof(unsigned int) * MAX_NUM_CORE);
+ if (!rx_needed)
+ memset(fdata->rx_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
}
void
set_worker_generic_setup_data(struct setup_data *caps, bool burst)
{
if (burst) {
- caps->consumer = consumer_burst;
caps->worker = worker_generic_burst;
} else {
- caps->consumer = consumer;
caps->worker = worker_generic;
}
- caps->adptr_setup = init_rx_adapter;
+ caps->adptr_setup = init_adapters;
caps->scheduler = schedule_devices;
caps->evdev_setup = setup_eventdev_generic;
caps->check_opt = generic_opt_check;