diff options
Diffstat (limited to 'drivers/event')
29 files changed, 3274 insertions, 194 deletions
diff --git a/drivers/event/Makefile b/drivers/event/Makefile index f301d8dc..03ad1b6c 100644 --- a/drivers/event/Makefile +++ b/drivers/event/Makefile @@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw +DIRS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y) DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += dpaa diff --git a/drivers/event/dpaa/Makefile b/drivers/event/dpaa/Makefile index ddd85522..6f93e7f4 100644 --- a/drivers/event/dpaa/Makefile +++ b/drivers/event/dpaa/Makefile @@ -34,5 +34,6 @@ LDLIBS += -lrte_mempool_dpaa LDLIBS += -lrte_eal -lrte_mbuf -lrte_mempool -lrte_ring LDLIBS += -lrte_ethdev -lrte_net -lrte_kvargs LDLIBS += -lrte_eventdev -lrte_pmd_dpaa -lrte_bus_vdev +LDLIBS += -lrte_common_dpaax include $(RTE_SDK)/mk/rte.lib.mk diff --git a/drivers/event/dpaa/dpaa_eventdev.c b/drivers/event/dpaa/dpaa_eventdev.c index 5443ef56..1e247e4f 100644 --- a/drivers/event/dpaa/dpaa_eventdev.c +++ b/drivers/event/dpaa/dpaa_eventdev.c @@ -30,6 +30,7 @@ #include <rte_dpaa_bus.h> #include <rte_dpaa_logs.h> #include <rte_cycles.h> +#include <rte_kvargs.h> #include <dpaa_ethdev.h> #include "dpaa_eventdev.h" @@ -43,19 +44,31 @@ * 1 Eventdev can have N Eventqueue */ +#define DISABLE_INTR_MODE "disable_intr" + static int dpaa_event_dequeue_timeout_ticks(struct rte_eventdev *dev, uint64_t ns, uint64_t *timeout_ticks) { - uint64_t cycles_per_second; - - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); + uint64_t cycles_per_second; + cycles_per_second = rte_get_timer_hz(); - *timeout_ticks = ns * (cycles_per_second / NS_PER_S); + *timeout_ticks = (ns * cycles_per_second) / NS_PER_S; + + return 0; +} + +static int +dpaa_event_dequeue_timeout_ticks_intr(struct rte_eventdev *dev, uint64_t ns, + uint64_t *timeout_ticks) +{ + RTE_SET_USED(dev); + *timeout_ticks = ns/1000; return 0; } @@ -100,6 +113,56 @@ dpaa_event_enqueue(void *port, const struct rte_event *ev) return dpaa_event_enqueue_burst(port, ev, 1); } +static void drain_4_bytes(int fd, fd_set *fdset) +{ + if (FD_ISSET(fd, fdset)) { + /* drain 4 bytes */ + uint32_t junk; + ssize_t sjunk = read(qman_thread_fd(), &junk, sizeof(junk)); + if (sjunk != sizeof(junk)) + DPAA_EVENTDEV_ERR("UIO irq read error"); + } +} + +static inline int +dpaa_event_dequeue_wait(uint64_t timeout_ticks) +{ + int fd_qman, nfds; + int ret; + fd_set readset; + + /* Go into (and back out of) IRQ mode for each select, + * it simplifies exit-path considerations and other + * potential nastiness. + */ + struct timeval tv = { + .tv_sec = timeout_ticks / 1000000, + .tv_usec = timeout_ticks % 1000000 + }; + + fd_qman = qman_thread_fd(); + nfds = fd_qman + 1; + FD_ZERO(&readset); + FD_SET(fd_qman, &readset); + + qman_irqsource_add(QM_PIRQ_DQRI); + + ret = select(nfds, &readset, NULL, NULL, &tv); + if (ret < 0) + return ret; + /* Calling irqsource_remove() prior to thread_irq() + * means thread_irq() will not process whatever caused + * the interrupts, however it does ensure that, once + * thread_irq() re-enables interrupts, they won't fire + * again immediately. + */ + qman_irqsource_remove(~0); + drain_4_bytes(fd_qman, &readset); + qman_thread_irq(); + + return ret; +} + static uint16_t dpaa_event_dequeue_burst(void *port, struct rte_event ev[], uint16_t nb_events, uint64_t timeout_ticks) @@ -107,8 +170,8 @@ dpaa_event_dequeue_burst(void *port, struct rte_event ev[], int ret; u16 ch_id; void *buffers[8]; - u32 num_frames, i; - uint64_t wait_time, cur_ticks, start_ticks; + u32 num_frames, i, irq = 0; + uint64_t cur_ticks = 0, wait_time_ticks = 0; struct dpaa_port *portal = (struct dpaa_port *)port; struct rte_mbuf *mbuf; @@ -147,20 +210,21 @@ dpaa_event_dequeue_burst(void *port, struct rte_event ev[], } DPAA_PER_LCORE_DQRR_HELD = 0; - if (portal->timeout == DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_INVALID) - wait_time = timeout_ticks; + if (timeout_ticks) + wait_time_ticks = timeout_ticks; else - wait_time = portal->timeout; + wait_time_ticks = portal->timeout_us; - /* Lets dequeue the frames */ - start_ticks = rte_get_timer_cycles(); - wait_time += start_ticks; + wait_time_ticks += rte_get_timer_cycles(); do { + /* Lets dequeue the frames */ num_frames = qman_portal_dequeue(ev, nb_events, buffers); - if (num_frames != 0) + if (irq) + irq = 0; + if (num_frames) break; cur_ticks = rte_get_timer_cycles(); - } while (cur_ticks < wait_time); + } while (cur_ticks < wait_time_ticks); return num_frames; } @@ -171,11 +235,91 @@ dpaa_event_dequeue(void *port, struct rte_event *ev, uint64_t timeout_ticks) return dpaa_event_dequeue_burst(port, ev, 1, timeout_ticks); } +static uint16_t +dpaa_event_dequeue_burst_intr(void *port, struct rte_event ev[], + uint16_t nb_events, uint64_t timeout_ticks) +{ + int ret; + u16 ch_id; + void *buffers[8]; + u32 num_frames, i, irq = 0; + uint64_t cur_ticks = 0, wait_time_ticks = 0; + struct dpaa_port *portal = (struct dpaa_port *)port; + struct rte_mbuf *mbuf; + + if (unlikely(!RTE_PER_LCORE(dpaa_io))) { + /* Affine current thread context to a qman portal */ + ret = rte_dpaa_portal_init((void *)0); + if (ret) { + DPAA_EVENTDEV_ERR("Unable to initialize portal"); + return ret; + } + } + + if (unlikely(!portal->is_port_linked)) { + /* + * Affine event queue for current thread context + * to a qman portal. + */ + for (i = 0; i < portal->num_linked_evq; i++) { + ch_id = portal->evq_info[i].ch_id; + dpaa_eventq_portal_add(ch_id); + } + portal->is_port_linked = true; + } + + /* Check if there are atomic contexts to be released */ + i = 0; + while (DPAA_PER_LCORE_DQRR_SIZE) { + if (DPAA_PER_LCORE_DQRR_HELD & (1 << i)) { + qman_dca_index(i, 0); + mbuf = DPAA_PER_LCORE_DQRR_MBUF(i); + mbuf->seqn = DPAA_INVALID_MBUF_SEQN; + DPAA_PER_LCORE_DQRR_HELD &= ~(1 << i); + DPAA_PER_LCORE_DQRR_SIZE--; + } + i++; + } + DPAA_PER_LCORE_DQRR_HELD = 0; + + if (timeout_ticks) + wait_time_ticks = timeout_ticks; + else + wait_time_ticks = portal->timeout_us; + + do { + /* Lets dequeue the frames */ + num_frames = qman_portal_dequeue(ev, nb_events, buffers); + if (irq) + irq = 0; + if (num_frames) + break; + if (wait_time_ticks) { /* wait for time */ + if (dpaa_event_dequeue_wait(wait_time_ticks) > 0) { + irq = 1; + continue; + } + break; /* no event after waiting */ + } + cur_ticks = rte_get_timer_cycles(); + } while (cur_ticks < wait_time_ticks); + + return num_frames; +} + +static uint16_t +dpaa_event_dequeue_intr(void *port, + struct rte_event *ev, + uint64_t timeout_ticks) +{ + return dpaa_event_dequeue_burst_intr(port, ev, 1, timeout_ticks); +} + static void dpaa_event_dev_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *dev_info) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); dev_info->driver_name = "event_dpaa"; @@ -184,7 +328,7 @@ dpaa_event_dev_info_get(struct rte_eventdev *dev, dev_info->max_dequeue_timeout_ns = DPAA_EVENT_MAX_DEQUEUE_TIMEOUT; dev_info->dequeue_timeout_ns = - DPAA_EVENT_MIN_DEQUEUE_TIMEOUT; + DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS; dev_info->max_event_queues = DPAA_EVENT_MAX_QUEUES; dev_info->max_event_queue_flows = @@ -220,8 +364,7 @@ dpaa_event_dev_configure(const struct rte_eventdev *dev) int ret, i; uint32_t *ch_id; - EVENTDEV_DRV_FUNC_TRACE(); - + EVENTDEV_INIT_FUNC_TRACE(); priv->dequeue_timeout_ns = conf->dequeue_timeout_ns; priv->nb_events_limit = conf->nb_events_limit; priv->nb_event_queues = conf->nb_event_queues; @@ -231,26 +374,18 @@ dpaa_event_dev_configure(const struct rte_eventdev *dev) priv->nb_event_port_enqueue_depth = conf->nb_event_port_enqueue_depth; priv->event_dev_cfg = conf->event_dev_cfg; - /* Check dequeue timeout method is per dequeue or global */ - if (priv->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) { - /* - * Use timeout value as given in dequeue operation. - * So invalidating this timetout value. - */ - priv->dequeue_timeout_ns = 0; - } - ch_id = rte_malloc("dpaa-channels", sizeof(uint32_t) * priv->nb_event_queues, RTE_CACHE_LINE_SIZE); if (ch_id == NULL) { - EVENTDEV_DRV_ERR("Fail to allocate memory for dpaa channels\n"); + DPAA_EVENTDEV_ERR("Fail to allocate memory for dpaa channels\n"); return -ENOMEM; } /* Create requested event queues within the given event device */ ret = qman_alloc_pool_range(ch_id, priv->nb_event_queues, 1, 0); if (ret < 0) { - EVENTDEV_DRV_ERR("Failed to create internal channel\n"); + DPAA_EVENTDEV_ERR("qman_alloc_pool_range %u, err =%d\n", + priv->nb_event_queues, ret); rte_free(ch_id); return ret; } @@ -260,30 +395,41 @@ dpaa_event_dev_configure(const struct rte_eventdev *dev) /* Lets prepare event ports */ memset(&priv->ports[0], 0, sizeof(struct dpaa_port) * priv->nb_event_ports); + + /* Check dequeue timeout method is per dequeue or global */ if (priv->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) { - for (i = 0; i < priv->nb_event_ports; i++) { - priv->ports[i].timeout = - DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_INVALID; - } - } else if (priv->dequeue_timeout_ns == 0) { - for (i = 0; i < priv->nb_event_ports; i++) { - dpaa_event_dequeue_timeout_ticks(NULL, - DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS, - &priv->ports[i].timeout); - } + /* + * Use timeout value as given in dequeue operation. + * So invalidating this timeout value. + */ + priv->dequeue_timeout_ns = 0; + + } else if (conf->dequeue_timeout_ns == 0) { + priv->dequeue_timeout_ns = DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS; } else { - for (i = 0; i < priv->nb_event_ports; i++) { - dpaa_event_dequeue_timeout_ticks(NULL, - priv->dequeue_timeout_ns, - &priv->ports[i].timeout); + priv->dequeue_timeout_ns = conf->dequeue_timeout_ns; + } + + for (i = 0; i < priv->nb_event_ports; i++) { + if (priv->intr_mode) { + priv->ports[i].timeout_us = + priv->dequeue_timeout_ns/1000; + } else { + uint64_t cycles_per_second; + + cycles_per_second = rte_get_timer_hz(); + priv->ports[i].timeout_us = + (priv->dequeue_timeout_ns * cycles_per_second) + / NS_PER_S; } } + /* * TODO: Currently portals are affined with threads. Maximum threads * can be created equals to number of lcore. */ rte_free(ch_id); - EVENTDEV_DRV_LOG("Configured eventdev devid=%d", dev->data->dev_id); + DPAA_EVENTDEV_INFO("Configured eventdev devid=%d", dev->data->dev_id); return 0; } @@ -291,7 +437,7 @@ dpaa_event_dev_configure(const struct rte_eventdev *dev) static int dpaa_event_dev_start(struct rte_eventdev *dev) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); return 0; @@ -300,14 +446,14 @@ dpaa_event_dev_start(struct rte_eventdev *dev) static void dpaa_event_dev_stop(struct rte_eventdev *dev) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); } static int dpaa_event_dev_close(struct rte_eventdev *dev) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); return 0; @@ -317,7 +463,7 @@ static void dpaa_event_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id, struct rte_event_queue_conf *queue_conf) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); RTE_SET_USED(queue_id); @@ -334,14 +480,14 @@ dpaa_event_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, struct dpaa_eventdev *priv = dev->data->dev_private; struct dpaa_eventq *evq_info = &priv->evq_info[queue_id]; - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); switch (queue_conf->schedule_type) { case RTE_SCHED_TYPE_PARALLEL: case RTE_SCHED_TYPE_ATOMIC: break; case RTE_SCHED_TYPE_ORDERED: - EVENTDEV_DRV_ERR("Schedule type is not supported."); + DPAA_EVENTDEV_ERR("Schedule type is not supported."); return -1; } evq_info->event_queue_cfg = queue_conf->event_queue_cfg; @@ -353,7 +499,7 @@ dpaa_event_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, static void dpaa_event_queue_release(struct rte_eventdev *dev, uint8_t queue_id) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); RTE_SET_USED(queue_id); @@ -363,7 +509,7 @@ static void dpaa_event_port_default_conf_get(struct rte_eventdev *dev, uint8_t port_id, struct rte_event_port_conf *port_conf) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); RTE_SET_USED(port_id); @@ -379,7 +525,7 @@ dpaa_event_port_setup(struct rte_eventdev *dev, uint8_t port_id, { struct dpaa_eventdev *eventdev = dev->data->dev_private; - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(port_conf); dev->data->ports[port_id] = &eventdev->ports[port_id]; @@ -390,7 +536,7 @@ dpaa_event_port_setup(struct rte_eventdev *dev, uint8_t port_id, static void dpaa_event_port_release(void *port) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(port); } @@ -454,7 +600,8 @@ dpaa_event_port_unlink(struct rte_eventdev *dev, void *port, event_queue->event_port = NULL; } - event_port->num_linked_evq = event_port->num_linked_evq - i; + if (event_port->num_linked_evq) + event_port->num_linked_evq = event_port->num_linked_evq - i; return (int)i; } @@ -466,7 +613,7 @@ dpaa_event_eth_rx_adapter_caps_get(const struct rte_eventdev *dev, { const char *ethdev_driver = eth_dev->device->driver->name; - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); @@ -491,14 +638,14 @@ dpaa_event_eth_rx_adapter_queue_add( struct dpaa_if *dpaa_intf = eth_dev->data->dev_private; int ret, i; - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); if (rx_queue_id == -1) { for (i = 0; i < dpaa_intf->nb_rx_queues; i++) { ret = dpaa_eth_eventq_attach(eth_dev, i, ch_id, queue_conf); if (ret) { - EVENTDEV_DRV_ERR( + DPAA_EVENTDEV_ERR( "Event Queue attach failed:%d\n", ret); goto detach_configured_queues; } @@ -508,7 +655,7 @@ dpaa_event_eth_rx_adapter_queue_add( ret = dpaa_eth_eventq_attach(eth_dev, rx_queue_id, ch_id, queue_conf); if (ret) - EVENTDEV_DRV_ERR("dpaa_eth_eventq_attach failed:%d\n", ret); + DPAA_EVENTDEV_ERR("dpaa_eth_eventq_attach failed:%d\n", ret); return ret; detach_configured_queues: @@ -527,14 +674,14 @@ dpaa_event_eth_rx_adapter_queue_del(const struct rte_eventdev *dev, int ret, i; struct dpaa_if *dpaa_intf = eth_dev->data->dev_private; - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); if (rx_queue_id == -1) { for (i = 0; i < dpaa_intf->nb_rx_queues; i++) { ret = dpaa_eth_eventq_detach(eth_dev, i); if (ret) - EVENTDEV_DRV_ERR( + DPAA_EVENTDEV_ERR( "Event Queue detach failed:%d\n", ret); } @@ -543,7 +690,7 @@ dpaa_event_eth_rx_adapter_queue_del(const struct rte_eventdev *dev, ret = dpaa_eth_eventq_detach(eth_dev, rx_queue_id); if (ret) - EVENTDEV_DRV_ERR("dpaa_eth_eventq_detach failed:%d\n", ret); + DPAA_EVENTDEV_ERR("dpaa_eth_eventq_detach failed:%d\n", ret); return ret; } @@ -551,7 +698,7 @@ static int dpaa_event_eth_rx_adapter_start(const struct rte_eventdev *dev, const struct rte_eth_dev *eth_dev) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); RTE_SET_USED(eth_dev); @@ -563,7 +710,7 @@ static int dpaa_event_eth_rx_adapter_stop(const struct rte_eventdev *dev, const struct rte_eth_dev *eth_dev) { - EVENTDEV_DRV_FUNC_TRACE(); + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(dev); RTE_SET_USED(eth_dev); @@ -593,8 +740,44 @@ static struct rte_eventdev_ops dpaa_eventdev_ops = { .eth_rx_adapter_stop = dpaa_event_eth_rx_adapter_stop, }; +static int flag_check_handler(__rte_unused const char *key, + const char *value, __rte_unused void *opaque) +{ + if (strcmp(value, "1")) + return -1; + + return 0; +} + +static int +dpaa_event_check_flags(const char *params) +{ + struct rte_kvargs *kvlist; + + if (params == NULL || params[0] == '\0') + return 0; + + kvlist = rte_kvargs_parse(params, NULL); + if (kvlist == NULL) + return 0; + + if (!rte_kvargs_count(kvlist, DISABLE_INTR_MODE)) { + rte_kvargs_free(kvlist); + return 0; + } + /* INTR MODE is disabled when there's key-value pair: disable_intr = 1*/ + if (rte_kvargs_process(kvlist, DISABLE_INTR_MODE, + flag_check_handler, NULL) < 0) { + rte_kvargs_free(kvlist); + return 0; + } + rte_kvargs_free(kvlist); + + return 1; +} + static int -dpaa_event_dev_create(const char *name) +dpaa_event_dev_create(const char *name, const char *params) { struct rte_eventdev *eventdev; struct dpaa_eventdev *priv; @@ -603,21 +786,30 @@ dpaa_event_dev_create(const char *name) sizeof(struct dpaa_eventdev), rte_socket_id()); if (eventdev == NULL) { - EVENTDEV_DRV_ERR("Failed to create eventdev vdev %s", name); + DPAA_EVENTDEV_ERR("Failed to create eventdev vdev %s", name); goto fail; } + priv = eventdev->data->dev_private; eventdev->dev_ops = &dpaa_eventdev_ops; eventdev->enqueue = dpaa_event_enqueue; eventdev->enqueue_burst = dpaa_event_enqueue_burst; - eventdev->dequeue = dpaa_event_dequeue; - eventdev->dequeue_burst = dpaa_event_dequeue_burst; + + if (dpaa_event_check_flags(params)) { + eventdev->dequeue = dpaa_event_dequeue; + eventdev->dequeue_burst = dpaa_event_dequeue_burst; + } else { + priv->intr_mode = 1; + eventdev->dev_ops->timeout_ticks = + dpaa_event_dequeue_timeout_ticks_intr; + eventdev->dequeue = dpaa_event_dequeue_intr; + eventdev->dequeue_burst = dpaa_event_dequeue_burst_intr; + } /* For secondary processes, the primary has done all the work */ if (rte_eal_process_type() != RTE_PROC_PRIMARY) return 0; - priv = eventdev->data->dev_private; priv->max_event_queues = DPAA_EVENT_MAX_QUEUES; return 0; @@ -629,11 +821,14 @@ static int dpaa_event_dev_probe(struct rte_vdev_device *vdev) { const char *name; + const char *params; name = rte_vdev_device_name(vdev); - EVENTDEV_DRV_LOG("Initializing %s", name); + DPAA_EVENTDEV_INFO("Initializing %s", name); + + params = rte_vdev_device_args(vdev); - return dpaa_event_dev_create(name); + return dpaa_event_dev_create(name, params); } static int @@ -642,7 +837,7 @@ dpaa_event_dev_remove(struct rte_vdev_device *vdev) const char *name; name = rte_vdev_device_name(vdev); - EVENTDEV_DRV_LOG("Closing %s", name); + DPAA_EVENTDEV_INFO("Closing %s", name); return rte_event_pmd_vdev_uninit(name); } @@ -653,3 +848,5 @@ static struct rte_vdev_driver vdev_eventdev_dpaa_pmd = { }; RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DPAA_PMD, vdev_eventdev_dpaa_pmd); +RTE_PMD_REGISTER_PARAM_STRING(EVENTDEV_NAME_DPAA_PMD, + DISABLE_INTR_MODE "=<int>"); diff --git a/drivers/event/dpaa/dpaa_eventdev.h b/drivers/event/dpaa/dpaa_eventdev.h index 583e46ca..8134e6ba 100644 --- a/drivers/event/dpaa/dpaa_eventdev.h +++ b/drivers/event/dpaa/dpaa_eventdev.h @@ -12,15 +12,8 @@ #define EVENTDEV_NAME_DPAA_PMD event_dpaa1 -#define EVENTDEV_DRV_LOG(fmt, args...) \ - DPAA_EVENTDEV_INFO(fmt, ## args) -#define EVENTDEV_DRV_FUNC_TRACE() \ - DPAA_EVENTDEV_DEBUG("%s() Called:\n", __func__) -#define EVENTDEV_DRV_ERR(fmt, args...) \ - DPAA_EVENTDEV_ERR("%s(): " fmt "\n", __func__, ## args) - -#define DPAA_EVENT_MAX_PORTS 8 -#define DPAA_EVENT_MAX_QUEUES 16 +#define DPAA_EVENT_MAX_PORTS 4 +#define DPAA_EVENT_MAX_QUEUES 8 #define DPAA_EVENT_MIN_DEQUEUE_TIMEOUT 1 #define DPAA_EVENT_MAX_DEQUEUE_TIMEOUT (UINT32_MAX - 1) #define DPAA_EVENT_MAX_QUEUE_FLOWS 2048 @@ -28,7 +21,7 @@ #define DPAA_EVENT_MAX_EVENT_PRIORITY_LEVELS 0 #define DPAA_EVENT_MAX_EVENT_PORT RTE_MIN(RTE_MAX_LCORE, INT8_MAX) #define DPAA_EVENT_MAX_PORT_DEQUEUE_DEPTH 8 -#define DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS 100UL +#define DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_NS 100000UL #define DPAA_EVENT_PORT_DEQUEUE_TIMEOUT_INVALID ((uint64_t)-1) #define DPAA_EVENT_MAX_PORT_ENQUEUE_DEPTH 1 #define DPAA_EVENT_MAX_NUM_EVENTS (INT32_MAX - 1) @@ -61,7 +54,7 @@ struct dpaa_port { struct dpaa_eventq evq_info[DPAA_EVENT_MAX_QUEUES]; uint8_t num_linked_evq; uint8_t is_port_linked; - uint64_t timeout; + uint64_t timeout_us; }; struct dpaa_eventdev { @@ -72,7 +65,7 @@ struct dpaa_eventdev { uint8_t max_event_queues; uint8_t nb_event_queues; uint8_t nb_event_ports; - uint8_t resvd; + uint8_t intr_mode; uint32_t nb_event_queue_flows; uint32_t nb_event_port_dequeue_depth; uint32_t nb_event_port_enqueue_depth; diff --git a/drivers/event/dpaa2/Makefile b/drivers/event/dpaa2/Makefile index 5e1a6320..e0134cc4 100644 --- a/drivers/event/dpaa2/Makefile +++ b/drivers/event/dpaa2/Makefile @@ -21,13 +21,19 @@ CFLAGS += -I$(RTE_SDK)/lib/librte_eal/linuxapp/eal LDLIBS += -lrte_eal -lrte_eventdev LDLIBS += -lrte_bus_fslmc -lrte_mempool_dpaa2 -lrte_pmd_dpaa2 LDLIBS += -lrte_bus_vdev +LDLIBS += -lrte_common_dpaax CFLAGS += -I$(RTE_SDK)/drivers/net/dpaa2 CFLAGS += -I$(RTE_SDK)/drivers/net/dpaa2/mc +ifeq ($(CONFIG_RTE_LIBRTE_SECURITY),y) +LDLIBS += -lrte_pmd_dpaa2_sec +CFLAGS += -I$(RTE_SDK)/drivers/crypto/dpaa2_sec +endif + # versioning export map EXPORT_MAP := rte_pmd_dpaa2_event_version.map -LIBABIVER := 1 +LIBABIVER := 2 # depends on fslmc bus which uses experimental API CFLAGS += -DALLOW_EXPERIMENTAL_API diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c index ea1e5cc6..8d168b02 100644 --- a/drivers/event/dpaa2/dpaa2_eventdev.c +++ b/drivers/event/dpaa2/dpaa2_eventdev.c @@ -27,6 +27,7 @@ #include <rte_pci.h> #include <rte_bus_vdev.h> #include <rte_ethdev_driver.h> +#include <rte_cryptodev.h> #include <rte_event_eth_rx_adapter.h> #include <fslmc_vfio.h> @@ -34,6 +35,9 @@ #include <dpaa2_hw_mempool.h> #include <dpaa2_hw_dpio.h> #include <dpaa2_ethdev.h> +#ifdef RTE_LIBRTE_SECURITY +#include <dpaa2_sec_event.h> +#endif #include "dpaa2_eventdev.h" #include "dpaa2_eventdev_logs.h" #include <portal/dpaa2_hw_pvt.h> @@ -54,34 +58,63 @@ static uint16_t dpaa2_eventdev_enqueue_burst(void *port, const struct rte_event ev[], uint16_t nb_events) { - struct rte_eventdev *ev_dev = - ((struct dpaa2_io_portal_t *)port)->eventdev; - struct dpaa2_eventdev *priv = ev_dev->data->dev_private; + + struct dpaa2_port *dpaa2_portal = port; + struct dpaa2_dpio_dev *dpio_dev; uint32_t queue_id = ev[0].queue_id; - struct evq_info_t *evq_info = &priv->evq_info[queue_id]; + struct dpaa2_eventq *evq_info; uint32_t fqid; struct qbman_swp *swp; struct qbman_fd fd_arr[MAX_TX_RING_SLOTS]; uint32_t loop, frames_to_send; struct qbman_eq_desc eqdesc[MAX_TX_RING_SLOTS]; uint16_t num_tx = 0; - int ret; - - RTE_SET_USED(port); + int i, n, ret; + uint8_t channel_index; if (unlikely(!DPAA2_PER_LCORE_DPIO)) { + /* Affine current thread context to a qman portal */ ret = dpaa2_affine_qbman_swp(); - if (ret) { + if (ret < 0) { DPAA2_EVENTDEV_ERR("Failure in affining portal"); return 0; } } - + /* todo - dpaa2_portal shall have dpio_dev - no per thread variable */ + dpio_dev = DPAA2_PER_LCORE_DPIO; swp = DPAA2_PER_LCORE_PORTAL; + if (likely(dpaa2_portal->is_port_linked)) + goto skip_linking; + + /* Create mapping between portal and channel to receive packets */ + for (i = 0; i < DPAA2_EVENT_MAX_QUEUES; i++) { + evq_info = &dpaa2_portal->evq_info[i]; + if (!evq_info->event_port) + continue; + + ret = dpio_add_static_dequeue_channel(dpio_dev->dpio, + CMD_PRI_LOW, + dpio_dev->token, + evq_info->dpcon->dpcon_id, + &channel_index); + if (ret < 0) { + DPAA2_EVENTDEV_ERR( + "Static dequeue config failed: err(%d)", ret); + goto err; + } + + qbman_swp_push_set(swp, channel_index, 1); + evq_info->dpcon->channel_index = channel_index; + } + dpaa2_portal->is_port_linked = true; + +skip_linking: + evq_info = &dpaa2_portal->evq_info[queue_id]; + while (nb_events) { - frames_to_send = (nb_events >> 3) ? - MAX_TX_RING_SLOTS : nb_events; + frames_to_send = (nb_events > dpaa2_eqcr_size) ? + dpaa2_eqcr_size : nb_events; for (loop = 0; loop < frames_to_send; loop++) { const struct rte_event *event = &ev[num_tx + loop]; @@ -99,14 +132,14 @@ dpaa2_eventdev_enqueue_burst(void *port, const struct rte_event ev[], qbman_eq_desc_set_no_orp(&eqdesc[loop], 0); qbman_eq_desc_set_response(&eqdesc[loop], 0, 0); - if (event->mbuf->seqn) { + if (event->sched_type == RTE_SCHED_TYPE_ATOMIC + && event->mbuf->seqn) { uint8_t dqrr_index = event->mbuf->seqn - 1; qbman_eq_desc_set_dca(&eqdesc[loop], 1, dqrr_index, 0); DPAA2_PER_LCORE_DQRR_SIZE--; - DPAA2_PER_LCORE_DQRR_HELD &= - ~(1 << dqrr_index); + DPAA2_PER_LCORE_DQRR_HELD &= ~(1 << dqrr_index); } memset(&fd_arr[loop], 0, sizeof(struct qbman_fd)); @@ -116,7 +149,7 @@ dpaa2_eventdev_enqueue_burst(void *port, const struct rte_event ev[], * to avoid copy */ struct rte_event *ev_temp = rte_malloc(NULL, - sizeof(struct rte_event), 0); + sizeof(struct rte_event), 0); if (!ev_temp) { if (!loop) @@ -143,6 +176,18 @@ send_partial: } return num_tx; +err: + for (n = 0; n < i; n++) { + evq_info = &dpaa2_portal->evq_info[n]; + if (!evq_info->event_port) + continue; + qbman_swp_push_set(swp, evq_info->dpcon->channel_index, 0); + dpio_remove_static_dequeue_channel(dpio_dev->dpio, 0, + dpio_dev->token, + evq_info->dpcon->dpcon_id); + } + return 0; + } static uint16_t @@ -197,6 +242,7 @@ static void dpaa2_eventdev_process_atomic(struct qbman_swp *swp, ev->mbuf->seqn = dqrr_index + 1; DPAA2_PER_LCORE_DQRR_SIZE++; DPAA2_PER_LCORE_DQRR_HELD |= 1 << dqrr_index; + DPAA2_PER_LCORE_DQRR_MBUF(dqrr_index) = ev->mbuf; } static uint16_t @@ -204,22 +250,53 @@ dpaa2_eventdev_dequeue_burst(void *port, struct rte_event ev[], uint16_t nb_events, uint64_t timeout_ticks) { const struct qbman_result *dq; + struct dpaa2_dpio_dev *dpio_dev = NULL; + struct dpaa2_port *dpaa2_portal = port; + struct dpaa2_eventq *evq_info; struct qbman_swp *swp; const struct qbman_fd *fd; struct dpaa2_queue *rxq; - int num_pkts = 0, ret, i = 0; - - RTE_SET_USED(port); + int num_pkts = 0, ret, i = 0, n; + uint8_t channel_index; if (unlikely(!DPAA2_PER_LCORE_DPIO)) { + /* Affine current thread context to a qman portal */ ret = dpaa2_affine_qbman_swp(); - if (ret) { + if (ret < 0) { DPAA2_EVENTDEV_ERR("Failure in affining portal"); return 0; } } + + dpio_dev = DPAA2_PER_LCORE_DPIO; swp = DPAA2_PER_LCORE_PORTAL; + if (likely(dpaa2_portal->is_port_linked)) + goto skip_linking; + + /* Create mapping between portal and channel to receive packets */ + for (i = 0; i < DPAA2_EVENT_MAX_QUEUES; i++) { + evq_info = &dpaa2_portal->evq_info[i]; + if (!evq_info->event_port) + continue; + + ret = dpio_add_static_dequeue_channel(dpio_dev->dpio, + CMD_PRI_LOW, + dpio_dev->token, + evq_info->dpcon->dpcon_id, + &channel_index); + if (ret < 0) { + DPAA2_EVENTDEV_ERR( + "Static dequeue config failed: err(%d)", ret); + goto err; + } + + qbman_swp_push_set(swp, channel_index, 1); + evq_info->dpcon->channel_index = channel_index; + } + dpaa2_portal->is_port_linked = true; + +skip_linking: /* Check if there are atomic contexts to be released */ while (DPAA2_PER_LCORE_DQRR_SIZE) { if (DPAA2_PER_LCORE_DQRR_HELD & (1 << i)) { @@ -258,6 +335,18 @@ dpaa2_eventdev_dequeue_burst(void *port, struct rte_event ev[], } while (num_pkts < nb_events); return num_pkts; +err: + for (n = 0; n < i; n++) { + evq_info = &dpaa2_portal->evq_info[n]; + if (!evq_info->event_port) + continue; + + qbman_swp_push_set(swp, evq_info->dpcon->channel_index, 0); + dpio_remove_static_dequeue_channel(dpio_dev->dpio, 0, + dpio_dev->token, + evq_info->dpcon->dpcon_id); + } + return 0; } static uint16_t @@ -283,7 +372,7 @@ dpaa2_eventdev_info_get(struct rte_eventdev *dev, dev_info->max_dequeue_timeout_ns = DPAA2_EVENT_MAX_DEQUEUE_TIMEOUT; dev_info->dequeue_timeout_ns = - DPAA2_EVENT_MIN_DEQUEUE_TIMEOUT; + DPAA2_EVENT_PORT_DEQUEUE_TIMEOUT_NS; dev_info->max_event_queues = priv->max_event_queues; dev_info->max_event_queue_flows = DPAA2_EVENT_MAX_QUEUE_FLOWS; @@ -292,6 +381,9 @@ dpaa2_eventdev_info_get(struct rte_eventdev *dev, dev_info->max_event_priority_levels = DPAA2_EVENT_MAX_EVENT_PRIORITY_LEVELS; dev_info->max_event_ports = rte_fslmc_get_device_count(DPAA2_IO); + /* we only support dpio upto number of cores*/ + if (dev_info->max_event_ports > rte_lcore_count()) + dev_info->max_event_ports = rte_lcore_count(); dev_info->max_event_port_dequeue_depth = DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH; dev_info->max_event_port_enqueue_depth = @@ -313,7 +405,6 @@ dpaa2_eventdev_configure(const struct rte_eventdev *dev) EVENTDEV_INIT_FUNC_TRACE(); - priv->dequeue_timeout_ns = conf->dequeue_timeout_ns; priv->nb_event_queues = conf->nb_event_queues; priv->nb_event_ports = conf->nb_event_ports; priv->nb_event_queue_flows = conf->nb_event_queue_flows; @@ -321,6 +412,20 @@ dpaa2_eventdev_configure(const struct rte_eventdev *dev) priv->nb_event_port_enqueue_depth = conf->nb_event_port_enqueue_depth; priv->event_dev_cfg = conf->event_dev_cfg; + /* Check dequeue timeout method is per dequeue or global */ + if (priv->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) { + /* + * Use timeout value as given in dequeue operation. + * So invalidating this timeout value. + */ + priv->dequeue_timeout_ns = 0; + + } else if (conf->dequeue_timeout_ns == 0) { + priv->dequeue_timeout_ns = DPAA2_EVENT_PORT_DEQUEUE_TIMEOUT_NS; + } else { + priv->dequeue_timeout_ns = conf->dequeue_timeout_ns; + } + DPAA2_EVENTDEV_DEBUG("Configured eventdev devid=%d", dev->data->dev_id); return 0; @@ -370,31 +475,39 @@ dpaa2_eventdev_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id, queue_conf->priority = RTE_EVENT_DEV_PRIORITY_NORMAL; } -static void -dpaa2_eventdev_queue_release(struct rte_eventdev *dev, uint8_t queue_id) -{ - EVENTDEV_INIT_FUNC_TRACE(); - - RTE_SET_USED(dev); - RTE_SET_USED(queue_id); -} - static int dpaa2_eventdev_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, const struct rte_event_queue_conf *queue_conf) { struct dpaa2_eventdev *priv = dev->data->dev_private; - struct evq_info_t *evq_info = - &priv->evq_info[queue_id]; + struct dpaa2_eventq *evq_info = &priv->evq_info[queue_id]; EVENTDEV_INIT_FUNC_TRACE(); + switch (queue_conf->schedule_type) { + case RTE_SCHED_TYPE_PARALLEL: + case RTE_SCHED_TYPE_ATOMIC: + break; + case RTE_SCHED_TYPE_ORDERED: + DPAA2_EVENTDEV_ERR("Schedule type is not supported."); + return -1; + } evq_info->event_queue_cfg = queue_conf->event_queue_cfg; + evq_info->event_queue_id = queue_id; return 0; } static void +dpaa2_eventdev_queue_release(struct rte_eventdev *dev, uint8_t queue_id) +{ + EVENTDEV_INIT_FUNC_TRACE(); + + RTE_SET_USED(dev); + RTE_SET_USED(queue_id); +} + +static void dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, struct rte_event_port_conf *port_conf) { @@ -402,7 +515,6 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, RTE_SET_USED(dev); RTE_SET_USED(port_id); - RTE_SET_USED(port_conf); port_conf->new_event_threshold = DPAA2_EVENT_MAX_NUM_EVENTS; @@ -413,56 +525,44 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, port_conf->disable_implicit_release = 0; } -static void -dpaa2_eventdev_port_release(void *port) -{ - EVENTDEV_INIT_FUNC_TRACE(); - - RTE_SET_USED(port); -} - static int dpaa2_eventdev_port_setup(struct rte_eventdev *dev, uint8_t port_id, const struct rte_event_port_conf *port_conf) { + char event_port_name[32]; + struct dpaa2_port *portal; + EVENTDEV_INIT_FUNC_TRACE(); RTE_SET_USED(port_conf); - if (!dpaa2_io_portal[port_id].dpio_dev) { - dpaa2_io_portal[port_id].dpio_dev = - dpaa2_get_qbman_swp(port_id); - rte_atomic16_inc(&dpaa2_io_portal[port_id].dpio_dev->ref_count); - if (!dpaa2_io_portal[port_id].dpio_dev) - return -1; + sprintf(event_port_name, "event-port-%d", port_id); + portal = rte_malloc(event_port_name, sizeof(struct dpaa2_port), 0); + if (!portal) { + DPAA2_EVENTDEV_ERR("Memory allocation failure"); + return -ENOMEM; } - dpaa2_io_portal[port_id].eventdev = dev; - dev->data->ports[port_id] = &dpaa2_io_portal[port_id]; + memset(portal, 0, sizeof(struct dpaa2_port)); + dev->data->ports[port_id] = portal; return 0; } -static int -dpaa2_eventdev_port_unlink(struct rte_eventdev *dev, void *port, - uint8_t queues[], uint16_t nb_unlinks) +static void +dpaa2_eventdev_port_release(void *port) { - struct dpaa2_eventdev *priv = dev->data->dev_private; - struct dpaa2_io_portal_t *dpaa2_portal = port; - struct evq_info_t *evq_info; - int i; + struct dpaa2_port *portal = port; EVENTDEV_INIT_FUNC_TRACE(); - for (i = 0; i < nb_unlinks; i++) { - evq_info = &priv->evq_info[queues[i]]; - qbman_swp_push_set(dpaa2_portal->dpio_dev->sw_portal, - evq_info->dpcon->channel_index, 0); - dpio_remove_static_dequeue_channel(dpaa2_portal->dpio_dev->dpio, - 0, dpaa2_portal->dpio_dev->token, - evq_info->dpcon->dpcon_id); - } + /* TODO: Cleanup is required when ports are in linked state. */ + if (portal->is_port_linked) + DPAA2_EVENTDEV_WARN("Event port must be unlinked before release"); - return (int)nb_unlinks; + if (portal) + rte_free(portal); + + portal = NULL; } static int @@ -471,51 +571,71 @@ dpaa2_eventdev_port_link(struct rte_eventdev *dev, void *port, uint16_t nb_links) { struct dpaa2_eventdev *priv = dev->data->dev_private; - struct dpaa2_io_portal_t *dpaa2_portal = port; - struct evq_info_t *evq_info; - uint8_t channel_index; - int ret, i, n; + struct dpaa2_port *dpaa2_portal = port; + struct dpaa2_eventq *evq_info; + uint16_t i; EVENTDEV_INIT_FUNC_TRACE(); + RTE_SET_USED(priorities); + for (i = 0; i < nb_links; i++) { evq_info = &priv->evq_info[queues[i]]; + memcpy(&dpaa2_portal->evq_info[queues[i]], evq_info, + sizeof(struct dpaa2_eventq)); + dpaa2_portal->evq_info[queues[i]].event_port = port; + dpaa2_portal->num_linked_evq++; + } - ret = dpio_add_static_dequeue_channel( - dpaa2_portal->dpio_dev->dpio, - CMD_PRI_LOW, dpaa2_portal->dpio_dev->token, - evq_info->dpcon->dpcon_id, &channel_index); - if (ret < 0) { - DPAA2_EVENTDEV_ERR( - "Static dequeue config failed: err(%d)", ret); - goto err; - } + return (int)nb_links; +} - qbman_swp_push_set(dpaa2_portal->dpio_dev->sw_portal, - channel_index, 1); - evq_info->dpcon->channel_index = channel_index; - } +static int +dpaa2_eventdev_port_unlink(struct rte_eventdev *dev, void *port, + uint8_t queues[], uint16_t nb_unlinks) +{ + struct dpaa2_port *dpaa2_portal = port; + int i; + struct dpaa2_dpio_dev *dpio_dev = NULL; + struct dpaa2_eventq *evq_info; + struct qbman_swp *swp; - RTE_SET_USED(priorities); + EVENTDEV_INIT_FUNC_TRACE(); - return (int)nb_links; -err: - for (n = 0; n < i; n++) { - evq_info = &priv->evq_info[queues[n]]; - qbman_swp_push_set(dpaa2_portal->dpio_dev->sw_portal, - evq_info->dpcon->channel_index, 0); - dpio_remove_static_dequeue_channel(dpaa2_portal->dpio_dev->dpio, - 0, dpaa2_portal->dpio_dev->token, - evq_info->dpcon->dpcon_id); + RTE_SET_USED(dev); + RTE_SET_USED(queues); + + for (i = 0; i < nb_unlinks; i++) { + evq_info = &dpaa2_portal->evq_info[queues[i]]; + + if (DPAA2_PER_LCORE_DPIO && evq_info->dpcon) { + /* todo dpaa2_portal shall have dpio_dev-no per lcore*/ + dpio_dev = DPAA2_PER_LCORE_DPIO; + swp = DPAA2_PER_LCORE_PORTAL; + + qbman_swp_push_set(swp, + evq_info->dpcon->channel_index, 0); + dpio_remove_static_dequeue_channel(dpio_dev->dpio, 0, + dpio_dev->token, + evq_info->dpcon->dpcon_id); + } + memset(evq_info, 0, sizeof(struct dpaa2_eventq)); + if (dpaa2_portal->num_linked_evq) + dpaa2_portal->num_linked_evq--; } - return ret; + + if (!dpaa2_portal->num_linked_evq) + dpaa2_portal->is_port_linked = false; + + return (int)nb_unlinks; } + static int dpaa2_eventdev_timeout_ticks(struct rte_eventdev *dev, uint64_t ns, uint64_t *timeout_ticks) { - uint32_t scale = 1; + uint32_t scale = 1000*1000; EVENTDEV_INIT_FUNC_TRACE(); @@ -677,6 +797,151 @@ dpaa2_eventdev_eth_stop(const struct rte_eventdev *dev, return 0; } +#ifdef RTE_LIBRTE_SECURITY +static int +dpaa2_eventdev_crypto_caps_get(const struct rte_eventdev *dev, + const struct rte_cryptodev *cdev, + uint32_t *caps) +{ + const char *name = cdev->data->name; + + EVENTDEV_INIT_FUNC_TRACE(); + + RTE_SET_USED(dev); + + if (!strncmp(name, "dpsec-", 6)) + *caps = RTE_EVENT_CRYPTO_ADAPTER_DPAA2_CAP; + else + return -1; + + return 0; +} + +static int +dpaa2_eventdev_crypto_queue_add_all(const struct rte_eventdev *dev, + const struct rte_cryptodev *cryptodev, + const struct rte_event *ev) +{ + struct dpaa2_eventdev *priv = dev->data->dev_private; + uint8_t ev_qid = ev->queue_id; + uint16_t dpcon_id = priv->evq_info[ev_qid].dpcon->dpcon_id; + int i, ret; + + EVENTDEV_INIT_FUNC_TRACE(); + + for (i = 0; i < cryptodev->data->nb_queue_pairs; i++) { + ret = dpaa2_sec_eventq_attach(cryptodev, i, + dpcon_id, ev); + if (ret) { + DPAA2_EVENTDEV_ERR("dpaa2_sec_eventq_attach failed: ret %d\n", + ret); + goto fail; + } + } + return 0; +fail: + for (i = (i - 1); i >= 0 ; i--) + dpaa2_sec_eventq_detach(cryptodev, i); + + return ret; +} + +static int +dpaa2_eventdev_crypto_queue_add(const struct rte_eventdev *dev, + const struct rte_cryptodev *cryptodev, + int32_t rx_queue_id, + const struct rte_event *ev) +{ + struct dpaa2_eventdev *priv = dev->data->dev_private; + uint8_t ev_qid = ev->queue_id; + uint16_t dpcon_id = priv->evq_info[ev_qid].dpcon->dpcon_id; + int ret; + + EVENTDEV_INIT_FUNC_TRACE(); + + if (rx_queue_id == -1) + return dpaa2_eventdev_crypto_queue_add_all(dev, + cryptodev, ev); + + ret = dpaa2_sec_eventq_attach(cryptodev, rx_queue_id, + dpcon_id, ev); + if (ret) { + DPAA2_EVENTDEV_ERR( + "dpaa2_sec_eventq_attach failed: ret: %d\n", ret); + return ret; + } + return 0; +} + +static int +dpaa2_eventdev_crypto_queue_del_all(const struct rte_eventdev *dev, + const struct rte_cryptodev *cdev) +{ + int i, ret; + + EVENTDEV_INIT_FUNC_TRACE(); + + RTE_SET_USED(dev); + + for (i = 0; i < cdev->data->nb_queue_pairs; i++) { + ret = dpaa2_sec_eventq_detach(cdev, i); + if (ret) { + DPAA2_EVENTDEV_ERR( + "dpaa2_sec_eventq_detach failed:ret %d\n", ret); + return ret; + } + } + + return 0; +} + +static int +dpaa2_eventdev_crypto_queue_del(const struct rte_eventdev *dev, + const struct rte_cryptodev *cryptodev, + int32_t rx_queue_id) +{ + int ret; + + EVENTDEV_INIT_FUNC_TRACE(); + + if (rx_queue_id == -1) + return dpaa2_eventdev_crypto_queue_del_all(dev, cryptodev); + + ret = dpaa2_sec_eventq_detach(cryptodev, rx_queue_id); + if (ret) { + DPAA2_EVENTDEV_ERR( + "dpaa2_sec_eventq_detach failed: ret: %d\n", ret); + return ret; + } + + return 0; +} + +static int +dpaa2_eventdev_crypto_start(const struct rte_eventdev *dev, + const struct rte_cryptodev *cryptodev) +{ + EVENTDEV_INIT_FUNC_TRACE(); + + RTE_SET_USED(dev); + RTE_SET_USED(cryptodev); + + return 0; +} + +static int +dpaa2_eventdev_crypto_stop(const struct rte_eventdev *dev, + const struct rte_cryptodev *cryptodev) +{ + EVENTDEV_INIT_FUNC_TRACE(); + + RTE_SET_USED(dev); + RTE_SET_USED(cryptodev); + + return 0; +} +#endif + static struct rte_eventdev_ops dpaa2_eventdev_ops = { .dev_infos_get = dpaa2_eventdev_info_get, .dev_configure = dpaa2_eventdev_configure, @@ -698,6 +963,13 @@ static struct rte_eventdev_ops dpaa2_eventdev_ops = { .eth_rx_adapter_queue_del = dpaa2_eventdev_eth_queue_del, .eth_rx_adapter_start = dpaa2_eventdev_eth_start, .eth_rx_adapter_stop = dpaa2_eventdev_eth_stop, +#ifdef RTE_LIBRTE_SECURITY + .crypto_adapter_caps_get = dpaa2_eventdev_crypto_caps_get, + .crypto_adapter_queue_pair_add = dpaa2_eventdev_crypto_queue_add, + .crypto_adapter_queue_pair_del = dpaa2_eventdev_crypto_queue_del, + .crypto_adapter_start = dpaa2_eventdev_crypto_start, + .crypto_adapter_stop = dpaa2_eventdev_crypto_stop, +#endif }; static int @@ -789,6 +1061,8 @@ dpaa2_eventdev_create(const char *name) priv->max_event_queues++; } while (dpcon_dev && dpci_dev); + RTE_LOG(INFO, PMD, "%s eventdev created\n", name); + return 0; fail: return -EFAULT; diff --git a/drivers/event/dpaa2/dpaa2_eventdev.h b/drivers/event/dpaa2/dpaa2_eventdev.h index 229f66af..c847b3ea 100644 --- a/drivers/event/dpaa2/dpaa2_eventdev.h +++ b/drivers/event/dpaa2/dpaa2_eventdev.h @@ -21,6 +21,7 @@ #define DPAA2_EVENT_MAX_QUEUES 16 #define DPAA2_EVENT_MIN_DEQUEUE_TIMEOUT 1 #define DPAA2_EVENT_MAX_DEQUEUE_TIMEOUT (UINT32_MAX - 1) +#define DPAA2_EVENT_PORT_DEQUEUE_TIMEOUT_NS 100UL #define DPAA2_EVENT_MAX_QUEUE_FLOWS 2048 #define DPAA2_EVENT_MAX_QUEUE_PRIORITY_LEVELS 8 #define DPAA2_EVENT_MAX_EVENT_PRIORITY_LEVELS 0 @@ -41,6 +42,15 @@ enum { (RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT | \ RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ | \ RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) + +/**< Crypto Rx adapter cap to return If the packet transfers from + * the cryptodev to eventdev with DPAA2 devices. + */ +#define RTE_EVENT_CRYPTO_ADAPTER_DPAA2_CAP \ + (RTE_EVENT_CRYPTO_ADAPTER_CAP_INTERNAL_PORT_OP_NEW | \ + RTE_EVENT_CRYPTO_ADAPTER_CAP_INTERNAL_PORT_QP_EV_BIND | \ + RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA) + /**< Ethernet Rx adapter cap to return If the packet transfers from * the ethdev to eventdev with DPAA2 devices. */ @@ -56,17 +66,27 @@ struct dpaa2_dpcon_dev { uint8_t channel_index; }; -struct evq_info_t { +struct dpaa2_eventq { /* DPcon device */ struct dpaa2_dpcon_dev *dpcon; /* Attached DPCI device */ struct dpaa2_dpci_dev *dpci; + /* Mapped event port */ + struct dpaa2_io_portal_t *event_port; /* Configuration provided by the user */ uint32_t event_queue_cfg; + uint32_t event_queue_id; +}; + +struct dpaa2_port { + struct dpaa2_eventq evq_info[DPAA2_EVENT_MAX_QUEUES]; + uint8_t num_linked_evq; + uint8_t is_port_linked; + uint64_t timeout_us; }; struct dpaa2_eventdev { - struct evq_info_t evq_info[DPAA2_EVENT_MAX_QUEUES]; + struct dpaa2_eventq evq_info[DPAA2_EVENT_MAX_QUEUES]; uint32_t dequeue_timeout_ns; uint8_t max_event_queues; uint8_t nb_event_queues; diff --git a/drivers/event/dpaa2/meson.build b/drivers/event/dpaa2/meson.build index de7a4615..a0db6fc2 100644 --- a/drivers/event/dpaa2/meson.build +++ b/drivers/event/dpaa2/meson.build @@ -1,11 +1,14 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright 2018 NXP +version = 2 + if host_machine.system() != 'linux' build = false endif -deps += ['bus_vdev', 'pmd_dpaa2'] +deps += ['bus_vdev', 'pmd_dpaa2', 'pmd_dpaa2_sec'] sources = files('dpaa2_hw_dpcon.c', 'dpaa2_eventdev.c') allow_experimental_apis = true +includes += include_directories('../../crypto/dpaa2_sec/') diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile new file mode 100644 index 00000000..490ed0b9 --- /dev/null +++ b/drivers/event/dsw/Makefile @@ -0,0 +1,29 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2018 Ericsson AB + +include $(RTE_SDK)/mk/rte.vars.mk + +LIB = librte_pmd_dsw_event.a + +CFLAGS += -DALLOW_EXPERIMENTAL_API +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) +ifneq ($(CONFIG_RTE_TOOLCHAIN_ICC),y) +CFLAGS += -Wno-format-nonliteral +endif + +LDLIBS += -lrte_eal +LDLIBS += -lrte_mbuf +LDLIBS += -lrte_mempool +LDLIBS += -lrte_ring +LDLIBS += -lrte_eventdev +LDLIBS += -lrte_bus_vdev + +LIBABIVER := 1 + +EXPORT_MAP := rte_pmd_dsw_event_version.map + +SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += \ + dsw_evdev.c dsw_event.c dsw_xstats.c + +include $(RTE_SDK)/mk/rte.lib.mk diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c new file mode 100644 index 00000000..33ba1364 --- /dev/null +++ b/drivers/event/dsw/dsw_evdev.c @@ -0,0 +1,435 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2018 Ericsson AB + */ + +#include <stdbool.h> + +#include <rte_cycles.h> +#include <rte_eventdev_pmd.h> +#include <rte_eventdev_pmd_vdev.h> +#include <rte_random.h> + +#include "dsw_evdev.h" + +#define EVENTDEV_NAME_DSW_PMD event_dsw + +static int +dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id, + const struct rte_event_port_conf *conf) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + struct dsw_port *port; + struct rte_event_ring *in_ring; + struct rte_ring *ctl_in_ring; + char ring_name[RTE_RING_NAMESIZE]; + + port = &dsw->ports[port_id]; + + *port = (struct dsw_port) { + .id = port_id, + .dsw = dsw, + .dequeue_depth = conf->dequeue_depth, + .enqueue_depth = conf->enqueue_depth, + .new_event_threshold = conf->new_event_threshold + }; + + snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id, + port_id); + + in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE, + dev->data->socket_id, + RING_F_SC_DEQ|RING_F_EXACT_SZ); + + if (in_ring == NULL) + return -ENOMEM; + + snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u", + dev->data->dev_id, port_id); + + ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE, + dev->data->socket_id, + RING_F_SC_DEQ|RING_F_EXACT_SZ); + + if (ctl_in_ring == NULL) { + rte_event_ring_free(in_ring); + return -ENOMEM; + } + + port->in_ring = in_ring; + port->ctl_in_ring = ctl_in_ring; + + rte_atomic16_init(&port->load); + + port->load_update_interval = + (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S; + + port->migration_interval = + (DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S; + + dev->data->ports[port_id] = port; + + return 0; +} + +static void +dsw_port_def_conf(struct rte_eventdev *dev __rte_unused, + uint8_t port_id __rte_unused, + struct rte_event_port_conf *port_conf) +{ + *port_conf = (struct rte_event_port_conf) { + .new_event_threshold = 1024, + .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4, + .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4 + }; +} + +static void +dsw_port_release(void *p) +{ + struct dsw_port *port = p; + + rte_event_ring_free(port->in_ring); + rte_ring_free(port->ctl_in_ring); +} + +static int +dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, + const struct rte_event_queue_conf *conf) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + struct dsw_queue *queue = &dsw->queues[queue_id]; + + if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg) + return -ENOTSUP; + + if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED) + return -ENOTSUP; + + /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it + * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since + * the queue will only have a single serving port, no + * migration will ever happen, so the extra TYPE_ATOMIC + * migration overhead is avoided. + */ + if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) + queue->schedule_type = RTE_SCHED_TYPE_ATOMIC; + else /* atomic or parallel */ + queue->schedule_type = conf->schedule_type; + + queue->num_serving_ports = 0; + + return 0; +} + +static void +dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused, + uint8_t queue_id __rte_unused, + struct rte_event_queue_conf *queue_conf) +{ + *queue_conf = (struct rte_event_queue_conf) { + .nb_atomic_flows = 4096, + .schedule_type = RTE_SCHED_TYPE_ATOMIC, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL + }; +} + +static void +dsw_queue_release(struct rte_eventdev *dev __rte_unused, + uint8_t queue_id __rte_unused) +{ +} + +static void +queue_add_port(struct dsw_queue *queue, uint16_t port_id) +{ + queue->serving_ports[queue->num_serving_ports] = port_id; + queue->num_serving_ports++; +} + +static bool +queue_remove_port(struct dsw_queue *queue, uint16_t port_id) +{ + uint16_t i; + + for (i = 0; i < queue->num_serving_ports; i++) + if (queue->serving_ports[i] == port_id) { + uint16_t last_idx = queue->num_serving_ports - 1; + if (i != last_idx) + queue->serving_ports[i] = + queue->serving_ports[last_idx]; + queue->num_serving_ports--; + return true; + } + return false; +} + +static int +dsw_port_link_unlink(struct rte_eventdev *dev, void *port, + const uint8_t queues[], uint16_t num, bool link) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + struct dsw_port *p = port; + uint16_t i; + uint16_t count = 0; + + for (i = 0; i < num; i++) { + uint8_t qid = queues[i]; + struct dsw_queue *q = &dsw->queues[qid]; + if (link) { + queue_add_port(q, p->id); + count++; + } else { + bool removed = queue_remove_port(q, p->id); + if (removed) + count++; + } + } + + return count; +} + +static int +dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[], + const uint8_t priorities[] __rte_unused, uint16_t num) +{ + return dsw_port_link_unlink(dev, port, queues, num, true); +} + +static int +dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[], + uint16_t num) +{ + return dsw_port_link_unlink(dev, port, queues, num, false); +} + +static void +dsw_info_get(struct rte_eventdev *dev __rte_unused, + struct rte_event_dev_info *info) +{ + *info = (struct rte_event_dev_info) { + .driver_name = DSW_PMD_NAME, + .max_event_queues = DSW_MAX_QUEUES, + .max_event_queue_flows = DSW_MAX_FLOWS, + .max_event_queue_priority_levels = 1, + .max_event_priority_levels = 1, + .max_event_ports = DSW_MAX_PORTS, + .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH, + .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH, + .max_num_events = DSW_MAX_EVENTS, + .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE| + RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED + }; +} + +static int +dsw_configure(const struct rte_eventdev *dev) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + const struct rte_event_dev_config *conf = &dev->data->dev_conf; + int32_t min_max_in_flight; + + dsw->num_ports = conf->nb_event_ports; + dsw->num_queues = conf->nb_event_queues; + + /* Avoid a situation where consumer ports are holding all the + * credits, without making use of them. + */ + min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS; + + dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight); + + return 0; +} + + +static void +initial_flow_to_port_assignment(struct dsw_evdev *dsw) +{ + uint8_t queue_id; + for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) { + struct dsw_queue *queue = &dsw->queues[queue_id]; + uint16_t flow_hash; + for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) { + uint8_t port_idx = + rte_rand() % queue->num_serving_ports; + uint8_t port_id = + queue->serving_ports[port_idx]; + dsw->queues[queue_id].flow_to_port_map[flow_hash] = + port_id; + } + } +} + +static int +dsw_start(struct rte_eventdev *dev) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + uint16_t i; + uint64_t now; + + rte_atomic32_init(&dsw->credits_on_loan); + + initial_flow_to_port_assignment(dsw); + + now = rte_get_timer_cycles(); + for (i = 0; i < dsw->num_ports; i++) { + dsw->ports[i].measurement_start = now; + dsw->ports[i].busy_start = now; + } + + return 0; +} + +static void +dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len, + eventdev_stop_flush_t flush, void *flush_arg) +{ + uint16_t i; + + for (i = 0; i < buf_len; i++) + flush(dev_id, buf[i], flush_arg); +} + +static void +dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port, + eventdev_stop_flush_t flush, void *flush_arg) +{ + dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len, + flush, flush_arg); +} + +static void +dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port, + eventdev_stop_flush_t flush, void *flush_arg) +{ + uint16_t dport_id; + + for (dport_id = 0; dport_id < dsw->num_ports; dport_id++) + if (dport_id != port->id) + dsw_port_drain_buf(dev_id, port->out_buffer[dport_id], + port->out_buffer_len[dport_id], + flush, flush_arg); +} + +static void +dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port, + eventdev_stop_flush_t flush, void *flush_arg) +{ + struct rte_event ev; + + while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL)) + flush(dev_id, ev, flush_arg); +} + +static void +dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw, + eventdev_stop_flush_t flush, void *flush_arg) +{ + uint16_t port_id; + + if (flush == NULL) + return; + + for (port_id = 0; port_id < dsw->num_ports; port_id++) { + struct dsw_port *port = &dsw->ports[port_id]; + + dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg); + dsw_port_drain_paused(dev_id, port, flush, flush_arg); + dsw_port_drain_in_ring(dev_id, port, flush, flush_arg); + } +} + +static void +dsw_stop(struct rte_eventdev *dev) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + uint8_t dev_id; + eventdev_stop_flush_t flush; + void *flush_arg; + + dev_id = dev->data->dev_id; + flush = dev->dev_ops->dev_stop_flush; + flush_arg = dev->data->dev_stop_flush_arg; + + dsw_drain(dev_id, dsw, flush, flush_arg); +} + +static int +dsw_close(struct rte_eventdev *dev) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + + dsw->num_ports = 0; + dsw->num_queues = 0; + + return 0; +} + +static struct rte_eventdev_ops dsw_evdev_ops = { + .port_setup = dsw_port_setup, + .port_def_conf = dsw_port_def_conf, + .port_release = dsw_port_release, + .queue_setup = dsw_queue_setup, + .queue_def_conf = dsw_queue_def_conf, + .queue_release = dsw_queue_release, + .port_link = dsw_port_link, + .port_unlink = dsw_port_unlink, + .dev_infos_get = dsw_info_get, + .dev_configure = dsw_configure, + .dev_start = dsw_start, + .dev_stop = dsw_stop, + .dev_close = dsw_close, + .xstats_get = dsw_xstats_get, + .xstats_get_names = dsw_xstats_get_names, + .xstats_get_by_name = dsw_xstats_get_by_name +}; + +static int +dsw_probe(struct rte_vdev_device *vdev) +{ + const char *name; + struct rte_eventdev *dev; + struct dsw_evdev *dsw; + + name = rte_vdev_device_name(vdev); + + dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev), + rte_socket_id()); + if (dev == NULL) + return -EFAULT; + + dev->dev_ops = &dsw_evdev_ops; + dev->enqueue = dsw_event_enqueue; + dev->enqueue_burst = dsw_event_enqueue_burst; + dev->enqueue_new_burst = dsw_event_enqueue_new_burst; + dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst; + dev->dequeue = dsw_event_dequeue; + dev->dequeue_burst = dsw_event_dequeue_burst; + + if (rte_eal_process_type() != RTE_PROC_PRIMARY) + return 0; + + dsw = dev->data->dev_private; + dsw->data = dev->data; + + return 0; +} + +static int +dsw_remove(struct rte_vdev_device *vdev) +{ + const char *name; + + name = rte_vdev_device_name(vdev); + if (name == NULL) + return -EINVAL; + + return rte_event_pmd_vdev_uninit(name); +} + +static struct rte_vdev_driver evdev_dsw_pmd_drv = { + .probe = dsw_probe, + .remove = dsw_remove +}; + +RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv); diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h new file mode 100644 index 00000000..dc28ab12 --- /dev/null +++ b/drivers/event/dsw/dsw_evdev.h @@ -0,0 +1,279 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2018 Ericsson AB + */ + +#ifndef _DSW_EVDEV_H_ +#define _DSW_EVDEV_H_ + +#include <rte_event_ring.h> +#include <rte_eventdev.h> + +#define DSW_PMD_NAME RTE_STR(event_dsw) + +/* Code changes are required to allow more ports. */ +#define DSW_MAX_PORTS (64) +#define DSW_MAX_PORT_DEQUEUE_DEPTH (128) +#define DSW_MAX_PORT_ENQUEUE_DEPTH (128) +#define DSW_MAX_PORT_OUT_BUFFER (32) + +#define DSW_MAX_QUEUES (16) + +#define DSW_MAX_EVENTS (16384) + +/* Code changes are required to allow more flows than 32k. */ +#define DSW_MAX_FLOWS_BITS (15) +#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS)) +#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1) + +/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows, + * but the 'dsw' scheduler (more or less) randomly assign flow id to + * events on parallel queues, to be able to reuse some of the + * migration mechanism and scheduling logic from + * RTE_SCHED_TYPE_ATOMIC. By moving one of the parallel "flows" from a + * particular port, the likely-hood of events being scheduled to this + * port is reduced, and thus a kind of statistical load balancing is + * achieved. + */ +#define DSW_PARALLEL_FLOWS (1024) + +/* 'Background tasks' are polling the control rings for * + * migration-related messages, or flush the output buffer (so + * buffered events doesn't linger too long). Shouldn't be too low, + * since the system won't benefit from the 'batching' effects from + * the output buffer, and shouldn't be too high, since it will make + * buffered events linger too long in case the port goes idle. + */ +#define DSW_MAX_PORT_OPS_PER_BG_TASK (128) + +/* Avoid making small 'loans' from the central in-flight event credit + * pool, to improve efficiency. + */ +#define DSW_MIN_CREDIT_LOAN (64) +#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN) +#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN) + +/* The rings are dimensioned so that all in-flight events can reside + * on any one of the port rings, to avoid the trouble of having to + * care about the case where there's no room on the destination port's + * input ring. + */ +#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS) + +#define DSW_MAX_LOAD (INT16_MAX) +#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100)) +#define DSW_LOAD_TO_PERCENT(x) ((100*x)/DSW_MAX_LOAD) + +/* The thought behind keeping the load update interval shorter than + * the migration interval is that the load from newly migrated flows + * should 'show up' on the load measurement before new migrations are + * considered. This is to avoid having too many flows, from too many + * source ports, to be migrated too quickly to a lightly loaded port - + * in particular since this might cause the system to oscillate. + */ +#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4) +#define DSW_OLD_LOAD_WEIGHT (1) + +/* The minimum time (in us) between two flow migrations. What puts an + * upper limit on the actual migration rate is primarily the pace in + * which the ports send and receive control messages, which in turn is + * largely a function of how much cycles are spent the processing of + * an event burst. + */ +#define DSW_MIGRATION_INTERVAL (1000) +#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70)) +#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95)) + +#define DSW_MAX_EVENTS_RECORDED (128) + +/* Only one outstanding migration per port is allowed */ +#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS) + +/* Enough room for paus request/confirm and unpaus request/confirm for + * all possible senders. + */ +#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4) + +/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of + * dequeue(), arrange events so that events with the same flow id on + * the same queue forms a back-to-back "burst", and also so that such + * bursts of different flow ids, but on the same queue, also come + * consecutively. All this in an attempt to improve data and + * instruction cache usage for the application, at the cost of a + * scheduler overhead increase. + */ + +/* #define DSW_SORT_DEQUEUED */ + +struct dsw_queue_flow { + uint8_t queue_id; + uint16_t flow_hash; +}; + +enum dsw_migration_state { + DSW_MIGRATION_STATE_IDLE, + DSW_MIGRATION_STATE_PAUSING, + DSW_MIGRATION_STATE_FORWARDING, + DSW_MIGRATION_STATE_UNPAUSING +}; + +struct dsw_port { + uint16_t id; + + /* Keeping a pointer here to avoid container_of() calls, which + * are expensive since they are very frequent and will result + * in an integer multiplication (since the port id is an index + * into the dsw_evdev port array). + */ + struct dsw_evdev *dsw; + + uint16_t dequeue_depth; + uint16_t enqueue_depth; + + int32_t inflight_credits; + + int32_t new_event_threshold; + + uint16_t pending_releases; + + uint16_t next_parallel_flow_id; + + uint16_t ops_since_bg_task; + + /* most recent 'background' processing */ + uint64_t last_bg; + + /* For port load measurement. */ + uint64_t next_load_update; + uint64_t load_update_interval; + uint64_t measurement_start; + uint64_t busy_start; + uint64_t busy_cycles; + uint64_t total_busy_cycles; + + /* For the ctl interface and flow migration mechanism. */ + uint64_t next_migration; + uint64_t migration_interval; + enum dsw_migration_state migration_state; + + uint64_t migration_start; + uint64_t migrations; + uint64_t migration_latency; + + uint8_t migration_target_port_id; + struct dsw_queue_flow migration_target_qf; + uint8_t cfm_cnt; + + uint16_t paused_flows_len; + struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS]; + + /* In a very contrived worst case all inflight events can be + * laying around paused here. + */ + uint16_t paused_events_len; + struct rte_event paused_events[DSW_MAX_EVENTS]; + + uint16_t seen_events_len; + uint16_t seen_events_idx; + struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED]; + + uint64_t new_enqueued; + uint64_t forward_enqueued; + uint64_t release_enqueued; + uint64_t queue_enqueued[DSW_MAX_QUEUES]; + + uint64_t dequeued; + uint64_t queue_dequeued[DSW_MAX_QUEUES]; + + uint16_t out_buffer_len[DSW_MAX_PORTS]; + struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER]; + + uint16_t in_buffer_len; + uint16_t in_buffer_start; + /* This buffer may contain events that were read up from the + * in_ring during the flow migration process. + */ + struct rte_event in_buffer[DSW_MAX_EVENTS]; + + struct rte_event_ring *in_ring __rte_cache_aligned; + + struct rte_ring *ctl_in_ring __rte_cache_aligned; + + /* Estimate of current port load. */ + rte_atomic16_t load __rte_cache_aligned; +} __rte_cache_aligned; + +struct dsw_queue { + uint8_t schedule_type; + uint8_t serving_ports[DSW_MAX_PORTS]; + uint16_t num_serving_ports; + + uint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned; +}; + +struct dsw_evdev { + struct rte_eventdev_data *data; + + struct dsw_port ports[DSW_MAX_PORTS]; + uint16_t num_ports; + struct dsw_queue queues[DSW_MAX_QUEUES]; + uint8_t num_queues; + int32_t max_inflight; + + rte_atomic32_t credits_on_loan __rte_cache_aligned; +}; + +#define DSW_CTL_PAUS_REQ (0) +#define DSW_CTL_UNPAUS_REQ (1) +#define DSW_CTL_CFM (2) + +/* sizeof(struct dsw_ctl_msg) must be equal or less than + * sizeof(void *), to fit on the control ring. + */ +struct dsw_ctl_msg { + uint8_t type:2; + uint8_t originating_port_id:6; + uint8_t queue_id; + uint16_t flow_hash; +} __rte_packed; + +uint16_t dsw_event_enqueue(void *port, const struct rte_event *event); +uint16_t dsw_event_enqueue_burst(void *port, + const struct rte_event events[], + uint16_t events_len); +uint16_t dsw_event_enqueue_new_burst(void *port, + const struct rte_event events[], + uint16_t events_len); +uint16_t dsw_event_enqueue_forward_burst(void *port, + const struct rte_event events[], + uint16_t events_len); + +uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait); +uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events, + uint16_t num, uint64_t wait); + +int dsw_xstats_get_names(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, + uint8_t queue_port_id, + struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size); +int dsw_xstats_get(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + const unsigned int ids[], uint64_t values[], unsigned int n); +uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev, + const char *name, unsigned int *id); + +static inline struct dsw_evdev * +dsw_pmd_priv(const struct rte_eventdev *eventdev) +{ + return eventdev->data->dev_private; +} + +#define DSW_LOG_DP(level, fmt, args...) \ + RTE_LOG_DP(level, EVENTDEV, "[%s] %s() line %u: " fmt, \ + DSW_PMD_NAME, \ + __func__, __LINE__, ## args) + +#define DSW_LOG_DP_PORT(level, port_id, fmt, args...) \ + DSW_LOG_DP(level, "<Port %d> " fmt, port_id, ## args) + +#endif diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c new file mode 100644 index 00000000..61a66fab --- /dev/null +++ b/drivers/event/dsw/dsw_event.c @@ -0,0 +1,1253 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2018 Ericsson AB + */ + +#include "dsw_evdev.h" + +#ifdef DSW_SORT_DEQUEUED +#include "dsw_sort.h" +#endif + +#include <stdbool.h> +#include <string.h> + +#include <rte_atomic.h> +#include <rte_cycles.h> +#include <rte_memcpy.h> +#include <rte_random.h> + +static bool +dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port, + int32_t credits) +{ + int32_t inflight_credits = port->inflight_credits; + int32_t missing_credits = credits - inflight_credits; + int32_t total_on_loan; + int32_t available; + int32_t acquired_credits; + int32_t new_total_on_loan; + + if (likely(missing_credits <= 0)) { + port->inflight_credits -= credits; + return true; + } + + total_on_loan = rte_atomic32_read(&dsw->credits_on_loan); + available = dsw->max_inflight - total_on_loan; + acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS); + + if (available < acquired_credits) + return false; + + /* This is a race, no locks are involved, and thus some other + * thread can allocate tokens in between the check and the + * allocation. + */ + new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan, + acquired_credits); + + if (unlikely(new_total_on_loan > dsw->max_inflight)) { + /* Some other port took the last credits */ + rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits); + return false; + } + + DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n", + acquired_credits); + + port->inflight_credits += acquired_credits; + port->inflight_credits -= credits; + + return true; +} + +static void +dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port, + int32_t credits) +{ + port->inflight_credits += credits; + + if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) { + int32_t leave_credits = DSW_PORT_MIN_CREDITS; + int32_t return_credits = + port->inflight_credits - leave_credits; + + port->inflight_credits = leave_credits; + + rte_atomic32_sub(&dsw->credits_on_loan, return_credits); + + DSW_LOG_DP_PORT(DEBUG, port->id, + "Returned %d tokens to pool.\n", + return_credits); + } +} + +static void +dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new, + uint16_t num_forward, uint16_t num_release) +{ + port->new_enqueued += num_new; + port->forward_enqueued += num_forward; + port->release_enqueued += num_release; +} + +static void +dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id) +{ + source_port->queue_enqueued[queue_id]++; +} + +static void +dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num) +{ + port->dequeued += num; +} + +static void +dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id) +{ + source_port->queue_dequeued[queue_id]++; +} + +static void +dsw_port_load_record(struct dsw_port *port, unsigned int dequeued) +{ + if (dequeued > 0 && port->busy_start == 0) + /* work period begins */ + port->busy_start = rte_get_timer_cycles(); + else if (dequeued == 0 && port->busy_start > 0) { + /* work period ends */ + uint64_t work_period = + rte_get_timer_cycles() - port->busy_start; + port->busy_cycles += work_period; + port->busy_start = 0; + } +} + +static int16_t +dsw_port_load_close_period(struct dsw_port *port, uint64_t now) +{ + uint64_t passed = now - port->measurement_start; + uint64_t busy_cycles = port->busy_cycles; + + if (port->busy_start > 0) { + busy_cycles += (now - port->busy_start); + port->busy_start = now; + } + + int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed; + + port->measurement_start = now; + port->busy_cycles = 0; + + port->total_busy_cycles += busy_cycles; + + return load; +} + +static void +dsw_port_load_update(struct dsw_port *port, uint64_t now) +{ + int16_t old_load; + int16_t period_load; + int16_t new_load; + + old_load = rte_atomic16_read(&port->load); + + period_load = dsw_port_load_close_period(port, now); + + new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) / + (DSW_OLD_LOAD_WEIGHT+1); + + rte_atomic16_set(&port->load, new_load); +} + +static void +dsw_port_consider_load_update(struct dsw_port *port, uint64_t now) +{ + if (now < port->next_load_update) + return; + + port->next_load_update = now + port->load_update_interval; + + dsw_port_load_update(port, now); +} + +static void +dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg) +{ + void *raw_msg; + + memcpy(&raw_msg, msg, sizeof(*msg)); + + /* there's always room on the ring */ + while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0) + rte_pause(); +} + +static int +dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg) +{ + void *raw_msg; + int rc; + + rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg); + + if (rc == 0) + memcpy(msg, &raw_msg, sizeof(*msg)); + + return rc; +} + +static void +dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port, + uint8_t type, uint8_t queue_id, uint16_t flow_hash) +{ + uint16_t port_id; + struct dsw_ctl_msg msg = { + .type = type, + .originating_port_id = source_port->id, + .queue_id = queue_id, + .flow_hash = flow_hash + }; + + for (port_id = 0; port_id < dsw->num_ports; port_id++) + if (port_id != source_port->id) + dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg); +} + +static bool +dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id, + uint16_t flow_hash) +{ + uint16_t i; + + for (i = 0; i < port->paused_flows_len; i++) { + struct dsw_queue_flow *qf = &port->paused_flows[i]; + if (qf->queue_id == queue_id && + qf->flow_hash == flow_hash) + return true; + } + return false; +} + +static void +dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) { + .queue_id = queue_id, + .flow_hash = paused_flow_hash + }; + port->paused_flows_len++; +} + +static void +dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + uint16_t i; + + for (i = 0; i < port->paused_flows_len; i++) { + struct dsw_queue_flow *qf = &port->paused_flows[i]; + + if (qf->queue_id == queue_id && + qf->flow_hash == paused_flow_hash) { + uint16_t last_idx = port->paused_flows_len-1; + if (i != last_idx) + port->paused_flows[i] = + port->paused_flows[last_idx]; + port->paused_flows_len--; + break; + } + } +} + +static void +dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port); + +static void +dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port, + uint8_t originating_port_id, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + struct dsw_ctl_msg cfm = { + .type = DSW_CTL_CFM, + .originating_port_id = port->id, + .queue_id = queue_id, + .flow_hash = paused_flow_hash + }; + + DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n", + queue_id, paused_flow_hash); + + /* There might be already-scheduled events belonging to the + * paused flow in the output buffers. + */ + dsw_port_flush_out_buffers(dsw, port); + + dsw_port_add_paused_flow(port, queue_id, paused_flow_hash); + + /* Make sure any stores to the original port's in_ring is seen + * before the ctl message. + */ + rte_smp_wmb(); + + dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); +} + +static void +dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids, + uint8_t exclude_port_id, int16_t *port_loads, + uint8_t *target_port_id, int16_t *target_load) +{ + int16_t candidate_port_id = -1; + int16_t candidate_load = DSW_MAX_LOAD; + uint16_t i; + + for (i = 0; i < num_port_ids; i++) { + uint8_t port_id = port_ids[i]; + if (port_id != exclude_port_id) { + int16_t load = port_loads[port_id]; + if (candidate_port_id == -1 || + load < candidate_load) { + candidate_port_id = port_id; + candidate_load = load; + } + } + } + *target_port_id = candidate_port_id; + *target_load = candidate_load; +} + +struct dsw_queue_flow_burst { + struct dsw_queue_flow queue_flow; + uint16_t count; +}; + +static inline int +dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b) +{ + const struct dsw_queue_flow_burst *burst_a = v_burst_a; + const struct dsw_queue_flow_burst *burst_b = v_burst_b; + + int a_count = burst_a->count; + int b_count = burst_b->count; + + return a_count - b_count; +} + +#define DSW_QF_TO_INT(_qf) \ + ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash))) + +static inline int +dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b) +{ + const struct dsw_queue_flow *qf_a = v_qf_a; + const struct dsw_queue_flow *qf_b = v_qf_b; + + return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b); +} + +static uint16_t +dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len, + struct dsw_queue_flow_burst *bursts) +{ + uint16_t i; + struct dsw_queue_flow_burst *current_burst = NULL; + uint16_t num_bursts = 0; + + /* We don't need the stable property, and the list is likely + * large enough for qsort() to outperform dsw_stable_sort(), + * so we use qsort() here. + */ + qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf); + + /* arrange the (now-consecutive) events into bursts */ + for (i = 0; i < qfs_len; i++) { + if (i == 0 || + dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) { + current_burst = &bursts[num_bursts]; + current_burst->queue_flow = qfs[i]; + current_burst->count = 0; + num_bursts++; + } + current_burst->count++; + } + + qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst); + + return num_bursts; +} + +static bool +dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads, + int16_t load_limit) +{ + bool below_limit = false; + uint16_t i; + + for (i = 0; i < dsw->num_ports; i++) { + int16_t load = rte_atomic16_read(&dsw->ports[i].load); + if (load < load_limit) + below_limit = true; + port_loads[i] = load; + } + return below_limit; +} + +static bool +dsw_select_migration_target(struct dsw_evdev *dsw, + struct dsw_port *source_port, + struct dsw_queue_flow_burst *bursts, + uint16_t num_bursts, int16_t *port_loads, + int16_t max_load, struct dsw_queue_flow *target_qf, + uint8_t *target_port_id) +{ + uint16_t source_load = port_loads[source_port->id]; + uint16_t i; + + for (i = 0; i < num_bursts; i++) { + struct dsw_queue_flow *qf = &bursts[i].queue_flow; + + if (dsw_port_is_flow_paused(source_port, qf->queue_id, + qf->flow_hash)) + continue; + + struct dsw_queue *queue = &dsw->queues[qf->queue_id]; + int16_t target_load; + + dsw_find_lowest_load_port(queue->serving_ports, + queue->num_serving_ports, + source_port->id, port_loads, + target_port_id, &target_load); + + if (target_load < source_load && + target_load < max_load) { + *target_qf = *qf; + return true; + } + } + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, " + "no target port found with load less than %d.\n", + num_bursts, DSW_LOAD_TO_PERCENT(max_load)); + + return false; +} + +static uint8_t +dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash) +{ + struct dsw_queue *queue = &dsw->queues[queue_id]; + uint8_t port_id; + + if (queue->num_serving_ports > 1) + port_id = queue->flow_to_port_map[flow_hash]; + else + /* A single-link queue, or atomic/ordered/parallel but + * with just a single serving port. + */ + port_id = queue->serving_ports[0]; + + DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled " + "to port %d.\n", queue_id, flow_hash, port_id); + + return port_id; +} + +static void +dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port, + uint8_t dest_port_id) +{ + struct dsw_port *dest_port = &(dsw->ports[dest_port_id]); + uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id]; + struct rte_event *buffer = source_port->out_buffer[dest_port_id]; + uint16_t enqueued = 0; + + if (*buffer_len == 0) + return; + + /* The rings are dimensioned to fit all in-flight events (even + * on a single ring), so looping will work. + */ + do { + enqueued += + rte_event_ring_enqueue_burst(dest_port->in_ring, + buffer+enqueued, + *buffer_len-enqueued, + NULL); + } while (unlikely(enqueued != *buffer_len)); + + (*buffer_len) = 0; +} + +static uint16_t +dsw_port_get_parallel_flow_id(struct dsw_port *port) +{ + uint16_t flow_id = port->next_parallel_flow_id; + + port->next_parallel_flow_id = + (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS; + + return flow_id; +} + +static void +dsw_port_buffer_paused(struct dsw_port *port, + const struct rte_event *paused_event) +{ + port->paused_events[port->paused_events_len] = *paused_event; + port->paused_events_len++; +} + +static void +dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port, + uint8_t dest_port_id, const struct rte_event *event) +{ + struct rte_event *buffer = source_port->out_buffer[dest_port_id]; + uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id]; + + if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER) + dsw_port_transmit_buffered(dsw, source_port, dest_port_id); + + buffer[*buffer_len] = *event; + + (*buffer_len)++; +} + +#define DSW_FLOW_ID_BITS (24) +static uint16_t +dsw_flow_id_hash(uint32_t flow_id) +{ + uint16_t hash = 0; + uint16_t offset = 0; + + do { + hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK); + offset += DSW_MAX_FLOWS_BITS; + } while (offset < DSW_FLOW_ID_BITS); + + return hash; +} + +static void +dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port, + struct rte_event event) +{ + uint8_t dest_port_id; + + event.flow_id = dsw_port_get_parallel_flow_id(source_port); + + dest_port_id = dsw_schedule(dsw, event.queue_id, + dsw_flow_id_hash(event.flow_id)); + + dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event); +} + +static void +dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port, + const struct rte_event *event) +{ + uint16_t flow_hash; + uint8_t dest_port_id; + + if (unlikely(dsw->queues[event->queue_id].schedule_type == + RTE_SCHED_TYPE_PARALLEL)) { + dsw_port_buffer_parallel(dsw, source_port, *event); + return; + } + + flow_hash = dsw_flow_id_hash(event->flow_id); + + if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id, + flow_hash))) { + dsw_port_buffer_paused(source_port, event); + return; + } + + dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash); + + dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event); +} + +static void +dsw_port_flush_paused_events(struct dsw_evdev *dsw, + struct dsw_port *source_port, + uint8_t queue_id, uint16_t paused_flow_hash) +{ + uint16_t paused_events_len = source_port->paused_events_len; + struct rte_event paused_events[paused_events_len]; + uint8_t dest_port_id; + uint16_t i; + + if (paused_events_len == 0) + return; + + if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash)) + return; + + rte_memcpy(paused_events, source_port->paused_events, + paused_events_len * sizeof(struct rte_event)); + + source_port->paused_events_len = 0; + + dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash); + + for (i = 0; i < paused_events_len; i++) { + struct rte_event *event = &paused_events[i]; + uint16_t flow_hash; + + flow_hash = dsw_flow_id_hash(event->flow_id); + + if (event->queue_id == queue_id && + flow_hash == paused_flow_hash) + dsw_port_buffer_non_paused(dsw, source_port, + dest_port_id, event); + else + dsw_port_buffer_paused(source_port, event); + } +} + +static void +dsw_port_migration_stats(struct dsw_port *port) +{ + uint64_t migration_latency; + + migration_latency = (rte_get_timer_cycles() - port->migration_start); + port->migration_latency += migration_latency; + port->migrations++; +} + +static void +dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port) +{ + uint8_t queue_id = port->migration_target_qf.queue_id; + uint16_t flow_hash = port->migration_target_qf.flow_hash; + + port->migration_state = DSW_MIGRATION_STATE_IDLE; + port->seen_events_len = 0; + + dsw_port_migration_stats(port); + + if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) { + dsw_port_remove_paused_flow(port, queue_id, flow_hash); + dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash); + } + + DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id " + "%d flow_hash %d.\n", queue_id, flow_hash); +} + +static void +dsw_port_consider_migration(struct dsw_evdev *dsw, + struct dsw_port *source_port, + uint64_t now) +{ + bool any_port_below_limit; + struct dsw_queue_flow *seen_events = source_port->seen_events; + uint16_t seen_events_len = source_port->seen_events_len; + struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED]; + uint16_t num_bursts; + int16_t source_port_load; + int16_t port_loads[dsw->num_ports]; + + if (now < source_port->next_migration) + return; + + if (dsw->num_ports == 1) + return; + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n"); + + /* Randomize interval to avoid having all threads considering + * migration at the same in point in time, which might lead to + * all choosing the same target port. + */ + source_port->next_migration = now + + source_port->migration_interval / 2 + + rte_rand() % source_port->migration_interval; + + if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, + "Migration already in progress.\n"); + return; + } + + /* For simplicity, avoid migration in the unlikely case there + * is still events to consume in the in_buffer (from the last + * migration). + */ + if (source_port->in_buffer_len > 0) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still " + "events in the input buffer.\n"); + return; + } + + source_port_load = rte_atomic16_read(&source_port->load); + if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, + "Load %d is below threshold level %d.\n", + DSW_LOAD_TO_PERCENT(source_port_load), + DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)); + return; + } + + /* Avoid starting any expensive operations (sorting etc), in + * case of a scenario with all ports above the load limit. + */ + any_port_below_limit = + dsw_retrieve_port_loads(dsw, port_loads, + DSW_MAX_TARGET_LOAD_FOR_MIGRATION); + if (!any_port_below_limit) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, + "Candidate target ports are all too highly " + "loaded.\n"); + return; + } + + /* Sort flows into 'bursts' to allow attempting to migrating + * small (but still active) flows first - this it to avoid + * having large flows moving around the worker cores too much + * (to avoid cache misses, among other things). Of course, the + * number of recorded events (queue+flow ids) are limited, and + * provides only a snapshot, so only so many conclusions can + * be drawn from this data. + */ + num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len, + bursts); + /* For non-big-little systems, there's no point in moving the + * only (known) flow. + */ + if (num_bursts < 2) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow " + "queue_id %d flow_hash %d has been seen.\n", + bursts[0].queue_flow.queue_id, + bursts[0].queue_flow.flow_hash); + return; + } + + /* The strategy is to first try to find a flow to move to a + * port with low load (below the migration-attempt + * threshold). If that fails, we try to find a port which is + * below the max threshold, and also less loaded than this + * port is. + */ + if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts, + port_loads, + DSW_MIN_SOURCE_LOAD_FOR_MIGRATION, + &source_port->migration_target_qf, + &source_port->migration_target_port_id) + && + !dsw_select_migration_target(dsw, source_port, bursts, num_bursts, + port_loads, + DSW_MAX_TARGET_LOAD_FOR_MIGRATION, + &source_port->migration_target_qf, + &source_port->migration_target_port_id)) + return; + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d " + "flow_hash %d from port %d to port %d.\n", + source_port->migration_target_qf.queue_id, + source_port->migration_target_qf.flow_hash, + source_port->id, source_port->migration_target_port_id); + + /* We have a winner. */ + + source_port->migration_state = DSW_MIGRATION_STATE_PAUSING; + source_port->migration_start = rte_get_timer_cycles(); + + /* No need to go through the whole pause procedure for + * parallel queues, since atomic/ordered semantics need not to + * be maintained. + */ + + if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type + == RTE_SCHED_TYPE_PARALLEL) { + uint8_t queue_id = source_port->migration_target_qf.queue_id; + uint16_t flow_hash = source_port->migration_target_qf.flow_hash; + uint8_t dest_port_id = source_port->migration_target_port_id; + + /* Single byte-sized stores are always atomic. */ + dsw->queues[queue_id].flow_to_port_map[flow_hash] = + dest_port_id; + rte_smp_wmb(); + + dsw_port_end_migration(dsw, source_port); + + return; + } + + /* There might be 'loopback' events already scheduled in the + * output buffers. + */ + dsw_port_flush_out_buffers(dsw, source_port); + + dsw_port_add_paused_flow(source_port, + source_port->migration_target_qf.queue_id, + source_port->migration_target_qf.flow_hash); + + dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ, + source_port->migration_target_qf.queue_id, + source_port->migration_target_qf.flow_hash); + source_port->cfm_cnt = 0; +} + +static void +dsw_port_flush_paused_events(struct dsw_evdev *dsw, + struct dsw_port *source_port, + uint8_t queue_id, uint16_t paused_flow_hash); + +static void +dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port, + uint8_t originating_port_id, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + struct dsw_ctl_msg cfm = { + .type = DSW_CTL_CFM, + .originating_port_id = port->id, + .queue_id = queue_id, + .flow_hash = paused_flow_hash + }; + + DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n", + queue_id, paused_flow_hash); + + dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash); + + rte_smp_rmb(); + + dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); + + dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash); +} + +#define FORWARD_BURST_SIZE (32) + +static void +dsw_port_forward_migrated_flow(struct dsw_port *source_port, + struct rte_event_ring *dest_ring, + uint8_t queue_id, + uint16_t flow_hash) +{ + uint16_t events_left; + + /* Control ring message should been seen before the ring count + * is read on the port's in_ring. + */ + rte_smp_rmb(); + + events_left = rte_event_ring_count(source_port->in_ring); + + while (events_left > 0) { + uint16_t in_burst_size = + RTE_MIN(FORWARD_BURST_SIZE, events_left); + struct rte_event in_burst[in_burst_size]; + uint16_t in_len; + uint16_t i; + + in_len = rte_event_ring_dequeue_burst(source_port->in_ring, + in_burst, + in_burst_size, NULL); + /* No need to care about bursting forwarded events (to + * the destination port's in_ring), since migration + * doesn't happen very often, and also the majority of + * the dequeued events will likely *not* be forwarded. + */ + for (i = 0; i < in_len; i++) { + struct rte_event *e = &in_burst[i]; + if (e->queue_id == queue_id && + dsw_flow_id_hash(e->flow_id) == flow_hash) { + while (rte_event_ring_enqueue_burst(dest_ring, + e, 1, + NULL) != 1) + rte_pause(); + } else { + uint16_t last_idx = source_port->in_buffer_len; + source_port->in_buffer[last_idx] = *e; + source_port->in_buffer_len++; + } + } + + events_left -= in_len; + } +} + +static void +dsw_port_move_migrating_flow(struct dsw_evdev *dsw, + struct dsw_port *source_port) +{ + uint8_t queue_id = source_port->migration_target_qf.queue_id; + uint16_t flow_hash = source_port->migration_target_qf.flow_hash; + uint8_t dest_port_id = source_port->migration_target_port_id; + struct dsw_port *dest_port = &dsw->ports[dest_port_id]; + + dsw_port_flush_out_buffers(dsw, source_port); + + rte_smp_wmb(); + + dsw->queues[queue_id].flow_to_port_map[flow_hash] = + dest_port_id; + + dsw_port_forward_migrated_flow(source_port, dest_port->in_ring, + queue_id, flow_hash); + + /* Flow table update and migration destination port's enqueues + * must be seen before the control message. + */ + rte_smp_wmb(); + + dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id, + flow_hash); + source_port->cfm_cnt = 0; + source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING; +} + +static void +dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port) +{ + port->cfm_cnt++; + + if (port->cfm_cnt == (dsw->num_ports-1)) { + switch (port->migration_state) { + case DSW_MIGRATION_STATE_PAUSING: + DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding " + "migration state.\n"); + port->migration_state = DSW_MIGRATION_STATE_FORWARDING; + break; + case DSW_MIGRATION_STATE_UNPAUSING: + dsw_port_end_migration(dsw, port); + break; + default: + RTE_ASSERT(0); + break; + } + } +} + +static void +dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port) +{ + struct dsw_ctl_msg msg; + + /* So any table loads happens before the ring dequeue, in the + * case of a 'paus' message. + */ + rte_smp_rmb(); + + if (dsw_port_ctl_dequeue(port, &msg) == 0) { + switch (msg.type) { + case DSW_CTL_PAUS_REQ: + dsw_port_handle_pause_flow(dsw, port, + msg.originating_port_id, + msg.queue_id, msg.flow_hash); + break; + case DSW_CTL_UNPAUS_REQ: + dsw_port_handle_unpause_flow(dsw, port, + msg.originating_port_id, + msg.queue_id, + msg.flow_hash); + break; + case DSW_CTL_CFM: + dsw_port_handle_confirm(dsw, port); + break; + } + } +} + +static void +dsw_port_note_op(struct dsw_port *port, uint16_t num_events) +{ + /* To pull the control ring reasonbly often on busy ports, + * each dequeued/enqueued event is considered an 'op' too. + */ + port->ops_since_bg_task += (num_events+1); +} + +static void +dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port) +{ + if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING && + port->pending_releases == 0)) + dsw_port_move_migrating_flow(dsw, port); + + /* Polling the control ring is relatively inexpensive, and + * polling it often helps bringing down migration latency, so + * do this for every iteration. + */ + dsw_port_ctl_process(dsw, port); + + /* To avoid considering migration and flushing output buffers + * on every dequeue/enqueue call, the scheduler only performs + * such 'background' tasks every nth + * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation. + */ + if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) { + uint64_t now; + + now = rte_get_timer_cycles(); + + port->last_bg = now; + + /* Logic to avoid having events linger in the output + * buffer too long. + */ + dsw_port_flush_out_buffers(dsw, port); + + dsw_port_consider_load_update(port, now); + + dsw_port_consider_migration(dsw, port, now); + + port->ops_since_bg_task = 0; + } +} + +static void +dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port) +{ + uint16_t dest_port_id; + + for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++) + dsw_port_transmit_buffered(dsw, source_port, dest_port_id); +} + +uint16_t +dsw_event_enqueue(void *port, const struct rte_event *ev) +{ + return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1); +} + +static __rte_always_inline uint16_t +dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[], + uint16_t events_len, bool op_types_known, + uint16_t num_new, uint16_t num_release, + uint16_t num_non_release) +{ + struct dsw_port *source_port = port; + struct dsw_evdev *dsw = source_port->dsw; + bool enough_credits; + uint16_t i; + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d " + "events to port %d.\n", events_len, source_port->id); + + dsw_port_bg_process(dsw, source_port); + + /* XXX: For performance (=ring efficiency) reasons, the + * scheduler relies on internal non-ring buffers instead of + * immediately sending the event to the destination ring. For + * a producer that doesn't intend to produce or consume any + * more events, the scheduler provides a way to flush the + * buffer, by means of doing an enqueue of zero events. In + * addition, a port cannot be left "unattended" (e.g. unused) + * for long periods of time, since that would stall + * migration. Eventdev API extensions to provide a cleaner way + * to archieve both of these functions should be + * considered. + */ + if (unlikely(events_len == 0)) { + dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK); + return 0; + } + + if (unlikely(events_len > source_port->enqueue_depth)) + events_len = source_port->enqueue_depth; + + dsw_port_note_op(source_port, events_len); + + if (!op_types_known) + for (i = 0; i < events_len; i++) { + switch (events[i].op) { + case RTE_EVENT_OP_RELEASE: + num_release++; + break; + case RTE_EVENT_OP_NEW: + num_new++; + /* Falls through. */ + default: + num_non_release++; + break; + } + } + + /* Technically, we could allow the non-new events up to the + * first new event in the array into the system, but for + * simplicity reasons, we deny the whole burst if the port is + * above the water mark. + */ + if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) > + source_port->new_event_threshold)) + return 0; + + enough_credits = dsw_port_acquire_credits(dsw, source_port, + num_non_release); + if (unlikely(!enough_credits)) + return 0; + + source_port->pending_releases -= num_release; + + dsw_port_enqueue_stats(source_port, num_new, + num_non_release-num_new, num_release); + + for (i = 0; i < events_len; i++) { + const struct rte_event *event = &events[i]; + + if (likely(num_release == 0 || + event->op != RTE_EVENT_OP_RELEASE)) + dsw_port_buffer_event(dsw, source_port, event); + dsw_port_queue_enqueue_stats(source_port, event->queue_id); + } + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events " + "accepted.\n", num_non_release); + + return num_non_release; +} + +uint16_t +dsw_event_enqueue_burst(void *port, const struct rte_event events[], + uint16_t events_len) +{ + return dsw_event_enqueue_burst_generic(port, events, events_len, false, + 0, 0, 0); +} + +uint16_t +dsw_event_enqueue_new_burst(void *port, const struct rte_event events[], + uint16_t events_len) +{ + return dsw_event_enqueue_burst_generic(port, events, events_len, true, + events_len, 0, events_len); +} + +uint16_t +dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[], + uint16_t events_len) +{ + return dsw_event_enqueue_burst_generic(port, events, events_len, true, + 0, 0, events_len); +} + +uint16_t +dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait) +{ + return dsw_event_dequeue_burst(port, events, 1, wait); +} + +static void +dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events, + uint16_t num) +{ + uint16_t i; + + dsw_port_dequeue_stats(port, num); + + for (i = 0; i < num; i++) { + uint16_t l_idx = port->seen_events_idx; + struct dsw_queue_flow *qf = &port->seen_events[l_idx]; + struct rte_event *event = &events[i]; + qf->queue_id = event->queue_id; + qf->flow_hash = dsw_flow_id_hash(event->flow_id); + + port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED; + + dsw_port_queue_dequeued_stats(port, event->queue_id); + } + + if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED)) + port->seen_events_len = + RTE_MIN(port->seen_events_len + num, + DSW_MAX_EVENTS_RECORDED); +} + +#ifdef DSW_SORT_DEQUEUED + +#define DSW_EVENT_TO_INT(_event) \ + ((int)((((_event)->queue_id)<<16)|((_event)->flow_id))) + +static inline int +dsw_cmp_event(const void *v_event_a, const void *v_event_b) +{ + const struct rte_event *event_a = v_event_a; + const struct rte_event *event_b = v_event_b; + + return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b); +} +#endif + +static uint16_t +dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events, + uint16_t num) +{ + struct dsw_port *source_port = port; + struct dsw_evdev *dsw = source_port->dsw; + + dsw_port_ctl_process(dsw, source_port); + + if (unlikely(port->in_buffer_len > 0)) { + uint16_t dequeued = RTE_MIN(num, port->in_buffer_len); + + rte_memcpy(events, &port->in_buffer[port->in_buffer_start], + dequeued * sizeof(struct rte_event)); + + port->in_buffer_start += dequeued; + port->in_buffer_len -= dequeued; + + if (port->in_buffer_len == 0) + port->in_buffer_start = 0; + + return dequeued; + } + + return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL); +} + +uint16_t +dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num, + uint64_t wait __rte_unused) +{ + struct dsw_port *source_port = port; + struct dsw_evdev *dsw = source_port->dsw; + uint16_t dequeued; + + source_port->pending_releases = 0; + + dsw_port_bg_process(dsw, source_port); + + if (unlikely(num > source_port->dequeue_depth)) + num = source_port->dequeue_depth; + + dequeued = dsw_port_dequeue_burst(source_port, events, num); + + source_port->pending_releases = dequeued; + + dsw_port_load_record(source_port, dequeued); + + dsw_port_note_op(source_port, dequeued); + + if (dequeued > 0) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n", + dequeued); + + dsw_port_return_credits(dsw, source_port, dequeued); + + /* One potential optimization one might think of is to + * add a migration state (prior to 'pausing'), and + * only record seen events when the port is in this + * state (and transit to 'pausing' when enough events + * have been gathered). However, that schema doesn't + * seem to improve performance. + */ + dsw_port_record_seen_events(port, events, dequeued); + } + /* XXX: Assuming the port can't produce any more work, + * consider flushing the output buffer, on dequeued == + * 0. + */ + +#ifdef DSW_SORT_DEQUEUED + dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event); +#endif + + return dequeued; +} diff --git a/drivers/event/dsw/dsw_sort.h b/drivers/event/dsw/dsw_sort.h new file mode 100644 index 00000000..609767fd --- /dev/null +++ b/drivers/event/dsw/dsw_sort.h @@ -0,0 +1,48 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2018 Ericsson AB + */ + +#ifndef _DSW_SORT_ +#define _DSW_SORT_ + +#include <string.h> + +#include <rte_common.h> + +#define DSW_ARY_ELEM_PTR(_ary, _idx, _elem_size) \ + RTE_PTR_ADD(_ary, (_idx) * (_elem_size)) + +#define DSW_ARY_ELEM_SWAP(_ary, _a_idx, _b_idx, _elem_size) \ + do { \ + char tmp[_elem_size]; \ + void *_a_ptr = DSW_ARY_ELEM_PTR(_ary, _a_idx, _elem_size); \ + void *_b_ptr = DSW_ARY_ELEM_PTR(_ary, _b_idx, _elem_size); \ + memcpy(tmp, _a_ptr, _elem_size); \ + memcpy(_a_ptr, _b_ptr, _elem_size); \ + memcpy(_b_ptr, tmp, _elem_size); \ + } while (0) + +static inline void +dsw_insertion_sort(void *ary, uint16_t len, uint16_t elem_size, + int (*cmp_fn)(const void *, const void *)) +{ + uint16_t i; + + for (i = 1; i < len; i++) { + uint16_t j; + for (j = i; j > 0 && + cmp_fn(DSW_ARY_ELEM_PTR(ary, j-1, elem_size), + DSW_ARY_ELEM_PTR(ary, j, elem_size)) > 0; + j--) + DSW_ARY_ELEM_SWAP(ary, j, j-1, elem_size); + } +} + +static inline void +dsw_stable_sort(void *ary, uint16_t len, uint16_t elem_size, + int (*cmp_fn)(const void *, const void *)) +{ + dsw_insertion_sort(ary, len, elem_size, cmp_fn); +} + +#endif diff --git a/drivers/event/dsw/dsw_xstats.c b/drivers/event/dsw/dsw_xstats.c new file mode 100644 index 00000000..bf2eec52 --- /dev/null +++ b/drivers/event/dsw/dsw_xstats.c @@ -0,0 +1,288 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2018 Ericsson AB + */ + +#include "dsw_evdev.h" + +#include <stdbool.h> +#include <string.h> + +#include <rte_debug.h> + +/* The high bits in the xstats id is used to store an additional + * parameter (beyond the queue or port id already in the xstats + * interface). + */ +#define DSW_XSTATS_ID_PARAM_BITS (8) +#define DSW_XSTATS_ID_STAT_BITS \ + (sizeof(unsigned int)*CHAR_BIT - DSW_XSTATS_ID_PARAM_BITS) +#define DSW_XSTATS_ID_STAT_MASK ((1 << DSW_XSTATS_ID_STAT_BITS) - 1) + +#define DSW_XSTATS_ID_GET_PARAM(id) \ + ((id)>>DSW_XSTATS_ID_STAT_BITS) + +#define DSW_XSTATS_ID_GET_STAT(id) \ + ((id) & DSW_XSTATS_ID_STAT_MASK) + +#define DSW_XSTATS_ID_CREATE(id, param_value) \ + (((param_value) << DSW_XSTATS_ID_STAT_BITS) | id) + +typedef +uint64_t (*dsw_xstats_dev_get_value_fn)(struct dsw_evdev *dsw); + +struct dsw_xstat_dev { + const char *name; + dsw_xstats_dev_get_value_fn get_value_fn; +}; + +typedef +uint64_t (*dsw_xstats_port_get_value_fn)(struct dsw_evdev *dsw, + uint8_t port_id, uint8_t queue_id); + +struct dsw_xstats_port { + const char *name_fmt; + dsw_xstats_port_get_value_fn get_value_fn; + bool per_queue; +}; + +static uint64_t +dsw_xstats_dev_credits_on_loan(struct dsw_evdev *dsw) +{ + return rte_atomic32_read(&dsw->credits_on_loan); +} + +static struct dsw_xstat_dev dsw_dev_xstats[] = { + { "dev_credits_on_loan", dsw_xstats_dev_credits_on_loan } +}; + +#define DSW_GEN_PORT_ACCESS_FN(_variable) \ + static uint64_t \ + dsw_xstats_port_get_ ## _variable(struct dsw_evdev *dsw, \ + uint8_t port_id, \ + uint8_t queue_id __rte_unused) \ + { \ + return dsw->ports[port_id]._variable; \ + } + +DSW_GEN_PORT_ACCESS_FN(new_enqueued) +DSW_GEN_PORT_ACCESS_FN(forward_enqueued) +DSW_GEN_PORT_ACCESS_FN(release_enqueued) + +static uint64_t +dsw_xstats_port_get_queue_enqueued(struct dsw_evdev *dsw, uint8_t port_id, + uint8_t queue_id) +{ + return dsw->ports[port_id].queue_enqueued[queue_id]; +} + +DSW_GEN_PORT_ACCESS_FN(dequeued) + +static uint64_t +dsw_xstats_port_get_queue_dequeued(struct dsw_evdev *dsw, uint8_t port_id, + uint8_t queue_id) +{ + return dsw->ports[port_id].queue_dequeued[queue_id]; +} + +DSW_GEN_PORT_ACCESS_FN(migrations) + +static uint64_t +dsw_xstats_port_get_migration_latency(struct dsw_evdev *dsw, uint8_t port_id, + uint8_t queue_id __rte_unused) +{ + uint64_t total_latency = dsw->ports[port_id].migration_latency; + uint64_t num_migrations = dsw->ports[port_id].migrations; + + return num_migrations > 0 ? total_latency / num_migrations : 0; +} + +static uint64_t +dsw_xstats_port_get_event_proc_latency(struct dsw_evdev *dsw, uint8_t port_id, + uint8_t queue_id __rte_unused) +{ + uint64_t total_busy_cycles = + dsw->ports[port_id].total_busy_cycles; + uint64_t dequeued = + dsw->ports[port_id].dequeued; + + return dequeued > 0 ? total_busy_cycles / dequeued : 0; +} + +DSW_GEN_PORT_ACCESS_FN(inflight_credits) + +static uint64_t +dsw_xstats_port_get_load(struct dsw_evdev *dsw, uint8_t port_id, + uint8_t queue_id __rte_unused) +{ + int16_t load; + + load = rte_atomic16_read(&dsw->ports[port_id].load); + + return DSW_LOAD_TO_PERCENT(load); +} + +DSW_GEN_PORT_ACCESS_FN(last_bg) + +static struct dsw_xstats_port dsw_port_xstats[] = { + { "port_%u_new_enqueued", dsw_xstats_port_get_new_enqueued, + false }, + { "port_%u_forward_enqueued", dsw_xstats_port_get_forward_enqueued, + false }, + { "port_%u_release_enqueued", dsw_xstats_port_get_release_enqueued, + false }, + { "port_%u_queue_%u_enqueued", dsw_xstats_port_get_queue_enqueued, + true }, + { "port_%u_dequeued", dsw_xstats_port_get_dequeued, + false }, + { "port_%u_queue_%u_dequeued", dsw_xstats_port_get_queue_dequeued, + true }, + { "port_%u_migrations", dsw_xstats_port_get_migrations, + false }, + { "port_%u_migration_latency", dsw_xstats_port_get_migration_latency, + false }, + { "port_%u_event_proc_latency", dsw_xstats_port_get_event_proc_latency, + false }, + { "port_%u_inflight_credits", dsw_xstats_port_get_inflight_credits, + false }, + { "port_%u_load", dsw_xstats_port_get_load, + false }, + { "port_%u_last_bg", dsw_xstats_port_get_last_bg, + false } +}; + +static int +dsw_xstats_dev_get_names(struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size) +{ + unsigned int i; + + for (i = 0; i < RTE_DIM(dsw_dev_xstats) && i < size; i++) { + ids[i] = i; + strcpy(xstats_names[i].name, dsw_dev_xstats[i].name); + } + + return i; +} + +static int +dsw_xstats_port_get_names(struct dsw_evdev *dsw, uint8_t port_id, + struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size) +{ + uint8_t queue_id = 0; + unsigned int id_idx; + unsigned int stat_idx; + + for (id_idx = 0, stat_idx = 0; + id_idx < size && stat_idx < RTE_DIM(dsw_port_xstats); + id_idx++) { + struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx]; + + if (xstat->per_queue) { + ids[id_idx] = DSW_XSTATS_ID_CREATE(stat_idx, queue_id); + snprintf(xstats_names[id_idx].name, + RTE_EVENT_DEV_XSTATS_NAME_SIZE, + dsw_port_xstats[stat_idx].name_fmt, port_id, + queue_id); + queue_id++; + } else { + ids[id_idx] = stat_idx; + snprintf(xstats_names[id_idx].name, + RTE_EVENT_DEV_XSTATS_NAME_SIZE, + dsw_port_xstats[stat_idx].name_fmt, port_id); + } + + if (!(xstat->per_queue && queue_id < dsw->num_queues)) { + stat_idx++; + queue_id = 0; + } + } + return id_idx; +} + +int +dsw_xstats_get_names(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, + uint8_t queue_port_id, + struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + + switch (mode) { + case RTE_EVENT_DEV_XSTATS_DEVICE: + return dsw_xstats_dev_get_names(xstats_names, ids, size); + case RTE_EVENT_DEV_XSTATS_PORT: + return dsw_xstats_port_get_names(dsw, queue_port_id, + xstats_names, ids, size); + case RTE_EVENT_DEV_XSTATS_QUEUE: + return 0; + default: + RTE_ASSERT(false); + return -1; + } +} + +static int +dsw_xstats_dev_get(const struct rte_eventdev *dev, + const unsigned int ids[], uint64_t values[], unsigned int n) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + unsigned int i; + + for (i = 0; i < n; i++) { + unsigned int id = ids[i]; + struct dsw_xstat_dev *xstat = &dsw_dev_xstats[id]; + values[i] = xstat->get_value_fn(dsw); + } + return n; +} + +static int +dsw_xstats_port_get(const struct rte_eventdev *dev, uint8_t port_id, + const unsigned int ids[], uint64_t values[], unsigned int n) +{ + struct dsw_evdev *dsw = dsw_pmd_priv(dev); + unsigned int i; + + for (i = 0; i < n; i++) { + unsigned int id = ids[i]; + unsigned int stat_idx = DSW_XSTATS_ID_GET_STAT(id); + struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx]; + uint8_t queue_id = 0; + + if (xstat->per_queue) + queue_id = DSW_XSTATS_ID_GET_PARAM(id); + + values[i] = xstat->get_value_fn(dsw, port_id, queue_id); + } + return n; +} + +int +dsw_xstats_get(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + const unsigned int ids[], uint64_t values[], unsigned int n) +{ + switch (mode) { + case RTE_EVENT_DEV_XSTATS_DEVICE: + return dsw_xstats_dev_get(dev, ids, values, n); + case RTE_EVENT_DEV_XSTATS_PORT: + return dsw_xstats_port_get(dev, queue_port_id, ids, values, n); + case RTE_EVENT_DEV_XSTATS_QUEUE: + return 0; + default: + RTE_ASSERT(false); + return -1; + } + return 0; +} + +uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev, + const char *name, unsigned int *id) +{ + RTE_SET_USED(dev); + RTE_SET_USED(name); + RTE_SET_USED(id); + return 0; +} diff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build new file mode 100644 index 00000000..a6b7bfa5 --- /dev/null +++ b/drivers/event/dsw/meson.build @@ -0,0 +1,6 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2018 Ericsson AB + +allow_experimental_apis = true +deps += ['bus_vdev'] +sources = files('dsw_evdev.c', 'dsw_event.c', 'dsw_xstats.c') diff --git a/drivers/event/dsw/rte_pmd_dsw_event_version.map b/drivers/event/dsw/rte_pmd_dsw_event_version.map new file mode 100644 index 00000000..24bd5cdb --- /dev/null +++ b/drivers/event/dsw/rte_pmd_dsw_event_version.map @@ -0,0 +1,3 @@ +DPDK_18.11 { + local: *; +}; diff --git a/drivers/event/meson.build b/drivers/event/meson.build index e9511993..836ecbb7 100644 --- a/drivers/event/meson.build +++ b/drivers/event/meson.build @@ -1,7 +1,7 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2017 Intel Corporation -drivers = ['dpaa', 'dpaa2', 'octeontx', 'skeleton', 'sw'] +drivers = ['dpaa', 'dpaa2', 'octeontx', 'opdl', 'skeleton', 'sw', 'dsw'] std_deps = ['eventdev', 'kvargs'] config_flag_fmt = 'RTE_LIBRTE_@0@_EVENTDEV_PMD' driver_name_fmt = 'rte_pmd_@0@_event' diff --git a/drivers/event/octeontx/Makefile b/drivers/event/octeontx/Makefile index 90ad2217..2e07890b 100644 --- a/drivers/event/octeontx/Makefile +++ b/drivers/event/octeontx/Makefile @@ -17,7 +17,7 @@ CFLAGS += -DALLOW_EXPERIMENTAL_API LDLIBS += -lrte_eal -lrte_eventdev -lrte_common_octeontx -lrte_pmd_octeontx LDLIBS += -lrte_bus_pci -lrte_mempool -lrte_mbuf -lrte_kvargs -LDLIBS += -lrte_bus_vdev +LDLIBS += -lrte_bus_vdev -lrte_ethdev EXPORT_MAP := rte_pmd_octeontx_event_version.map diff --git a/drivers/event/octeontx/ssovf_evdev.c b/drivers/event/octeontx/ssovf_evdev.c index 16a3a04b..a273d4c9 100644 --- a/drivers/event/octeontx/ssovf_evdev.c +++ b/drivers/event/octeontx/ssovf_evdev.c @@ -146,6 +146,7 @@ ssovf_fastpath_fns_set(struct rte_eventdev *dev) dev->enqueue_forward_burst = ssows_enq_fwd_burst; dev->dequeue = ssows_deq; dev->dequeue_burst = ssows_deq_burst; + dev->txa_enqueue = sso_event_tx_adapter_enqueue; if (edev->is_timeout_deq) { dev->dequeue = ssows_deq_timeout; @@ -454,7 +455,6 @@ ssovf_eth_rx_adapter_queue_del(const struct rte_eventdev *dev, const struct octeontx_nic *nic = eth_dev->data->dev_private; pki_del_qos_t pki_qos; RTE_SET_USED(dev); - RTE_SET_USED(rx_queue_id); ret = strncmp(eth_dev->data->name, "eth_octeontx", 12); if (ret) @@ -466,7 +466,7 @@ ssovf_eth_rx_adapter_queue_del(const struct rte_eventdev *dev, ret = octeontx_pki_port_delete_qos(nic->port_id, &pki_qos); if (ret < 0) ssovf_log_err("Failed to delete QOS port=%d, q=%d", - nic->port_id, queue_conf->ev.queue_id); + nic->port_id, rx_queue_id); return ret; } @@ -491,6 +491,77 @@ ssovf_eth_rx_adapter_stop(const struct rte_eventdev *dev, return 0; } +static int +ssovf_eth_tx_adapter_caps_get(const struct rte_eventdev *dev, + const struct rte_eth_dev *eth_dev, uint32_t *caps) +{ + int ret; + RTE_SET_USED(dev); + + ret = strncmp(eth_dev->data->name, "eth_octeontx", 12); + if (ret) + *caps = 0; + else + *caps = RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT; + + return 0; +} + +static int +ssovf_eth_tx_adapter_create(uint8_t id, const struct rte_eventdev *dev) +{ + RTE_SET_USED(id); + RTE_SET_USED(dev); + return 0; +} + +static int +ssovf_eth_tx_adapter_free(uint8_t id, const struct rte_eventdev *dev) +{ + RTE_SET_USED(id); + RTE_SET_USED(dev); + return 0; +} + +static int +ssovf_eth_tx_adapter_queue_add(uint8_t id, const struct rte_eventdev *dev, + const struct rte_eth_dev *eth_dev, int32_t tx_queue_id) +{ + RTE_SET_USED(id); + RTE_SET_USED(dev); + RTE_SET_USED(eth_dev); + RTE_SET_USED(tx_queue_id); + return 0; +} + +static int +ssovf_eth_tx_adapter_queue_del(uint8_t id, const struct rte_eventdev *dev, + const struct rte_eth_dev *eth_dev, int32_t tx_queue_id) +{ + RTE_SET_USED(id); + RTE_SET_USED(dev); + RTE_SET_USED(eth_dev); + RTE_SET_USED(tx_queue_id); + return 0; +} + +static int +ssovf_eth_tx_adapter_start(uint8_t id, const struct rte_eventdev *dev) +{ + RTE_SET_USED(id); + RTE_SET_USED(dev); + return 0; +} + +static int +ssovf_eth_tx_adapter_stop(uint8_t id, const struct rte_eventdev *dev) +{ + RTE_SET_USED(id); + RTE_SET_USED(dev); + return 0; +} + + static void ssovf_dump(struct rte_eventdev *dev, FILE *f) { @@ -619,6 +690,14 @@ static struct rte_eventdev_ops ssovf_ops = { .eth_rx_adapter_start = ssovf_eth_rx_adapter_start, .eth_rx_adapter_stop = ssovf_eth_rx_adapter_stop, + .eth_tx_adapter_caps_get = ssovf_eth_tx_adapter_caps_get, + .eth_tx_adapter_create = ssovf_eth_tx_adapter_create, + .eth_tx_adapter_free = ssovf_eth_tx_adapter_free, + .eth_tx_adapter_queue_add = ssovf_eth_tx_adapter_queue_add, + .eth_tx_adapter_queue_del = ssovf_eth_tx_adapter_queue_del, + .eth_tx_adapter_start = ssovf_eth_tx_adapter_start, + .eth_tx_adapter_stop = ssovf_eth_tx_adapter_stop, + .timer_adapter_caps_get = ssovf_timvf_caps_get, .dev_selftest = test_eventdev_octeontx, diff --git a/drivers/event/octeontx/ssovf_evdev.h b/drivers/event/octeontx/ssovf_evdev.h index 18293e96..0e622152 100644 --- a/drivers/event/octeontx/ssovf_evdev.h +++ b/drivers/event/octeontx/ssovf_evdev.h @@ -5,6 +5,7 @@ #ifndef __SSOVF_EVDEV_H__ #define __SSOVF_EVDEV_H__ +#include <rte_event_eth_tx_adapter.h> #include <rte_eventdev_pmd_vdev.h> #include <rte_io.h> @@ -83,7 +84,7 @@ #define SSOVF_SELFTEST_ARG ("selftest") /* - * In Cavium OcteonTX SoC, all accesses to the device registers are + * In Cavium OCTEON TX SoC, all accesses to the device registers are * implictly strongly ordered. So, The relaxed version of IO operation is * safe to use with out any IO memory barriers. */ @@ -179,6 +180,8 @@ typedef void (*ssows_handle_event_t)(void *arg, struct rte_event ev); void ssows_flush_events(struct ssows *ws, uint8_t queue_id, ssows_handle_event_t fn, void *arg); void ssows_reset(struct ssows *ws); +uint16_t sso_event_tx_adapter_enqueue(void *port, + struct rte_event ev[], uint16_t nb_events); int ssovf_info(struct ssovf_info *info); void *ssovf_bar(enum ssovf_type, uint8_t id, uint8_t bar); int test_eventdev_octeontx(void); diff --git a/drivers/event/octeontx/ssovf_worker.c b/drivers/event/octeontx/ssovf_worker.c index fffa9024..d940b5dd 100644 --- a/drivers/event/octeontx/ssovf_worker.c +++ b/drivers/event/octeontx/ssovf_worker.c @@ -261,3 +261,47 @@ ssows_reset(struct ssows *ws) ssows_swtag_untag(ws); } } + +uint16_t +sso_event_tx_adapter_enqueue(void *port, + struct rte_event ev[], uint16_t nb_events) +{ + uint16_t port_id; + uint16_t queue_id; + struct rte_mbuf *m; + struct rte_eth_dev *ethdev; + struct ssows *ws = port; + struct octeontx_txq *txq; + octeontx_dq_t *dq; + + RTE_SET_USED(nb_events); + switch (ev->sched_type) { + case SSO_SYNC_ORDERED: + ssows_swtag_norm(ws, ev->event, SSO_SYNC_ATOMIC); + rte_cio_wmb(); + ssows_swtag_wait(ws); + break; + case SSO_SYNC_UNTAGGED: + ssows_swtag_full(ws, ev->u64, ev->event, SSO_SYNC_ATOMIC, + ev->queue_id); + rte_cio_wmb(); + ssows_swtag_wait(ws); + break; + case SSO_SYNC_ATOMIC: + rte_cio_wmb(); + break; + } + + m = ev[0].mbuf; + port_id = m->port; + queue_id = rte_event_eth_tx_adapter_txq_get(m); + ethdev = &rte_eth_devices[port_id]; + txq = ethdev->data->tx_queues[queue_id]; + dq = &txq->dq; + + if (__octeontx_xmit_pkts(dq->lmtline_va, dq->ioreg_va, dq->fc_status_va, + m) < 0) + return 0; + + return 1; +} diff --git a/drivers/event/octeontx/ssovf_worker.h b/drivers/event/octeontx/ssovf_worker.h index 7c7306b5..d1d3a52a 100644 --- a/drivers/event/octeontx/ssovf_worker.h +++ b/drivers/event/octeontx/ssovf_worker.h @@ -42,6 +42,7 @@ ssovf_octeontx_wqe_to_pkt(uint64_t work, uint16_t port_info) mbuf->ol_flags = 0; mbuf->port = rte_octeontx_pchan_map[port_info >> 4][port_info & 0xF]; rte_mbuf_refcnt_set(mbuf, 1); + return mbuf; } diff --git a/drivers/event/opdl/Makefile b/drivers/event/opdl/Makefile index cea8118d..bf50a60a 100644 --- a/drivers/event/opdl/Makefile +++ b/drivers/event/opdl/Makefile @@ -24,7 +24,7 @@ LDLIBS += -lrte_bus_vdev -lrte_mbuf -lrte_mempool LIBABIVER := 1 # versioning export map -EXPORT_MAP := rte_pmd_evdev_opdl_version.map +EXPORT_MAP := rte_pmd_opdl_event_version.map # library source files SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c diff --git a/drivers/event/opdl/meson.build b/drivers/event/opdl/meson.build new file mode 100644 index 00000000..cc6029c6 --- /dev/null +++ b/drivers/event/opdl/meson.build @@ -0,0 +1,11 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2018 Luca Boccassi <bluca@debian.org> + +sources = files( + 'opdl_evdev.c', + 'opdl_evdev_init.c', + 'opdl_evdev_xstats.c', + 'opdl_ring.c', + 'opdl_test.c', +) +deps += ['bus_vdev'] diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_opdl_event_version.map index 58b94270..58b94270 100644 --- a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map +++ b/drivers/event/opdl/rte_pmd_opdl_event_version.map diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c index a6bb9138..1175d6cd 100644 --- a/drivers/event/sw/sw_evdev.c +++ b/drivers/event/sw/sw_evdev.c @@ -113,10 +113,22 @@ sw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[], } } } + + p->unlinks_in_progress += unlinked; + rte_smp_mb(); + return unlinked; } static int +sw_port_unlinks_in_progress(struct rte_eventdev *dev, void *port) +{ + RTE_SET_USED(dev); + struct sw_port *p = port; + return p->unlinks_in_progress; +} + +static int sw_port_setup(struct rte_eventdev *dev, uint8_t port_id, const struct rte_event_port_conf *conf) { @@ -925,6 +937,7 @@ sw_probe(struct rte_vdev_device *vdev) .port_release = sw_port_release, .port_link = sw_port_link, .port_unlink = sw_port_unlink, + .port_unlinks_in_progress = sw_port_unlinks_in_progress, .eth_rx_adapter_caps_get = sw_eth_rx_adapter_caps_get, diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h index d90b96d4..7c77b249 100644 --- a/drivers/event/sw/sw_evdev.h +++ b/drivers/event/sw/sw_evdev.h @@ -148,6 +148,14 @@ struct sw_port { /* A numeric ID for the port */ uint8_t id; + /* An atomic counter for when the port has been unlinked, and the + * scheduler has not yet acked this unlink - hence there may still be + * events in the buffers going to the port. When the unlinks in + * progress is read by the scheduler, no more events will be pushed to + * the port - hence the scheduler core can just assign zero. + */ + uint8_t unlinks_in_progress; + int16_t is_directed; /** Takes from a single directed QID */ /** * For loadbalanced we can optimise pulling packets from diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c index e3a41e02..cff747da 100644 --- a/drivers/event/sw/sw_evdev_scheduler.c +++ b/drivers/event/sw/sw_evdev_scheduler.c @@ -51,9 +51,11 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, int cq = fid->cq; if (cq < 0) { - uint32_t cq_idx = qid->cq_next_tx++; - if (qid->cq_next_tx == qid->cq_num_mapped_cqs) + uint32_t cq_idx; + if (qid->cq_next_tx >= qid->cq_num_mapped_cqs) qid->cq_next_tx = 0; + cq_idx = qid->cq_next_tx++; + cq = qid->cq_map[cq_idx]; /* find least used */ @@ -140,9 +142,10 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, do { if (++cq_check_count > qid->cq_num_mapped_cqs) goto exit; - cq = qid->cq_map[cq_idx]; - if (++cq_idx == qid->cq_num_mapped_cqs) + if (cq_idx >= qid->cq_num_mapped_cqs) cq_idx = 0; + cq = qid->cq_map[cq_idx++]; + } while (rte_event_ring_free_count( sw->ports[cq].cq_worker_ring) == 0 || sw->ports[cq].inflights == SW_PORT_HIST_LIST); @@ -220,7 +223,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw) int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask); /* zero mapped CQs indicates directed */ - if (iq_num >= SW_IQS_MAX) + if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0) continue; uint32_t pkts_done = 0; @@ -517,13 +520,18 @@ sw_event_schedule(struct rte_eventdev *dev) /* Pull from rx_ring for ports */ do { in_pkts = 0; - for (i = 0; i < sw->port_count; i++) + for (i = 0; i < sw->port_count; i++) { + /* ack the unlinks in progress as done */ + if (sw->ports[i].unlinks_in_progress) + sw->ports[i].unlinks_in_progress = 0; + if (sw->ports[i].is_directed) in_pkts += sw_schedule_pull_port_dir(sw, i); else if (sw->ports[i].num_ordered_qids > 0) in_pkts += sw_schedule_pull_port_lb(sw, i); else in_pkts += sw_schedule_pull_port_no_reorder(sw, i); + } /* QID scan for re-ordered */ in_pkts += sw_schedule_reorder(sw, 0, diff --git a/drivers/event/sw/sw_evdev_selftest.c b/drivers/event/sw/sw_evdev_selftest.c index c40912db..d00d5de6 100644 --- a/drivers/event/sw/sw_evdev_selftest.c +++ b/drivers/event/sw/sw_evdev_selftest.c @@ -1904,6 +1904,77 @@ qid_priorities(struct test *t) } static int +unlink_in_progress(struct test *t) +{ + /* Test unlinking API, in particular that when an unlink request has + * not yet been seen by the scheduler thread, that the + * unlink_in_progress() function returns the number of unlinks. + */ + unsigned int i; + /* Create instance with 1 ports, and 3 qids */ + if (init(t, 3, 1) < 0 || + create_ports(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + for (i = 0; i < 3; i++) { + /* Create QID */ + const struct rte_event_queue_conf conf = { + .schedule_type = RTE_SCHED_TYPE_ATOMIC, + /* increase priority (0 == highest), as we go */ + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL - i, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + + if (rte_event_queue_setup(evdev, i, &conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, i); + return -1; + } + t->qid[i] = i; + } + t->nb_qids = i; + /* map all QIDs to port */ + rte_event_port_link(evdev, t->port[0], NULL, NULL, 0); + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /* unlink all ports to have outstanding unlink requests */ + int ret = rte_event_port_unlink(evdev, t->port[0], NULL, 0); + if (ret < 0) { + printf("%d: Failed to unlink queues\n", __LINE__); + return -1; + } + + /* get active unlinks here, expect 3 */ + int unlinks_in_progress = + rte_event_port_unlinks_in_progress(evdev, t->port[0]); + if (unlinks_in_progress != 3) { + printf("%d: Expected num unlinks in progress == 3, got %d\n", + __LINE__, unlinks_in_progress); + return -1; + } + + /* run scheduler service on this thread to ack the unlinks */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* active unlinks expected as 0 as scheduler thread has acked */ + unlinks_in_progress = + rte_event_port_unlinks_in_progress(evdev, t->port[0]); + if (unlinks_in_progress != 0) { + printf("%d: Expected num unlinks in progress == 0, got %d\n", + __LINE__, unlinks_in_progress); + } + + cleanup(t); + return 0; +} + +static int load_balancing(struct test *t) { const int rx_enq = 0; @@ -3260,6 +3331,12 @@ test_sw_eventdev(void) printf("ERROR - QID Priority test FAILED.\n"); goto test_fail; } + printf("*** Running Unlink-in-progress test...\n"); + ret = unlink_in_progress(t); + if (ret != 0) { + printf("ERROR - Unlink in progress test FAILED.\n"); + goto test_fail; + } printf("*** Running Ordered Reconfigure test...\n"); ret = ordered_reconfigure(t); if (ret != 0) { |