From ca33590b6af032bff57d9cc70455660466a654b2 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Mon, 19 Feb 2018 11:16:57 +0000 Subject: New upstream version 18.02 Change-Id: I89ed24cb2a49b78fe5be6970b99dd46c1499fcc3 Signed-off-by: Luca Boccassi --- examples/eventdev_pipeline/pipeline_worker_tx.c | 838 ++++++++++++++++++++++++ 1 file changed, 838 insertions(+) create mode 100644 examples/eventdev_pipeline/pipeline_worker_tx.c (limited to 'examples/eventdev_pipeline/pipeline_worker_tx.c') diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c new file mode 100644 index 00000000..b254b03f --- /dev/null +++ b/examples/eventdev_pipeline/pipeline_worker_tx.c @@ -0,0 +1,838 @@ +/* + * SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2014 Intel Corporation + * Copyright 2017 Cavium, Inc. + */ + +#include "pipeline_common.h" + +static __rte_always_inline void +worker_fwd_event(struct rte_event *ev, uint8_t sched) +{ + ev->event_type = RTE_EVENT_TYPE_CPU; + ev->op = RTE_EVENT_OP_FORWARD; + ev->sched_type = sched; +} + +static __rte_always_inline void +worker_event_enqueue(const uint8_t dev, const uint8_t port, + struct rte_event *ev) +{ + while (rte_event_enqueue_burst(dev, port, ev, 1) != 1) + rte_pause(); +} + +static __rte_always_inline void +worker_event_enqueue_burst(const uint8_t dev, const uint8_t port, + struct rte_event *ev, const uint16_t nb_rx) +{ + uint16_t enq; + + enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); + while (enq < nb_rx) { + enq += rte_event_enqueue_burst(dev, port, + ev + enq, nb_rx - enq); + } +} + +static __rte_always_inline void +worker_tx_pkt(struct rte_mbuf *mbuf) +{ + exchange_mac(mbuf); + while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1) + rte_pause(); +} + +/* Single stage pipeline workers */ + +static int +worker_do_tx_single(void *arg) +{ + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + size_t fwd = 0, received = 0, tx = 0; + struct rte_event ev; + + while (!fdata->done) { + + if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { + rte_pause(); + continue; + } + + received++; + + if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev.mbuf); + tx++; + continue; + } + work(); + ev.queue_id++; + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + worker_event_enqueue(dev, port, &ev); + fwd++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + return 0; +} + +static int +worker_do_tx_single_atq(void *arg) +{ + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + size_t fwd = 0, received = 0, tx = 0; + struct rte_event ev; + + while (!fdata->done) { + + if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { + rte_pause(); + continue; + } + + received++; + + if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev.mbuf); + tx++; + continue; + } + work(); + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + worker_event_enqueue(dev, port, &ev); + fwd++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + return 0; +} + +static int +worker_do_tx_single_burst(void *arg) +{ + struct rte_event ev[BATCH_SIZE + 1]; + + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + uint16_t i; + uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, + BATCH_SIZE, 0); + + if (!nb_rx) { + rte_pause(); + continue; + } + received += nb_rx; + + for (i = 0; i < nb_rx; i++) { + rte_prefetch0(ev[i + 1].mbuf); + if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { + + worker_tx_pkt(ev[i].mbuf); + ev[i].op = RTE_EVENT_OP_RELEASE; + tx++; + + } else { + ev[i].queue_id++; + worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); + } + work(); + } + + worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_rx; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + return 0; +} + +static int +worker_do_tx_single_burst_atq(void *arg) +{ + struct rte_event ev[BATCH_SIZE + 1]; + + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + uint16_t i; + uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, + BATCH_SIZE, 0); + + if (!nb_rx) { + rte_pause(); + continue; + } + + received += nb_rx; + + for (i = 0; i < nb_rx; i++) { + rte_prefetch0(ev[i + 1].mbuf); + if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { + + worker_tx_pkt(ev[i].mbuf); + ev[i].op = RTE_EVENT_OP_RELEASE; + tx++; + } else + worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); + work(); + } + + worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_rx; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + return 0; +} + +/* Multi stage Pipeline Workers */ + +static int +worker_do_tx(void *arg) +{ + struct rte_event ev; + + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + const uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + + while (!fdata->done) { + + if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { + rte_pause(); + continue; + } + + received++; + const uint8_t cq_id = ev.queue_id % cdata.num_stages; + + if (cq_id >= lst_qid) { + if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev.mbuf); + tx++; + continue; + } + + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + ev.queue_id = (cq_id == lst_qid) ? + cdata.next_qid[ev.queue_id] : ev.queue_id; + } else { + ev.queue_id = cdata.next_qid[ev.queue_id]; + worker_fwd_event(&ev, cdata.queue_type); + } + work(); + + worker_event_enqueue(dev, port, &ev); + fwd++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + +static int +worker_do_tx_atq(void *arg) +{ + struct rte_event ev; + + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + const uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + + if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { + rte_pause(); + continue; + } + + received++; + const uint8_t cq_id = ev.sub_event_type % cdata.num_stages; + + if (cq_id == lst_qid) { + if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev.mbuf); + tx++; + continue; + } + + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + } else { + ev.sub_event_type++; + worker_fwd_event(&ev, cdata.queue_type); + } + work(); + + worker_event_enqueue(dev, port, &ev); + fwd++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + +static int +worker_do_tx_burst(void *arg) +{ + struct rte_event ev[BATCH_SIZE]; + + struct worker_data *data = (struct worker_data *)arg; + uint8_t dev = data->dev_id; + uint8_t port = data->port_id; + uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + uint16_t i; + const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, + ev, BATCH_SIZE, 0); + + if (nb_rx == 0) { + rte_pause(); + continue; + } + received += nb_rx; + + for (i = 0; i < nb_rx; i++) { + const uint8_t cq_id = ev[i].queue_id % cdata.num_stages; + + if (cq_id >= lst_qid) { + if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev[i].mbuf); + tx++; + ev[i].op = RTE_EVENT_OP_RELEASE; + continue; + } + ev[i].queue_id = (cq_id == lst_qid) ? + cdata.next_qid[ev[i].queue_id] : + ev[i].queue_id; + + worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); + } else { + ev[i].queue_id = cdata.next_qid[ev[i].queue_id]; + worker_fwd_event(&ev[i], cdata.queue_type); + } + work(); + } + worker_event_enqueue_burst(dev, port, ev, nb_rx); + + fwd += nb_rx; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + +static int +worker_do_tx_burst_atq(void *arg) +{ + struct rte_event ev[BATCH_SIZE]; + + struct worker_data *data = (struct worker_data *)arg; + uint8_t dev = data->dev_id; + uint8_t port = data->port_id; + uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + uint16_t i; + + const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, + ev, BATCH_SIZE, 0); + + if (nb_rx == 0) { + rte_pause(); + continue; + } + received += nb_rx; + + for (i = 0; i < nb_rx; i++) { + const uint8_t cq_id = ev[i].sub_event_type % + cdata.num_stages; + + if (cq_id == lst_qid) { + if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev[i].mbuf); + tx++; + ev[i].op = RTE_EVENT_OP_RELEASE; + continue; + } + + worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); + } else { + ev[i].sub_event_type++; + worker_fwd_event(&ev[i], cdata.queue_type); + } + work(); + } + + worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_rx; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + +static int +setup_eventdev_worker_tx(struct cons_data *cons_data, + struct worker_data *worker_data) +{ + RTE_SET_USED(cons_data); + uint8_t i; + const uint8_t atq = cdata.all_type_queues ? 1 : 0; + const uint8_t dev_id = 0; + const uint8_t nb_ports = cdata.num_workers; + uint8_t nb_slots = 0; + uint8_t nb_queues = rte_eth_dev_count(); + + /* + * In case where all type queues are not enabled, use queues equal to + * number of stages * eth_dev_count and one extra queue per pipeline + * for Tx. + */ + if (!atq) { + nb_queues *= cdata.num_stages; + nb_queues += rte_eth_dev_count(); + } + + struct rte_event_dev_config config = { + .nb_event_queues = nb_queues, + .nb_event_ports = nb_ports, + .nb_events_limit = 4096, + .nb_event_queue_flows = 1024, + .nb_event_port_dequeue_depth = 128, + .nb_event_port_enqueue_depth = 128, + }; + struct rte_event_port_conf wkr_p_conf = { + .dequeue_depth = cdata.worker_cq_depth, + .enqueue_depth = 64, + .new_event_threshold = 4096, + }; + struct rte_event_queue_conf wkr_q_conf = { + .schedule_type = cdata.queue_type, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + + int ret, ndev = rte_event_dev_count(); + + if (ndev < 1) { + printf("%d: No Eventdev Devices Found\n", __LINE__); + return -1; + } + + + struct rte_event_dev_info dev_info; + ret = rte_event_dev_info_get(dev_id, &dev_info); + printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name); + + if (dev_info.max_event_port_dequeue_depth < + config.nb_event_port_dequeue_depth) + config.nb_event_port_dequeue_depth = + dev_info.max_event_port_dequeue_depth; + if (dev_info.max_event_port_enqueue_depth < + config.nb_event_port_enqueue_depth) + config.nb_event_port_enqueue_depth = + dev_info.max_event_port_enqueue_depth; + + ret = rte_event_dev_configure(dev_id, &config); + if (ret < 0) { + printf("%d: Error configuring device\n", __LINE__); + return -1; + } + + printf(" Stages:\n"); + for (i = 0; i < nb_queues; i++) { + + if (atq) { + + nb_slots = cdata.num_stages; + wkr_q_conf.event_queue_cfg = + RTE_EVENT_QUEUE_CFG_ALL_TYPES; + } else { + uint8_t slot; + + nb_slots = cdata.num_stages + 1; + slot = i % nb_slots; + wkr_q_conf.schedule_type = slot == cdata.num_stages ? + RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; + } + + if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, i); + return -1; + } + cdata.qid[i] = i; + cdata.next_qid[i] = i+1; + if (cdata.enable_queue_priorities) { + const uint32_t prio_delta = + (RTE_EVENT_DEV_PRIORITY_LOWEST) / + nb_slots; + + /* higher priority for queues closer to tx */ + wkr_q_conf.priority = + RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * + (i % nb_slots); + } + + const char *type_str = "Atomic"; + switch (wkr_q_conf.schedule_type) { + case RTE_SCHED_TYPE_ORDERED: + type_str = "Ordered"; + break; + case RTE_SCHED_TYPE_PARALLEL: + type_str = "Parallel"; + break; + } + printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str, + wkr_q_conf.priority); + } + + printf("\n"); + if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) + wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; + if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) + wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; + + /* set up one port per worker, linking to all stage queues */ + for (i = 0; i < cdata.num_workers; i++) { + struct worker_data *w = &worker_data[i]; + w->dev_id = dev_id; + if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) { + printf("Error setting up port %d\n", i); + return -1; + } + + if (rte_event_port_link(dev_id, i, NULL, NULL, 0) + != nb_queues) { + printf("%d: error creating link for port %d\n", + __LINE__, i); + return -1; + } + w->port_id = i; + } + /* + * Reduce the load on ingress event queue by splitting the traffic + * across multiple event queues. + * for example, nb_stages = 2 and nb_ethdev = 2 then + * + * nb_queues = (2 * 2) + 2 = 6 (non atq) + * rx_stride = 3 + * + * So, traffic is split across queue 0 and queue 3 since queue id for + * rx adapter is chosen * i.e in the above + * case eth port 0, 1 will inject packets into event queue 0, 3 + * respectively. + * + * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx. + */ + cdata.rx_stride = atq ? 1 : nb_slots; + ret = rte_event_dev_service_id_get(dev_id, + &fdata->evdev_service_id); + if (ret != -ESRCH && ret != 0) { + printf("Error getting the service ID\n"); + return -1; + } + rte_service_runstate_set(fdata->evdev_service_id, 1); + rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0); + if (rte_event_dev_start(dev_id) < 0) { + printf("Error starting eventdev\n"); + return -1; + } + + return dev_id; +} + + +struct rx_adptr_services { + uint16_t nb_rx_adptrs; + uint32_t *rx_adpt_arr; +}; + +static int32_t +service_rx_adapter(void *arg) +{ + int i; + struct rx_adptr_services *adptr_services = arg; + + for (i = 0; i < adptr_services->nb_rx_adptrs; i++) + rte_service_run_iter_on_app_lcore( + adptr_services->rx_adpt_arr[i], 1); + return 0; +} + +static void +init_rx_adapter(uint16_t nb_ports) +{ + int i; + int ret; + uint8_t evdev_id = 0; + struct rx_adptr_services *adptr_services = NULL; + struct rte_event_dev_info dev_info; + + ret = rte_event_dev_info_get(evdev_id, &dev_info); + adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0); + + struct rte_event_port_conf rx_p_conf = { + .dequeue_depth = 8, + .enqueue_depth = 8, + .new_event_threshold = 1200, + }; + + if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth) + rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth; + if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth) + rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth; + + + struct rte_event_eth_rx_adapter_queue_conf queue_conf; + memset(&queue_conf, 0, sizeof(queue_conf)); + queue_conf.ev.sched_type = cdata.queue_type; + + for (i = 0; i < nb_ports; i++) { + uint32_t cap; + uint32_t service_id; + + ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf); + if (ret) + rte_exit(EXIT_FAILURE, + "failed to create rx adapter[%d]", + cdata.rx_adapter_id); + + ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap); + if (ret) + rte_exit(EXIT_FAILURE, + "failed to get event rx adapter " + "capabilities"); + + queue_conf.ev.queue_id = cdata.rx_stride ? + (i * cdata.rx_stride) + : (uint8_t)cdata.qid[0]; + + ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf); + if (ret) + rte_exit(EXIT_FAILURE, + "Failed to add queues to Rx adapter"); + + + /* Producer needs to be scheduled. */ + if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { + ret = rte_event_eth_rx_adapter_service_id_get(i, + &service_id); + if (ret != -ESRCH && ret != 0) { + rte_exit(EXIT_FAILURE, + "Error getting the service ID for rx adptr\n"); + } + + rte_service_runstate_set(service_id, 1); + rte_service_set_runstate_mapped_check(service_id, 0); + + adptr_services->nb_rx_adptrs++; + adptr_services->rx_adpt_arr = rte_realloc( + adptr_services->rx_adpt_arr, + adptr_services->nb_rx_adptrs * + sizeof(uint32_t), 0); + adptr_services->rx_adpt_arr[ + adptr_services->nb_rx_adptrs - 1] = + service_id; + } + + ret = rte_event_eth_rx_adapter_start(i); + if (ret) + rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed", + cdata.rx_adapter_id); + } + + if (adptr_services->nb_rx_adptrs) { + struct rte_service_spec service; + + memset(&service, 0, sizeof(struct rte_service_spec)); + snprintf(service.name, sizeof(service.name), "rx_service"); + service.callback = service_rx_adapter; + service.callback_userdata = (void *)adptr_services; + + int32_t ret = rte_service_component_register(&service, + &fdata->rxadptr_service_id); + if (ret) + rte_exit(EXIT_FAILURE, + "Rx adapter[%d] service register failed", + cdata.rx_adapter_id); + + rte_service_runstate_set(fdata->rxadptr_service_id, 1); + rte_service_component_runstate_set(fdata->rxadptr_service_id, + 1); + rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, + 0); + } else { + memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); + rte_free(adptr_services); + } + + if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL && + (dev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)) + fdata->cap.scheduler = NULL; + + if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED) + memset(fdata->sched_core, 0, + sizeof(unsigned int) * MAX_NUM_CORE); +} + +static void +worker_tx_opt_check(void) +{ + int i; + int ret; + uint32_t cap = 0; + uint8_t rx_needed = 0; + struct rte_event_dev_info eventdev_info; + + memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); + rte_event_dev_info_get(0, &eventdev_info); + + if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) + rte_exit(EXIT_FAILURE, + "Event dev doesn't support all type queues\n"); + + for (i = 0; i < rte_eth_dev_count(); i++) { + ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); + if (ret) + rte_exit(EXIT_FAILURE, + "failed to get event rx adapter " + "capabilities"); + rx_needed |= + !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT); + } + + if (cdata.worker_lcore_mask == 0 || + (rx_needed && cdata.rx_lcore_mask == 0) || + (cdata.sched_lcore_mask == 0 && + !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) { + printf("Core part of pipeline was not assigned any cores. " + "This will stall the pipeline, please check core masks " + "(use -h for details on setting core masks):\n" + "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64 + "\n\tworkers: %"PRIu64"\n", + cdata.rx_lcore_mask, cdata.tx_lcore_mask, + cdata.sched_lcore_mask, + cdata.worker_lcore_mask); + rte_exit(-1, "Fix core masks\n"); + } +} + +static worker_loop +get_worker_loop_single_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_single_burst_atq; + + return worker_do_tx_single_burst; +} + +static worker_loop +get_worker_loop_single_non_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_single_atq; + + return worker_do_tx_single; +} + +static worker_loop +get_worker_loop_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_burst_atq; + + return worker_do_tx_burst; +} + +static worker_loop +get_worker_loop_non_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_atq; + + return worker_do_tx; +} + +static worker_loop +get_worker_single_stage(bool burst) +{ + uint8_t atq = cdata.all_type_queues ? 1 : 0; + + if (burst) + return get_worker_loop_single_burst(atq); + + return get_worker_loop_single_non_burst(atq); +} + +static worker_loop +get_worker_multi_stage(bool burst) +{ + uint8_t atq = cdata.all_type_queues ? 1 : 0; + + if (burst) + return get_worker_loop_burst(atq); + + return get_worker_loop_non_burst(atq); +} + +void +set_worker_tx_setup_data(struct setup_data *caps, bool burst) +{ + if (cdata.num_stages == 1) + caps->worker = get_worker_single_stage(burst); + else + caps->worker = get_worker_multi_stage(burst); + + memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); + + caps->check_opt = worker_tx_opt_check; + caps->consumer = NULL; + caps->scheduler = schedule_devices; + caps->evdev_setup = setup_eventdev_worker_tx; + caps->adptr_setup = init_rx_adapter; +} -- cgit 1.2.3-korg