diff options
Diffstat (limited to 'drivers/event/sw')
-rw-r--r-- | drivers/event/sw/event_ring.h | 14 | ||||
-rw-r--r-- | drivers/event/sw/iq_ring.h | 20 | ||||
-rw-r--r-- | drivers/event/sw/sw_evdev.c | 103 | ||||
-rw-r--r-- | drivers/event/sw/sw_evdev.h | 10 | ||||
-rw-r--r-- | drivers/event/sw/sw_evdev_scheduler.c | 24 | ||||
-rw-r--r-- | drivers/event/sw/sw_evdev_worker.c | 33 | ||||
-rw-r--r-- | drivers/event/sw/sw_evdev_xstats.c | 28 |
7 files changed, 158 insertions, 74 deletions
diff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h index cdaee95d..734a3b4b 100644 --- a/drivers/event/sw/event_ring.h +++ b/drivers/event/sw/event_ring.h @@ -61,10 +61,6 @@ struct qe_ring { struct rte_event ring[0] __rte_cache_aligned; }; -#ifndef force_inline -#define force_inline inline __attribute__((always_inline)) -#endif - static inline struct qe_ring * qe_ring_create(const char *name, unsigned int size, unsigned int socket_id) { @@ -91,19 +87,19 @@ qe_ring_destroy(struct qe_ring *r) rte_free(r); } -static force_inline unsigned int +static __rte_always_inline unsigned int qe_ring_count(const struct qe_ring *r) { return r->write_idx - r->read_idx; } -static force_inline unsigned int +static __rte_always_inline unsigned int qe_ring_free_count(const struct qe_ring *r) { return r->size - qe_ring_count(r); } -static force_inline unsigned int +static __rte_always_inline unsigned int qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes, unsigned int nb_qes, uint16_t *free_count) { @@ -130,7 +126,7 @@ qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes, return nb_qes; } -static force_inline unsigned int +static __rte_always_inline unsigned int qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes, unsigned int nb_qes, uint8_t *ops) { @@ -157,7 +153,7 @@ qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes, return nb_qes; } -static force_inline unsigned int +static __rte_always_inline unsigned int qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes, unsigned int nb_qes) { diff --git a/drivers/event/sw/iq_ring.h b/drivers/event/sw/iq_ring.h index d480d156..64cf6784 100644 --- a/drivers/event/sw/iq_ring.h +++ b/drivers/event/sw/iq_ring.h @@ -56,10 +56,6 @@ struct iq_ring { struct rte_event ring[QID_IQ_DEPTH]; }; -#ifndef force_inline -#define force_inline inline __attribute__((always_inline)) -#endif - static inline struct iq_ring * iq_ring_create(const char *name, unsigned int socket_id) { @@ -81,19 +77,19 @@ iq_ring_destroy(struct iq_ring *r) rte_free(r); } -static force_inline uint16_t +static __rte_always_inline uint16_t iq_ring_count(const struct iq_ring *r) { return r->write_idx - r->read_idx; } -static force_inline uint16_t +static __rte_always_inline uint16_t iq_ring_free_count(const struct iq_ring *r) { return QID_IQ_MASK - iq_ring_count(r); } -static force_inline uint16_t +static __rte_always_inline uint16_t iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) { const uint16_t read = r->read_idx; @@ -112,7 +108,7 @@ iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) return nb_qes; } -static force_inline uint16_t +static __rte_always_inline uint16_t iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) { uint16_t read = r->read_idx; @@ -132,7 +128,7 @@ iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) } /* assumes there is space, from a previous dequeue_burst */ -static force_inline uint16_t +static __rte_always_inline uint16_t iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) { uint16_t i, read = r->read_idx; @@ -144,19 +140,19 @@ iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) return nb_qes; } -static force_inline const struct rte_event * +static __rte_always_inline const struct rte_event * iq_ring_peek(const struct iq_ring *r) { return &r->ring[r->read_idx & QID_IQ_MASK]; } -static force_inline void +static __rte_always_inline void iq_ring_pop(struct iq_ring *r) { r->read_idx++; } -static force_inline int +static __rte_always_inline int iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe) { const uint16_t read = r->read_idx; diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c index a31aaa66..9c534b7f 100644 --- a/drivers/event/sw/sw_evdev.c +++ b/drivers/event/sw/sw_evdev.c @@ -30,6 +30,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include <inttypes.h> #include <string.h> #include <rte_vdev.h> @@ -37,10 +38,11 @@ #include <rte_kvargs.h> #include <rte_ring.h> #include <rte_errno.h> +#include <rte_event_ring.h> +#include <rte_service_component.h> #include "sw_evdev.h" #include "iq_ring.h" -#include "event_ring.h" #define EVENTDEV_NAME_SW_PMD event_sw #define NUMA_NODE_ARG "numa_node" @@ -90,7 +92,8 @@ sw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[], } else if (q->type == RTE_SCHED_TYPE_ORDERED) { p->num_ordered_qids++; p->num_qids_mapped++; - } else if (q->type == RTE_SCHED_TYPE_ATOMIC) { + } else if (q->type == RTE_SCHED_TYPE_ATOMIC || + q->type == RTE_SCHED_TYPE_PARALLEL) { p->num_qids_mapped++; } @@ -138,7 +141,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id, { struct sw_evdev *sw = sw_pmd_priv(dev); struct sw_port *p = &sw->ports[port_id]; - char buf[QE_RING_NAMESIZE]; + char buf[RTE_RING_NAMESIZE]; unsigned int i; struct rte_event_dev_info info; @@ -159,10 +162,19 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id, p->id = port_id; p->sw = sw; - snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id, - "rx_worker_ring"); - p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH, - dev->data->socket_id); + /* check to see if rings exists - port_setup() can be called multiple + * times legally (assuming device is stopped). If ring exists, free it + * to so it gets re-created with the correct size + */ + snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id, + port_id, "rx_worker_ring"); + struct rte_event_ring *existing_ring = rte_event_ring_lookup(buf); + if (existing_ring) + rte_event_ring_free(existing_ring); + + p->rx_worker_ring = rte_event_ring_create(buf, MAX_SW_PROD_Q_DEPTH, + dev->data->socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ); if (p->rx_worker_ring == NULL) { SW_LOG_ERR("Error creating RX worker ring for port %d\n", port_id); @@ -171,12 +183,18 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id, p->inflight_max = conf->new_event_threshold; - snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id, - "cq_worker_ring"); - p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth, - dev->data->socket_id); + /* check if ring exists, same as rx_worker above */ + snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id, + port_id, "cq_worker_ring"); + existing_ring = rte_event_ring_lookup(buf); + if (existing_ring) + rte_event_ring_free(existing_ring); + + p->cq_worker_ring = rte_event_ring_create(buf, conf->dequeue_depth, + dev->data->socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ); if (p->cq_worker_ring == NULL) { - qe_ring_destroy(p->rx_worker_ring); + rte_event_ring_free(p->rx_worker_ring); SW_LOG_ERR("Error creating CQ worker ring for port %d\n", port_id); return -1; @@ -202,8 +220,8 @@ sw_port_release(void *port) if (p == NULL) return; - qe_ring_destroy(p->rx_worker_ring); - qe_ring_destroy(p->cq_worker_ring); + rte_event_ring_free(p->rx_worker_ring); + rte_event_ring_free(p->cq_worker_ring); memset(p, 0, sizeof(*p)); } @@ -435,6 +453,7 @@ sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info) .max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH, .max_num_events = SW_INFLIGHT_EVENTS_TOTAL, .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS | + RTE_EVENT_DEV_CAP_BURST_MODE | RTE_EVENT_DEV_CAP_EVENT_QOS), }; @@ -509,8 +528,9 @@ sw_dump(struct rte_eventdev *dev, FILE *f) fprintf(f, "\n"); if (p->rx_worker_ring) { - uint64_t used = qe_ring_count(p->rx_worker_ring); - uint64_t space = qe_ring_free_count(p->rx_worker_ring); + uint64_t used = rte_event_ring_count(p->rx_worker_ring); + uint64_t space = rte_event_ring_free_count( + p->rx_worker_ring); const char *col = (space == 0) ? COL_RED : COL_RESET; fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4" PRIu64 COL_RESET"\n", col, used, space); @@ -518,8 +538,9 @@ sw_dump(struct rte_eventdev *dev, FILE *f) fprintf(f, "\trx ring not initialized.\n"); if (p->cq_worker_ring) { - uint64_t used = qe_ring_count(p->cq_worker_ring); - uint64_t space = qe_ring_free_count(p->cq_worker_ring); + uint64_t used = rte_event_ring_count(p->cq_worker_ring); + uint64_t space = rte_event_ring_free_count( + p->cq_worker_ring); const char *col = (space == 0) ? COL_RED : COL_RESET; fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4" PRIu64 COL_RESET"\n", col, used, space); @@ -559,12 +580,13 @@ sw_dump(struct rte_eventdev *dev, FILE *f) inflights += qid->fids[flow].pcount; } - uint32_t cq; - fprintf(f, "\tInflights: %u\tFlows pinned per port: ", - inflights); - for (cq = 0; cq < sw->port_count; cq++) - fprintf(f, "%d ", affinities_per_port[cq]); - fprintf(f, "\n"); + uint32_t port; + fprintf(f, "\tPer Port Stats:\n"); + for (port = 0; port < sw->port_count; port++) { + fprintf(f, "\t Port %d: Pkts: %"PRIu64, port, + qid->to_port[port]); + fprintf(f, "\tFlows: %d\n", affinities_per_port[port]); + } uint32_t iq; uint32_t iq_printed = 0; @@ -593,6 +615,13 @@ sw_start(struct rte_eventdev *dev) { unsigned int i, j; struct sw_evdev *sw = sw_pmd_priv(dev); + + /* check a service core is mapped to this service */ + struct rte_service_spec *s = rte_service_get_by_name(sw->service_name); + if (!rte_service_is_running(s)) + SW_LOG_ERR("Warning: No Service core enabled on service %s\n", + s->name); + /* check all ports are set up */ for (i = 0; i < sw->port_count; i++) if (sw->ports[i].rx_worker_ring == NULL) { @@ -695,6 +724,14 @@ set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque) return 0; } + +static int32_t sw_sched_service_func(void *args) +{ + struct rte_eventdev *dev = args; + sw_event_schedule(dev); + return 0; +} + static int sw_probe(struct rte_vdev_device *vdev) { @@ -792,6 +829,8 @@ sw_probe(struct rte_vdev_device *vdev) dev->dev_ops = &evdev_sw_ops; dev->enqueue = sw_event_enqueue; dev->enqueue_burst = sw_event_enqueue_burst; + dev->enqueue_new_burst = sw_event_enqueue_burst; + dev->enqueue_forward_burst = sw_event_enqueue_burst; dev->dequeue = sw_event_dequeue; dev->dequeue_burst = sw_event_dequeue_burst; dev->schedule = sw_event_schedule; @@ -806,6 +845,22 @@ sw_probe(struct rte_vdev_device *vdev) sw->credit_update_quanta = credit_quanta; sw->sched_quanta = sched_quanta; + /* register service with EAL */ + struct rte_service_spec service; + memset(&service, 0, sizeof(struct rte_service_spec)); + snprintf(service.name, sizeof(service.name), "%s_service", name); + snprintf(sw->service_name, sizeof(sw->service_name), "%s_service", + name); + service.socket_id = socket_id; + service.callback = sw_sched_service_func; + service.callback_userdata = (void *)dev; + + int32_t ret = rte_service_register(&service); + if (ret) { + SW_LOG_ERR("service register() failed"); + return -ENOEXEC; + } + return 0; } diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h index 61c671d6..71de3c14 100644 --- a/drivers/event/sw/sw_evdev.h +++ b/drivers/event/sw/sw_evdev.h @@ -34,7 +34,7 @@ #define _SW_EVDEV_H_ #include <rte_eventdev.h> -#include <rte_eventdev_pmd.h> +#include <rte_eventdev_pmd_vdev.h> #include <rte_atomic.h> #define SW_DEFAULT_CREDIT_QUANTA 32 @@ -59,6 +59,7 @@ #define EVENTDEV_NAME_SW_PMD event_sw #define SW_PMD_NAME RTE_STR(event_sw) +#define SW_PMD_NAME_MAX 64 #define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1) @@ -149,6 +150,7 @@ struct sw_qid { uint32_t cq_num_mapped_cqs; uint32_t cq_next_tx; /* cq to write next (non-atomic) packet */ uint32_t cq_map[SW_PORTS_MAX]; + uint64_t to_port[SW_PORTS_MAX]; /* Track flow ids for atomic load balancing */ struct sw_fid_t fids[SW_QID_NUM_FIDS]; @@ -189,9 +191,9 @@ struct sw_port { int16_t num_ordered_qids; /** Ring and buffer for pulling events from workers for scheduling */ - struct qe_ring *rx_worker_ring __rte_cache_aligned; + struct rte_event_ring *rx_worker_ring __rte_cache_aligned; /** Ring and buffer for pushing packets to workers after scheduling */ - struct qe_ring *cq_worker_ring; + struct rte_event_ring *cq_worker_ring; /* hole */ @@ -275,6 +277,8 @@ struct sw_evdev { /* store num stats and offset of the stats for each queue */ uint16_t xstats_count_per_qid[RTE_EVENT_MAX_QUEUES_PER_DEV]; uint16_t xstats_offset_for_qid[RTE_EVENT_MAX_QUEUES_PER_DEV]; + + char service_name[SW_PMD_NAME_MAX]; }; static inline struct sw_evdev * diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c index a333a6f0..8a2c9d4f 100644 --- a/drivers/event/sw/sw_evdev_scheduler.c +++ b/drivers/event/sw/sw_evdev_scheduler.c @@ -32,9 +32,9 @@ #include <rte_ring.h> #include <rte_hash_crc.h> +#include <rte_event_ring.h> #include "sw_evdev.h" #include "iq_ring.h" -#include "event_ring.h" #define SW_IQS_MASK (SW_IQS_MAX-1) @@ -119,11 +119,12 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, p->stats.tx_pkts++; qid->stats.tx_pkts++; + qid->to_port[cq]++; /* if we just filled in the last slot, flush the buffer */ if (sw->cq_ring_space[cq] == 0) { - struct qe_ring *worker = p->cq_worker_ring; - qe_ring_enqueue_burst(worker, p->cq_buf, + struct rte_event_ring *worker = p->cq_worker_ring; + rte_event_ring_enqueue_burst(worker, p->cq_buf, p->cq_buf_count, &sw->cq_ring_space[cq]); p->cq_buf_count = 0; @@ -170,7 +171,8 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, cq = qid->cq_map[cq_idx]; if (++cq_idx == qid->cq_num_mapped_cqs) cq_idx = 0; - } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 || + } while (rte_event_ring_free_count( + sw->ports[cq].cq_worker_ring) == 0 || sw->ports[cq].inflights == SW_PORT_HIST_LIST); struct sw_port *p = &sw->ports[cq]; @@ -362,17 +364,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end) return pkts_iter; } -static inline void __attribute__((always_inline)) +static __rte_always_inline void sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port) { RTE_SET_USED(sw); - struct qe_ring *worker = port->rx_worker_ring; + struct rte_event_ring *worker = port->rx_worker_ring; port->pp_buf_start = 0; - port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf, - RTE_DIM(port->pp_buf)); + port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf, + RTE_DIM(port->pp_buf), NULL); } -static inline uint32_t __attribute__((always_inline)) +static __rte_always_inline uint32_t __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder) { static struct reorder_buffer_entry dummy_rob; @@ -585,8 +587,8 @@ sw_event_schedule(struct rte_eventdev *dev) * worker cores: aka, do the ring transfers batched. */ for (i = 0; i < sw->port_count; i++) { - struct qe_ring *worker = sw->ports[i].cq_worker_ring; - qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf, + struct rte_event_ring *worker = sw->ports[i].cq_worker_ring; + rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf, sw->ports[i].cq_buf_count, &sw->cq_ring_space[i]); sw->ports[i].cq_buf_count = 0; diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c index 9cb6bef5..d76d3d5c 100644 --- a/drivers/event/sw/sw_evdev_worker.c +++ b/drivers/event/sw/sw_evdev_worker.c @@ -32,9 +32,9 @@ #include <rte_atomic.h> #include <rte_cycles.h> +#include <rte_event_ring.h> #include "sw_evdev.h" -#include "event_ring.h" #define PORT_ENQUEUE_MAX_BURST_SIZE 64 @@ -52,13 +52,31 @@ sw_event_release(struct sw_port *p, uint8_t index) ev.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE]; uint16_t free_count; - qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count); + rte_event_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count); /* each release returns one credit */ p->outstanding_releases--; p->inflight_credits++; } +/* + * special-case of rte_event_ring enqueue, with overriding the ops member on + * the events that get written to the ring. + */ +static inline unsigned int +enqueue_burst_with_ops(struct rte_event_ring *r, const struct rte_event *events, + unsigned int n, uint8_t *ops) +{ + struct rte_event tmp_evs[PORT_ENQUEUE_MAX_BURST_SIZE]; + unsigned int i; + + memcpy(tmp_evs, events, n * sizeof(events[0])); + for (i = 0; i < n; i++) + tmp_evs[i].op = ops[i]; + + return rte_event_ring_enqueue_burst(r, tmp_evs, n, NULL); +} + uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) { @@ -87,6 +105,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) return 0; } + uint32_t forwards = 0; for (i = 0; i < num; i++) { int op = ev[i].op; int outstanding = p->outstanding_releases > 0; @@ -95,6 +114,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) p->inflight_credits -= (op == RTE_EVENT_OP_NEW); p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) * outstanding; + forwards += (op == RTE_EVENT_OP_FORWARD); new_ops[i] = sw_qe_flag_map[op]; new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT); @@ -113,8 +133,11 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) } } + /* handle directed port forward credits */ + p->inflight_credits -= forwards * p->is_directed; + /* returns number of events actually enqueued */ - uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i, + uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i, new_ops); if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) { uint64_t burst_ticks = rte_get_timer_cycles() - @@ -141,7 +164,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, RTE_SET_USED(wait); struct sw_port *p = (void *)port; struct sw_evdev *sw = (void *)p->sw; - struct qe_ring *ring = p->cq_worker_ring; + struct rte_event_ring *ring = p->cq_worker_ring; uint32_t credit_update_quanta = sw->credit_update_quanta; /* check that all previous dequeues have been released */ @@ -153,7 +176,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, } /* returns number of events actually dequeued */ - uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num); + uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL); if (unlikely(ndeq == 0)) { p->outstanding_releases = 0; p->zero_polls++; diff --git a/drivers/event/sw/sw_evdev_xstats.c b/drivers/event/sw/sw_evdev_xstats.c index c7b1abe8..8cb6d88d 100644 --- a/drivers/event/sw/sw_evdev_xstats.c +++ b/drivers/event/sw/sw_evdev_xstats.c @@ -30,9 +30,9 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include <rte_event_ring.h> #include "sw_evdev.h" #include "iq_ring.h" -#include "event_ring.h" enum xstats_type { /* common stats */ @@ -57,6 +57,7 @@ enum xstats_type { iq_used, /* qid port mapping specific */ pinned, + pkts, /* note: qid-to-port pkts */ }; typedef uint64_t (*xstats_fn)(const struct sw_evdev *dev, @@ -104,10 +105,10 @@ get_port_stat(const struct sw_evdev *sw, uint16_t obj_idx, case calls: return p->total_polls; case credits: return p->inflight_credits; case poll_return: return p->zero_polls; - case rx_used: return qe_ring_count(p->rx_worker_ring); - case rx_free: return qe_ring_free_count(p->rx_worker_ring); - case tx_used: return qe_ring_count(p->cq_worker_ring); - case tx_free: return qe_ring_free_count(p->cq_worker_ring); + case rx_used: return rte_event_ring_count(p->rx_worker_ring); + case rx_free: return rte_event_ring_free_count(p->rx_worker_ring); + case tx_used: return rte_event_ring_count(p->cq_worker_ring); + case tx_free: return rte_event_ring_free_count(p->cq_worker_ring); default: return -1; } } @@ -179,6 +180,8 @@ get_qid_port_stat(const struct sw_evdev *sw, uint16_t obj_idx, return pin; } while (0); break; + case pkts: + return qid->to_port[port]; default: return -1; } } @@ -246,8 +249,11 @@ sw_xstats_init(struct sw_evdev *sw) static const enum xstats_type qid_iq_types[] = { iq_used }; /* reset allowed */ - static const char * const qid_port_stats[] = { "pinned_flows" }; - static const enum xstats_type qid_port_types[] = { pinned }; + static const char * const qid_port_stats[] = { "pinned_flows", + "packets" + }; + static const enum xstats_type qid_port_types[] = { pinned, pkts }; + static const uint8_t qid_port_reset_allowed[] = {0, 1}; /* reset allowed */ /* ---- end of stat definitions ---- */ @@ -312,8 +318,9 @@ sw_xstats_init(struct sw_evdev *sw) port, port_stats[i]); } - for (bkt = 0; bkt < (sw->ports[port].cq_worker_ring->size >> - SW_DEQ_STAT_BUCKET_SHIFT) + 1; bkt++) { + for (bkt = 0; bkt < (rte_event_ring_get_capacity( + sw->ports[port].cq_worker_ring) >> + SW_DEQ_STAT_BUCKET_SHIFT) + 1; bkt++) { for (i = 0; i < RTE_DIM(port_bucket_stats); i++) { sw->xstats[stat] = (struct sw_xstats_entry){ .fn = get_port_bucket_stat, @@ -376,7 +383,8 @@ sw_xstats_init(struct sw_evdev *sw) .stat = qid_port_types[i], .mode = RTE_EVENT_DEV_XSTATS_QUEUE, .extra_arg = port, - .reset_allowed = 0, + .reset_allowed = + qid_port_reset_allowed[i], }; snprintf(sname, sizeof(sname), "qid_%u_port_%u_%s", |