aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/event/sw
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/event/sw')
-rw-r--r--drivers/event/sw/event_ring.h14
-rw-r--r--drivers/event/sw/iq_ring.h20
-rw-r--r--drivers/event/sw/sw_evdev.c103
-rw-r--r--drivers/event/sw/sw_evdev.h10
-rw-r--r--drivers/event/sw/sw_evdev_scheduler.c24
-rw-r--r--drivers/event/sw/sw_evdev_worker.c33
-rw-r--r--drivers/event/sw/sw_evdev_xstats.c28
7 files changed, 158 insertions, 74 deletions
diff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h
index cdaee95d..734a3b4b 100644
--- a/drivers/event/sw/event_ring.h
+++ b/drivers/event/sw/event_ring.h
@@ -61,10 +61,6 @@ struct qe_ring {
struct rte_event ring[0] __rte_cache_aligned;
};
-#ifndef force_inline
-#define force_inline inline __attribute__((always_inline))
-#endif
-
static inline struct qe_ring *
qe_ring_create(const char *name, unsigned int size, unsigned int socket_id)
{
@@ -91,19 +87,19 @@ qe_ring_destroy(struct qe_ring *r)
rte_free(r);
}
-static force_inline unsigned int
+static __rte_always_inline unsigned int
qe_ring_count(const struct qe_ring *r)
{
return r->write_idx - r->read_idx;
}
-static force_inline unsigned int
+static __rte_always_inline unsigned int
qe_ring_free_count(const struct qe_ring *r)
{
return r->size - qe_ring_count(r);
}
-static force_inline unsigned int
+static __rte_always_inline unsigned int
qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes,
unsigned int nb_qes, uint16_t *free_count)
{
@@ -130,7 +126,7 @@ qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes,
return nb_qes;
}
-static force_inline unsigned int
+static __rte_always_inline unsigned int
qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes,
unsigned int nb_qes, uint8_t *ops)
{
@@ -157,7 +153,7 @@ qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes,
return nb_qes;
}
-static force_inline unsigned int
+static __rte_always_inline unsigned int
qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes,
unsigned int nb_qes)
{
diff --git a/drivers/event/sw/iq_ring.h b/drivers/event/sw/iq_ring.h
index d480d156..64cf6784 100644
--- a/drivers/event/sw/iq_ring.h
+++ b/drivers/event/sw/iq_ring.h
@@ -56,10 +56,6 @@ struct iq_ring {
struct rte_event ring[QID_IQ_DEPTH];
};
-#ifndef force_inline
-#define force_inline inline __attribute__((always_inline))
-#endif
-
static inline struct iq_ring *
iq_ring_create(const char *name, unsigned int socket_id)
{
@@ -81,19 +77,19 @@ iq_ring_destroy(struct iq_ring *r)
rte_free(r);
}
-static force_inline uint16_t
+static __rte_always_inline uint16_t
iq_ring_count(const struct iq_ring *r)
{
return r->write_idx - r->read_idx;
}
-static force_inline uint16_t
+static __rte_always_inline uint16_t
iq_ring_free_count(const struct iq_ring *r)
{
return QID_IQ_MASK - iq_ring_count(r);
}
-static force_inline uint16_t
+static __rte_always_inline uint16_t
iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
{
const uint16_t read = r->read_idx;
@@ -112,7 +108,7 @@ iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
return nb_qes;
}
-static force_inline uint16_t
+static __rte_always_inline uint16_t
iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
{
uint16_t read = r->read_idx;
@@ -132,7 +128,7 @@ iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
}
/* assumes there is space, from a previous dequeue_burst */
-static force_inline uint16_t
+static __rte_always_inline uint16_t
iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
{
uint16_t i, read = r->read_idx;
@@ -144,19 +140,19 @@ iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
return nb_qes;
}
-static force_inline const struct rte_event *
+static __rte_always_inline const struct rte_event *
iq_ring_peek(const struct iq_ring *r)
{
return &r->ring[r->read_idx & QID_IQ_MASK];
}
-static force_inline void
+static __rte_always_inline void
iq_ring_pop(struct iq_ring *r)
{
r->read_idx++;
}
-static force_inline int
+static __rte_always_inline int
iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe)
{
const uint16_t read = r->read_idx;
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index a31aaa66..9c534b7f 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -30,6 +30,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+#include <inttypes.h>
#include <string.h>
#include <rte_vdev.h>
@@ -37,10 +38,11 @@
#include <rte_kvargs.h>
#include <rte_ring.h>
#include <rte_errno.h>
+#include <rte_event_ring.h>
+#include <rte_service_component.h>
#include "sw_evdev.h"
#include "iq_ring.h"
-#include "event_ring.h"
#define EVENTDEV_NAME_SW_PMD event_sw
#define NUMA_NODE_ARG "numa_node"
@@ -90,7 +92,8 @@ sw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
} else if (q->type == RTE_SCHED_TYPE_ORDERED) {
p->num_ordered_qids++;
p->num_qids_mapped++;
- } else if (q->type == RTE_SCHED_TYPE_ATOMIC) {
+ } else if (q->type == RTE_SCHED_TYPE_ATOMIC ||
+ q->type == RTE_SCHED_TYPE_PARALLEL) {
p->num_qids_mapped++;
}
@@ -138,7 +141,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
{
struct sw_evdev *sw = sw_pmd_priv(dev);
struct sw_port *p = &sw->ports[port_id];
- char buf[QE_RING_NAMESIZE];
+ char buf[RTE_RING_NAMESIZE];
unsigned int i;
struct rte_event_dev_info info;
@@ -159,10 +162,19 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
p->id = port_id;
p->sw = sw;
- snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
- "rx_worker_ring");
- p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
- dev->data->socket_id);
+ /* check to see if rings exists - port_setup() can be called multiple
+ * times legally (assuming device is stopped). If ring exists, free it
+ * to so it gets re-created with the correct size
+ */
+ snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
+ port_id, "rx_worker_ring");
+ struct rte_event_ring *existing_ring = rte_event_ring_lookup(buf);
+ if (existing_ring)
+ rte_event_ring_free(existing_ring);
+
+ p->rx_worker_ring = rte_event_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
+ dev->data->socket_id,
+ RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
if (p->rx_worker_ring == NULL) {
SW_LOG_ERR("Error creating RX worker ring for port %d\n",
port_id);
@@ -171,12 +183,18 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
p->inflight_max = conf->new_event_threshold;
- snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
- "cq_worker_ring");
- p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth,
- dev->data->socket_id);
+ /* check if ring exists, same as rx_worker above */
+ snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
+ port_id, "cq_worker_ring");
+ existing_ring = rte_event_ring_lookup(buf);
+ if (existing_ring)
+ rte_event_ring_free(existing_ring);
+
+ p->cq_worker_ring = rte_event_ring_create(buf, conf->dequeue_depth,
+ dev->data->socket_id,
+ RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
if (p->cq_worker_ring == NULL) {
- qe_ring_destroy(p->rx_worker_ring);
+ rte_event_ring_free(p->rx_worker_ring);
SW_LOG_ERR("Error creating CQ worker ring for port %d\n",
port_id);
return -1;
@@ -202,8 +220,8 @@ sw_port_release(void *port)
if (p == NULL)
return;
- qe_ring_destroy(p->rx_worker_ring);
- qe_ring_destroy(p->cq_worker_ring);
+ rte_event_ring_free(p->rx_worker_ring);
+ rte_event_ring_free(p->cq_worker_ring);
memset(p, 0, sizeof(*p));
}
@@ -435,6 +453,7 @@ sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
+ RTE_EVENT_DEV_CAP_BURST_MODE |
RTE_EVENT_DEV_CAP_EVENT_QOS),
};
@@ -509,8 +528,9 @@ sw_dump(struct rte_eventdev *dev, FILE *f)
fprintf(f, "\n");
if (p->rx_worker_ring) {
- uint64_t used = qe_ring_count(p->rx_worker_ring);
- uint64_t space = qe_ring_free_count(p->rx_worker_ring);
+ uint64_t used = rte_event_ring_count(p->rx_worker_ring);
+ uint64_t space = rte_event_ring_free_count(
+ p->rx_worker_ring);
const char *col = (space == 0) ? COL_RED : COL_RESET;
fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4"
PRIu64 COL_RESET"\n", col, used, space);
@@ -518,8 +538,9 @@ sw_dump(struct rte_eventdev *dev, FILE *f)
fprintf(f, "\trx ring not initialized.\n");
if (p->cq_worker_ring) {
- uint64_t used = qe_ring_count(p->cq_worker_ring);
- uint64_t space = qe_ring_free_count(p->cq_worker_ring);
+ uint64_t used = rte_event_ring_count(p->cq_worker_ring);
+ uint64_t space = rte_event_ring_free_count(
+ p->cq_worker_ring);
const char *col = (space == 0) ? COL_RED : COL_RESET;
fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4"
PRIu64 COL_RESET"\n", col, used, space);
@@ -559,12 +580,13 @@ sw_dump(struct rte_eventdev *dev, FILE *f)
inflights += qid->fids[flow].pcount;
}
- uint32_t cq;
- fprintf(f, "\tInflights: %u\tFlows pinned per port: ",
- inflights);
- for (cq = 0; cq < sw->port_count; cq++)
- fprintf(f, "%d ", affinities_per_port[cq]);
- fprintf(f, "\n");
+ uint32_t port;
+ fprintf(f, "\tPer Port Stats:\n");
+ for (port = 0; port < sw->port_count; port++) {
+ fprintf(f, "\t Port %d: Pkts: %"PRIu64, port,
+ qid->to_port[port]);
+ fprintf(f, "\tFlows: %d\n", affinities_per_port[port]);
+ }
uint32_t iq;
uint32_t iq_printed = 0;
@@ -593,6 +615,13 @@ sw_start(struct rte_eventdev *dev)
{
unsigned int i, j;
struct sw_evdev *sw = sw_pmd_priv(dev);
+
+ /* check a service core is mapped to this service */
+ struct rte_service_spec *s = rte_service_get_by_name(sw->service_name);
+ if (!rte_service_is_running(s))
+ SW_LOG_ERR("Warning: No Service core enabled on service %s\n",
+ s->name);
+
/* check all ports are set up */
for (i = 0; i < sw->port_count; i++)
if (sw->ports[i].rx_worker_ring == NULL) {
@@ -695,6 +724,14 @@ set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque)
return 0;
}
+
+static int32_t sw_sched_service_func(void *args)
+{
+ struct rte_eventdev *dev = args;
+ sw_event_schedule(dev);
+ return 0;
+}
+
static int
sw_probe(struct rte_vdev_device *vdev)
{
@@ -792,6 +829,8 @@ sw_probe(struct rte_vdev_device *vdev)
dev->dev_ops = &evdev_sw_ops;
dev->enqueue = sw_event_enqueue;
dev->enqueue_burst = sw_event_enqueue_burst;
+ dev->enqueue_new_burst = sw_event_enqueue_burst;
+ dev->enqueue_forward_burst = sw_event_enqueue_burst;
dev->dequeue = sw_event_dequeue;
dev->dequeue_burst = sw_event_dequeue_burst;
dev->schedule = sw_event_schedule;
@@ -806,6 +845,22 @@ sw_probe(struct rte_vdev_device *vdev)
sw->credit_update_quanta = credit_quanta;
sw->sched_quanta = sched_quanta;
+ /* register service with EAL */
+ struct rte_service_spec service;
+ memset(&service, 0, sizeof(struct rte_service_spec));
+ snprintf(service.name, sizeof(service.name), "%s_service", name);
+ snprintf(sw->service_name, sizeof(sw->service_name), "%s_service",
+ name);
+ service.socket_id = socket_id;
+ service.callback = sw_sched_service_func;
+ service.callback_userdata = (void *)dev;
+
+ int32_t ret = rte_service_register(&service);
+ if (ret) {
+ SW_LOG_ERR("service register() failed");
+ return -ENOEXEC;
+ }
+
return 0;
}
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 61c671d6..71de3c14 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -34,7 +34,7 @@
#define _SW_EVDEV_H_
#include <rte_eventdev.h>
-#include <rte_eventdev_pmd.h>
+#include <rte_eventdev_pmd_vdev.h>
#include <rte_atomic.h>
#define SW_DEFAULT_CREDIT_QUANTA 32
@@ -59,6 +59,7 @@
#define EVENTDEV_NAME_SW_PMD event_sw
#define SW_PMD_NAME RTE_STR(event_sw)
+#define SW_PMD_NAME_MAX 64
#define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
@@ -149,6 +150,7 @@ struct sw_qid {
uint32_t cq_num_mapped_cqs;
uint32_t cq_next_tx; /* cq to write next (non-atomic) packet */
uint32_t cq_map[SW_PORTS_MAX];
+ uint64_t to_port[SW_PORTS_MAX];
/* Track flow ids for atomic load balancing */
struct sw_fid_t fids[SW_QID_NUM_FIDS];
@@ -189,9 +191,9 @@ struct sw_port {
int16_t num_ordered_qids;
/** Ring and buffer for pulling events from workers for scheduling */
- struct qe_ring *rx_worker_ring __rte_cache_aligned;
+ struct rte_event_ring *rx_worker_ring __rte_cache_aligned;
/** Ring and buffer for pushing packets to workers after scheduling */
- struct qe_ring *cq_worker_ring;
+ struct rte_event_ring *cq_worker_ring;
/* hole */
@@ -275,6 +277,8 @@ struct sw_evdev {
/* store num stats and offset of the stats for each queue */
uint16_t xstats_count_per_qid[RTE_EVENT_MAX_QUEUES_PER_DEV];
uint16_t xstats_offset_for_qid[RTE_EVENT_MAX_QUEUES_PER_DEV];
+
+ char service_name[SW_PMD_NAME_MAX];
};
static inline struct sw_evdev *
diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index a333a6f0..8a2c9d4f 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -32,9 +32,9 @@
#include <rte_ring.h>
#include <rte_hash_crc.h>
+#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_ring.h"
-#include "event_ring.h"
#define SW_IQS_MASK (SW_IQS_MAX-1)
@@ -119,11 +119,12 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
p->stats.tx_pkts++;
qid->stats.tx_pkts++;
+ qid->to_port[cq]++;
/* if we just filled in the last slot, flush the buffer */
if (sw->cq_ring_space[cq] == 0) {
- struct qe_ring *worker = p->cq_worker_ring;
- qe_ring_enqueue_burst(worker, p->cq_buf,
+ struct rte_event_ring *worker = p->cq_worker_ring;
+ rte_event_ring_enqueue_burst(worker, p->cq_buf,
p->cq_buf_count,
&sw->cq_ring_space[cq]);
p->cq_buf_count = 0;
@@ -170,7 +171,8 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
cq = qid->cq_map[cq_idx];
if (++cq_idx == qid->cq_num_mapped_cqs)
cq_idx = 0;
- } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
+ } while (rte_event_ring_free_count(
+ sw->ports[cq].cq_worker_ring) == 0 ||
sw->ports[cq].inflights == SW_PORT_HIST_LIST);
struct sw_port *p = &sw->ports[cq];
@@ -362,17 +364,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
return pkts_iter;
}
-static inline void __attribute__((always_inline))
+static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
RTE_SET_USED(sw);
- struct qe_ring *worker = port->rx_worker_ring;
+ struct rte_event_ring *worker = port->rx_worker_ring;
port->pp_buf_start = 0;
- port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
- RTE_DIM(port->pp_buf));
+ port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
+ RTE_DIM(port->pp_buf), NULL);
}
-static inline uint32_t __attribute__((always_inline))
+static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
static struct reorder_buffer_entry dummy_rob;
@@ -585,8 +587,8 @@ sw_event_schedule(struct rte_eventdev *dev)
* worker cores: aka, do the ring transfers batched.
*/
for (i = 0; i < sw->port_count; i++) {
- struct qe_ring *worker = sw->ports[i].cq_worker_ring;
- qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
+ struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
+ rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
sw->ports[i].cq_buf_count,
&sw->cq_ring_space[i]);
sw->ports[i].cq_buf_count = 0;
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index 9cb6bef5..d76d3d5c 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -32,9 +32,9 @@
#include <rte_atomic.h>
#include <rte_cycles.h>
+#include <rte_event_ring.h>
#include "sw_evdev.h"
-#include "event_ring.h"
#define PORT_ENQUEUE_MAX_BURST_SIZE 64
@@ -52,13 +52,31 @@ sw_event_release(struct sw_port *p, uint8_t index)
ev.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE];
uint16_t free_count;
- qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
+ rte_event_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
/* each release returns one credit */
p->outstanding_releases--;
p->inflight_credits++;
}
+/*
+ * special-case of rte_event_ring enqueue, with overriding the ops member on
+ * the events that get written to the ring.
+ */
+static inline unsigned int
+enqueue_burst_with_ops(struct rte_event_ring *r, const struct rte_event *events,
+ unsigned int n, uint8_t *ops)
+{
+ struct rte_event tmp_evs[PORT_ENQUEUE_MAX_BURST_SIZE];
+ unsigned int i;
+
+ memcpy(tmp_evs, events, n * sizeof(events[0]));
+ for (i = 0; i < n; i++)
+ tmp_evs[i].op = ops[i];
+
+ return rte_event_ring_enqueue_burst(r, tmp_evs, n, NULL);
+}
+
uint16_t
sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
{
@@ -87,6 +105,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
return 0;
}
+ uint32_t forwards = 0;
for (i = 0; i < num; i++) {
int op = ev[i].op;
int outstanding = p->outstanding_releases > 0;
@@ -95,6 +114,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
outstanding;
+ forwards += (op == RTE_EVENT_OP_FORWARD);
new_ops[i] = sw_qe_flag_map[op];
new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
@@ -113,8 +133,11 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
}
}
+ /* handle directed port forward credits */
+ p->inflight_credits -= forwards * p->is_directed;
+
/* returns number of events actually enqueued */
- uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
+ uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
new_ops);
if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
uint64_t burst_ticks = rte_get_timer_cycles() -
@@ -141,7 +164,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
RTE_SET_USED(wait);
struct sw_port *p = (void *)port;
struct sw_evdev *sw = (void *)p->sw;
- struct qe_ring *ring = p->cq_worker_ring;
+ struct rte_event_ring *ring = p->cq_worker_ring;
uint32_t credit_update_quanta = sw->credit_update_quanta;
/* check that all previous dequeues have been released */
@@ -153,7 +176,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
}
/* returns number of events actually dequeued */
- uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
+ uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
if (unlikely(ndeq == 0)) {
p->outstanding_releases = 0;
p->zero_polls++;
diff --git a/drivers/event/sw/sw_evdev_xstats.c b/drivers/event/sw/sw_evdev_xstats.c
index c7b1abe8..8cb6d88d 100644
--- a/drivers/event/sw/sw_evdev_xstats.c
+++ b/drivers/event/sw/sw_evdev_xstats.c
@@ -30,9 +30,9 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_ring.h"
-#include "event_ring.h"
enum xstats_type {
/* common stats */
@@ -57,6 +57,7 @@ enum xstats_type {
iq_used,
/* qid port mapping specific */
pinned,
+ pkts, /* note: qid-to-port pkts */
};
typedef uint64_t (*xstats_fn)(const struct sw_evdev *dev,
@@ -104,10 +105,10 @@ get_port_stat(const struct sw_evdev *sw, uint16_t obj_idx,
case calls: return p->total_polls;
case credits: return p->inflight_credits;
case poll_return: return p->zero_polls;
- case rx_used: return qe_ring_count(p->rx_worker_ring);
- case rx_free: return qe_ring_free_count(p->rx_worker_ring);
- case tx_used: return qe_ring_count(p->cq_worker_ring);
- case tx_free: return qe_ring_free_count(p->cq_worker_ring);
+ case rx_used: return rte_event_ring_count(p->rx_worker_ring);
+ case rx_free: return rte_event_ring_free_count(p->rx_worker_ring);
+ case tx_used: return rte_event_ring_count(p->cq_worker_ring);
+ case tx_free: return rte_event_ring_free_count(p->cq_worker_ring);
default: return -1;
}
}
@@ -179,6 +180,8 @@ get_qid_port_stat(const struct sw_evdev *sw, uint16_t obj_idx,
return pin;
} while (0);
break;
+ case pkts:
+ return qid->to_port[port];
default: return -1;
}
}
@@ -246,8 +249,11 @@ sw_xstats_init(struct sw_evdev *sw)
static const enum xstats_type qid_iq_types[] = { iq_used };
/* reset allowed */
- static const char * const qid_port_stats[] = { "pinned_flows" };
- static const enum xstats_type qid_port_types[] = { pinned };
+ static const char * const qid_port_stats[] = { "pinned_flows",
+ "packets"
+ };
+ static const enum xstats_type qid_port_types[] = { pinned, pkts };
+ static const uint8_t qid_port_reset_allowed[] = {0, 1};
/* reset allowed */
/* ---- end of stat definitions ---- */
@@ -312,8 +318,9 @@ sw_xstats_init(struct sw_evdev *sw)
port, port_stats[i]);
}
- for (bkt = 0; bkt < (sw->ports[port].cq_worker_ring->size >>
- SW_DEQ_STAT_BUCKET_SHIFT) + 1; bkt++) {
+ for (bkt = 0; bkt < (rte_event_ring_get_capacity(
+ sw->ports[port].cq_worker_ring) >>
+ SW_DEQ_STAT_BUCKET_SHIFT) + 1; bkt++) {
for (i = 0; i < RTE_DIM(port_bucket_stats); i++) {
sw->xstats[stat] = (struct sw_xstats_entry){
.fn = get_port_bucket_stat,
@@ -376,7 +383,8 @@ sw_xstats_init(struct sw_evdev *sw)
.stat = qid_port_types[i],
.mode = RTE_EVENT_DEV_XSTATS_QUEUE,
.extra_arg = port,
- .reset_allowed = 0,
+ .reset_allowed =
+ qid_port_reset_allowed[i],
};
snprintf(sname, sizeof(sname),
"qid_%u_port_%u_%s",