aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2021-01-22 15:05:14 -0800
committerDave Barach <openvpp@barachs.net>2021-01-25 15:34:21 +0000
commit86f1232ddee5b1751c6ff683892072111d0e2dee (patch)
treebfa213f887f6c4142531b7585df7b6707a40feb9 /src
parent15036ad0bc0b41e42d924e6b1cd897cca8f98c3c (diff)
svm: add custom q implementation for mq
Add separate queue implementation for the message queue as it's custom tailored for fifo segments as opposed to binary api. Also move eventfds to the private data structures. Type: refactor Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I6df0c824ecd94c7904516373f92a9fffc6b04736
Diffstat (limited to 'src')
-rw-r--r--src/plugins/hs_apps/sapi/vpp_echo_bapi.c2
-rw-r--r--src/plugins/unittest/session_test.c5
-rw-r--r--src/svm/fifo_segment.c7
-rw-r--r--src/svm/message_queue.c269
-rw-r--r--src/svm/message_queue.h107
-rw-r--r--src/vcl/vcl_bapi.c4
-rw-r--r--src/vcl/vcl_private.c2
-rw-r--r--src/vcl/vcl_sapi.c5
-rw-r--r--src/vnet/session/segment_manager.c2
-rw-r--r--src/vnet/session/session.c3
-rw-r--r--src/vnet/session/session_api.c8
-rw-r--r--src/vnet/session/session_debug.c20
12 files changed, 313 insertions, 121 deletions
diff --git a/src/plugins/hs_apps/sapi/vpp_echo_bapi.c b/src/plugins/hs_apps/sapi/vpp_echo_bapi.c
index 807ec62d951..915448a0be4 100644
--- a/src/plugins/hs_apps/sapi/vpp_echo_bapi.c
+++ b/src/plugins/hs_apps/sapi/vpp_echo_bapi.c
@@ -437,7 +437,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
echo_segment_attach_mq (segment_handle, mp->app_mq, 0, &em->app_mq);
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
- svm_msg_q_set_consumer_eventfd (em->app_mq, fds[n_fds++]);
+ svm_msg_q_set_eventfd (em->app_mq, fds[n_fds++]);
vec_free (fds);
}
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index 68605b2cbd7..6496a99ba2c 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -1839,9 +1839,8 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
mq = app_wrk->event_queue;
if (use_eventfd)
{
- svm_msg_q_alloc_producer_eventfd (mq);
- svm_msg_q_alloc_consumer_eventfd (mq);
- prod_fd = svm_msg_q_get_producer_eventfd (mq);
+ svm_msg_q_alloc_eventfd (mq);
+ prod_fd = svm_msg_q_get_eventfd (mq);
SESSION_TEST (prod_fd != -1, "mq producer eventd valid %u", prod_fd);
}
diff --git a/src/svm/fifo_segment.c b/src/svm/fifo_segment.c
index d30932448c6..3e9aecb9dc8 100644
--- a/src/svm/fifo_segment.c
+++ b/src/svm/fifo_segment.c
@@ -1042,7 +1042,7 @@ fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset, u32 mq_index)
mq = vec_elt_at_index (fs->mqs, mq_index);
- if (!mq->q)
+ if (!mq->q.shr)
{
svm_msg_q_shared_t *smq;
smq = (svm_msg_q_shared_t *) ((u8 *) fs->h + offset);
@@ -1059,10 +1059,11 @@ fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index)
{
svm_msg_q_t *mq = vec_elt_at_index (fs->mqs, mq_index);
- if (mq->q == 0)
+ if (mq->q.shr == 0)
return ~0ULL;
- return (uword) ((u8 *) mq->q - (u8 *) fs->h) - sizeof (svm_msg_q_shared_t);
+ return (uword) ((u8 *) mq->q.shr - (u8 *) fs->h) -
+ sizeof (svm_msg_q_shared_t);
}
int
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 0ebce702430..fdf9293b18c 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -16,7 +16,9 @@
#include <svm/message_queue.h>
#include <vppinfra/mem.h>
#include <vppinfra/format.h>
+#include <vppinfra/time.h>
#include <sys/eventfd.h>
+#include <sys/socket.h>
static inline svm_msg_q_ring_t *
svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
@@ -37,19 +39,51 @@ svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
return (ring->shr->data + elt_index * ring->elsize);
}
+static void
+svm_msg_q_init_mutex (svm_msg_q_shared_queue_t *sq)
+{
+ pthread_mutexattr_t attr;
+ pthread_condattr_t cattr;
+
+ clib_memset (&attr, 0, sizeof (attr));
+ clib_memset (&cattr, 0, sizeof (cattr));
+
+ if (pthread_mutexattr_init (&attr))
+ clib_unix_warning ("mutexattr_init");
+ if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("pthread_mutexattr_setpshared");
+ if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
+ clib_unix_warning ("setrobust");
+ if (pthread_mutex_init (&sq->mutex, &attr))
+ clib_unix_warning ("mutex_init");
+ if (pthread_mutexattr_destroy (&attr))
+ clib_unix_warning ("mutexattr_destroy");
+ if (pthread_condattr_init (&cattr))
+ clib_unix_warning ("condattr_init");
+ if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("condattr_setpshared");
+ if (pthread_cond_init (&sq->condvar, &cattr))
+ clib_unix_warning ("cond_init1");
+ if (pthread_condattr_destroy (&cattr))
+ clib_unix_warning ("cond_init2");
+}
+
svm_msg_q_shared_t *
svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
{
svm_msg_q_ring_shared_t *ring;
+ svm_msg_q_shared_queue_t *sq;
svm_msg_q_shared_t *smq;
u32 q_sz, offset;
int i;
- q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
smq = (svm_msg_q_shared_t *) base;
- svm_queue_init (&smq->q, cfg->q_nitems, sizeof (svm_msg_q_msg_t));
- smq->q->consumer_pid = cfg->consumer_pid;
+ sq = smq->q;
+ clib_memset (sq, 0, sizeof (*sq));
+ sq->elsize = sizeof (svm_msg_q_msg_t);
+ sq->maxsize = cfg->q_nitems;
smq->n_rings = cfg->n_rings;
ring = (void *) ((u8 *) smq->q + q_sz);
for (i = 0; i < cfg->n_rings; i++)
@@ -61,6 +95,8 @@ svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
ring = (void *) ((u8 *) ring + offset);
}
+ svm_msg_q_init_mutex (sq);
+
return smq;
}
@@ -83,7 +119,8 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
}
- q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (svm_msg_q_shared_queue_t) +
+ cfg->q_nitems * sizeof (svm_msg_q_msg_t);
mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
return mq_sz;
@@ -111,10 +148,12 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
u32 i, n_rings, q_sz, offset;
smq = (svm_msg_q_shared_t *) smq_base;
- mq->q = smq->q;
+ mq->q.shr = smq->q;
+ mq->q.evtfd = -1;
n_rings = smq->n_rings;
vec_validate (mq->rings, n_rings - 1);
- q_sz = sizeof (svm_queue_t) + mq->q->maxsize * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (svm_msg_q_shared_queue_t) +
+ mq->q.shr->maxsize * sizeof (svm_msg_q_msg_t);
ring = (void *) ((u8 *) smq->q + q_sz);
for (i = 0; i < n_rings; i++)
{
@@ -129,10 +168,31 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
void
svm_msg_q_free (svm_msg_q_t * mq)
{
- svm_queue_free (mq->q);
+ clib_mem_free (mq->q.shr);
clib_mem_free (mq);
}
+static void
+svm_msg_q_send_signal (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ (void) pthread_cond_broadcast (&mq->q.shr->condvar);
+ }
+ else
+ {
+ int __clib_unused rv;
+ u64 data = 1;
+
+ if (mq->q.evtfd < 0)
+ return;
+
+ rv = write (mq->q.evtfd, &data, sizeof (data));
+ if (PREDICT_FALSE (rv < 0))
+ clib_unix_warning ("signal write on %d returned %d", mq->q.evtfd, rv);
+ }
+}
+
svm_msg_q_msg_t
svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
{
@@ -147,7 +207,7 @@ svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
msg.ring_index = ring - mq->rings;
msg.elt_index = sr->tail;
sr->tail = (sr->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&sr->cursize, 1);
+ clib_atomic_fetch_add_rel (&sr->cursize, 1);
return msg;
}
@@ -193,7 +253,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
msg.ring_index = ring - mq->rings;
msg.elt_index = sr->tail;
sr->tail = (sr->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&sr->cursize, 1);
+ clib_atomic_fetch_add_rel (&sr->cursize, 1);
break;
}
return msg;
@@ -228,10 +288,10 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
}
need_signal = sr->cursize == ring->nitems;
- clib_atomic_fetch_sub (&sr->cursize, 1);
+ clib_atomic_fetch_sub_rel (&sr->cursize, 1);
if (PREDICT_FALSE (need_signal))
- svm_queue_send_signal (mq->q, 0);
+ svm_msg_q_send_signal (mq);
}
static int
@@ -257,71 +317,216 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
return (dist1 < dist2);
}
+static void
+svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+{
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ i8 *tailp;
+ u32 sz;
+
+ tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
+ clib_memcpy_fast (tailp, elem, sq->elsize);
+
+ sq->tail = (sq->tail + 1) % sq->maxsize;
+
+ sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
+ if (!sz)
+ svm_msg_q_send_signal (mq);
+}
+
int
svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- return svm_queue_add (mq->q, (u8 *) msg, nowait);
+
+ if (nowait)
+ {
+ /* zero on success */
+ if (svm_msg_q_try_lock (mq))
+ {
+ return (-1);
+ }
+ }
+ else
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+ {
+ if (nowait)
+ return (-2);
+ while (svm_msg_q_is_full (mq))
+ svm_msg_q_wait (mq);
+ }
+
+ svm_msg_q_add_raw (mq, (u8 *) msg);
+
+ svm_msg_q_unlock (mq);
+
+ return 0;
}
void
svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- svm_queue_add_raw (mq->q, (u8 *) msg);
+ svm_msg_q_add_raw (mq, (u8 *) msg);
svm_msg_q_unlock (mq);
}
+static int
+svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem)
+{
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ i8 *headp;
+ u32 sz;
+
+ ASSERT (!svm_msg_q_is_empty (mq));
+
+ headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+ clib_memcpy_fast (elem, headp, sq->elsize);
+
+ sq->head = (sq->head + 1) % sq->maxsize;
+
+ sz = clib_atomic_fetch_sub_rel (&sq->cursize, 1);
+ if (PREDICT_FALSE (sz == sq->maxsize))
+ svm_msg_q_send_signal (mq);
+
+ return 0;
+}
+
int
svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
svm_q_conditional_wait_t cond, u32 time)
{
- return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
-}
+ int rc = 0;
-void
-svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
-{
- svm_queue_sub_raw (mq->q, (u8 *) msg);
+ if (cond == SVM_Q_NOWAIT)
+ {
+ /* zero on success */
+ if (svm_msg_q_try_lock (mq))
+ {
+ return (-1);
+ }
+ }
+ else
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_is_empty (mq)))
+ {
+ if (cond == SVM_Q_NOWAIT)
+ {
+ svm_msg_q_unlock (mq);
+ return (-2);
+ }
+ else if (cond == SVM_Q_TIMEDWAIT)
+ {
+ while (svm_msg_q_is_empty (mq) && rc == 0)
+ rc = svm_msg_q_timedwait (mq, time);
+
+ if (rc == ETIMEDOUT)
+ {
+ svm_msg_q_unlock (mq);
+ return ETIMEDOUT;
+ }
+ }
+ else
+ {
+ while (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
+ }
+ }
+
+ svm_msg_q_sub_raw (mq, (u8 *) msg);
+
+ svm_msg_q_unlock (mq);
+
+ return 0;
}
void
-svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_sub_w_lock (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
{
- mq->q->consumer_evtfd = fd;
+ svm_msg_q_sub_raw (mq, (u8 *) msg);
}
void
-svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd)
{
- mq->q->producer_evtfd = fd;
+ mq->q.evtfd = fd;
}
int
-svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
return -1;
- svm_msg_q_set_consumer_eventfd (mq, fd);
+ svm_msg_q_set_eventfd (mq, fd);
return 0;
}
+void
+svm_msg_q_wait (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ svm_msg_q_unlock (mq);
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return;
+ }
+ }
+ svm_msg_q_lock (mq);
+ }
+}
+
int
-svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
{
- int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
- return -1;
- svm_msg_q_set_producer_eventfd (mq, fd);
- return 0;
+ if (mq->q.evtfd == -1)
+ {
+ struct timespec ts;
+ ts.tv_sec = unix_time_now () + (u32) timeout;
+ ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
+ return pthread_cond_timedwait (&mq->q.shr->condvar, &mq->q.shr->mutex,
+ &ts);
+ }
+ else
+ {
+ struct timeval tv;
+ u64 buf;
+ int rv;
+
+ tv.tv_sec = (u64) timeout;
+ tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
+ setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
+ sizeof tv);
+
+ svm_msg_q_unlock (mq);
+ rv = read (mq->q.evtfd, &buf, sizeof (buf));
+ if (rv < 0)
+ clib_warning ("read %u", errno);
+ svm_msg_q_lock (mq);
+
+ return rv < 0 ? errno : 0;
+ }
}
u8 *
format_svm_msg_q (u8 * s, va_list * args)
{
svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
- s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize);
+ s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize);
for (u32 i = 0; i < vec_len (mq->rings); i++)
{
s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 4b314b837e9..7716c6724d8 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -24,6 +24,25 @@
#include <vppinfra/error.h>
#include <svm/queue.h>
+typedef struct svm_msg_q_shr_queue_
+{
+ pthread_mutex_t mutex; /* 8 bytes */
+ pthread_cond_t condvar; /* 8 bytes */
+ u32 head;
+ u32 tail;
+ volatile u32 cursize;
+ u32 maxsize;
+ u32 elsize;
+ u32 pad;
+ u8 data[0];
+} svm_msg_q_shared_queue_t;
+
+typedef struct svm_msg_q_queue_
+{
+ svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
+ int evtfd; /**< producer/consumer eventfd */
+} svm_msg_q_queue_t;
+
typedef struct svm_msg_q_ring_shared_
{
volatile u32 cursize; /**< current size of the ring */
@@ -43,14 +62,14 @@ typedef struct svm_msg_q_ring_
typedef struct svm_msg_q_shared_
{
- u32 n_rings; /**< number of rings after q */
- u32 pad; /**< 8 byte alignment for q */
- svm_queue_t q[0]; /**< queue for exchanging messages */
+ u32 n_rings; /**< number of rings after q */
+ u32 pad; /**< 8 byte alignment for q */
+ svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
} __clib_packed svm_msg_q_shared_t;
typedef struct svm_msg_q_
{
- svm_queue_t *q; /**< queue for exchanging messages */
+ svm_msg_q_queue_t q; /**< queue for exchanging messages */
svm_msg_q_ring_t *rings; /**< rings with message data*/
} __clib_packed svm_msg_q_t;
@@ -232,7 +251,7 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
/**
- * Set event fd for queue consumer
+ * Set event fd for queue
*
* If set, queue will exclusively use eventfds for signaling. Moreover,
* afterwards, the queue should only be used in non-blocking mode. Waiting
@@ -241,35 +260,26 @@ svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
* @param mq message queue
* @param fd consumer eventfd
*/
-void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
+void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
/**
- * Set event fd for queue producer
- *
- * If set, queue will exclusively use eventfds for signaling. Moreover,
- * afterwards, the queue should only be used in non-blocking mode. Waiting
- * for events should be done externally using something like epoll.
- *
- * @param mq message queue
- * @param fd producer eventfd
+ * Allocate event fd for queue
*/
-void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
+int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
/**
- * Allocate event fd for queue consumer
+ * Format message queue, shows msg count for each ring
*/
-int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
+u8 *format_svm_msg_q (u8 *s, va_list *args);
/**
- * Allocate event fd for queue consumer
- */
-int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
-
-
-/**
- * Format message queue, shows msg count for each ring
+ * Check length of message queue
*/
-u8 *format_svm_msg_q (u8 * s, va_list * args);
+static inline u32
+svm_msg_q_size (svm_msg_q_t *mq)
+{
+ return clib_atomic_load_relax_n (&mq->q.shr->cursize);
+}
/**
* Check if message queue is full
@@ -277,14 +287,14 @@ u8 *format_svm_msg_q (u8 * s, va_list * args);
static inline u8
svm_msg_q_is_full (svm_msg_q_t * mq)
{
- return (mq->q->cursize == mq->q->maxsize);
+ return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
}
static inline u8
svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
{
svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
- return (ring->shr->cursize >= ring->nitems);
+ return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
}
/**
@@ -293,16 +303,7 @@ svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
static inline u8
svm_msg_q_is_empty (svm_msg_q_t * mq)
{
- return (mq->q->cursize == 0);
-}
-
-/**
- * Check length of message queue
- */
-static inline u32
-svm_msg_q_size (svm_msg_q_t * mq)
-{
- return mq->q->cursize;
+ return (svm_msg_q_size (mq) == 0);
}
/**
@@ -320,9 +321,9 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
static inline int
svm_msg_q_try_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_trylock (&mq->q->mutex);
+ int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q->mutex);
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
return rv;
}
@@ -332,9 +333,9 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
static inline int
svm_msg_q_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_lock (&mq->q->mutex);
+ int rv = pthread_mutex_lock (&mq->q.shr->mutex);
if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q->mutex);
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
return rv;
}
@@ -344,7 +345,7 @@ svm_msg_q_lock (svm_msg_q_t * mq)
static inline void
svm_msg_q_unlock (svm_msg_q_t * mq)
{
- pthread_mutex_unlock (&mq->q->mutex);
+ pthread_mutex_unlock (&mq->q.shr->mutex);
}
/**
@@ -353,11 +354,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
* Must be called with mutex held. The queue only works non-blocking
* with eventfds, so handle blocking calls as an exception here.
*/
-static inline void
-svm_msg_q_wait (svm_msg_q_t * mq)
-{
- svm_queue_wait (mq->q);
-}
+void svm_msg_q_wait (svm_msg_q_t *mq);
/**
* Timed wait for message queue event
@@ -367,22 +364,12 @@ svm_msg_q_wait (svm_msg_q_t * mq)
* @param mq message queue
* @param timeout time in seconds
*/
-static inline int
-svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
-{
- return svm_queue_timedwait (mq->q, timeout);
-}
-
-static inline int
-svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
-{
- return mq->q->consumer_evtfd;
-}
+int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
static inline int
-svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_get_eventfd (svm_msg_q_t *mq)
{
- return mq->q->producer_evtfd;
+ return mq->q.evtfd;
}
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index 7d241624d01..48695a31a3e 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -121,7 +121,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
- svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
+ svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
n_fds++;
}
@@ -215,7 +215,7 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t *
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
- svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
+ svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
n_fds++;
}
diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c
index a140e5ea557..b9745d27e4e 100644
--- a/src/vcl/vcl_private.c
+++ b/src/vcl/vcl_private.c
@@ -46,7 +46,7 @@ vcl_mq_epoll_add_evfd (vcl_worker_t * wrk, svm_msg_q_t * mq)
u32 mqc_index;
int mq_fd;
- mq_fd = svm_msg_q_get_consumer_eventfd (mq);
+ mq_fd = svm_msg_q_get_eventfd (mq);
if (wrk->mqs_epfd < 0 || mq_fd == -1)
return -1;
diff --git a/src/vcl/vcl_sapi.c b/src/vcl/vcl_sapi.c
index bc44272a5c9..1bab7eaba13 100644
--- a/src/vcl/vcl_sapi.c
+++ b/src/vcl/vcl_sapi.c
@@ -89,8 +89,7 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
- svm_msg_q_set_consumer_eventfd (wrk->app_event_queue,
- fds[n_fds_used++]);
+ svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds_used++]);
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
}
@@ -236,7 +235,7 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t *
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
- svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
+ svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
n_fds++;
}
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 61108326abc..ffac4e0bc26 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -870,7 +870,7 @@ segment_manager_alloc_queue (fifo_segment_t * segment,
if (props->use_mq_eventfd)
{
- if (svm_msg_q_alloc_producer_eventfd (q))
+ if (svm_msg_q_alloc_eventfd (q))
clib_warning ("failed to alloc eventfd");
}
return q;
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index d1f21dafdd6..169cca5efe8 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1548,9 +1548,6 @@ session_vpp_event_queues_allocate (session_main_t * smm)
cfg->ring_cfgs = rc;
smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg);
-
- if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
- clib_warning ("eventfd returned");
}
}
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 2e215f76051..0116a7ece66 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -667,7 +667,7 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
{
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
- fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
+ fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q);
n_fds += 1;
}
@@ -751,7 +751,7 @@ vl_api_app_worker_add_del_t_handler (vl_api_app_worker_add_del_t * mp)
if (application_segment_manager_properties (app)->use_mq_eventfd)
{
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
- fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q);
+ fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q);
n_fds += 1;
}
@@ -1317,7 +1317,7 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
{
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
- fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
+ fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q);
n_fds += 1;
}
@@ -1426,7 +1426,7 @@ sapi_add_del_worker_handler (app_namespace_t * app_ns,
if (application_segment_manager_properties (app)->use_mq_eventfd)
{
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
- fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q);
+ fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q);
n_fds += 1;
}
diff --git a/src/vnet/session/session_debug.c b/src/vnet/session/session_debug.c
index cd4198c5da8..c042e9e53db 100644
--- a/src/vnet/session/session_debug.c
+++ b/src/vnet/session/session_debug.c
@@ -123,6 +123,7 @@ dump_thread_0_event_queue (void)
vlib_main_t *vm = &vlib_global_main;
u32 my_thread_index = vm->thread_index;
session_event_t _e, *e = &_e;
+ svm_msg_q_shared_queue_t *sq;
svm_msg_q_ring_t *ring;
session_t *s0;
svm_msg_q_msg_t *msg;
@@ -130,11 +131,12 @@ dump_thread_0_event_queue (void)
int i, index;
mq = session_main_get_vpp_event_queue (my_thread_index);
- index = mq->q->head;
+ sq = mq->q.shr;
+ index = sq->head;
- for (i = 0; i < mq->q->cursize; i++)
+ for (i = 0; i < sq->cursize; i++)
{
- msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index);
ring = svm_msg_q_ring (mq, msg->ring_index);
clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
@@ -170,7 +172,7 @@ dump_thread_0_event_queue (void)
index++;
- if (index == mq->q->maxsize)
+ if (index == sq->maxsize)
index = 0;
}
}
@@ -210,6 +212,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
u8
session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
{
+ svm_msg_q_shared_queue_t *sq;
session_evt_elt_t *elt;
session_worker_t *wrk;
int i, index, found = 0;
@@ -226,16 +229,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
* Search evt queue
*/
mq = wrk->vpp_event_queue;
- index = mq->q->head;
- for (i = 0; i < mq->q->cursize; i++)
+ sq = mq->q.shr;
+ index = sq->head;
+ for (i = 0; i < sq->cursize; i++)
{
- msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index);
ring = svm_msg_q_ring (mq, msg->ring_index);
clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
found = session_node_cmp_event (e, f);
if (found)
return 1;
- index = (index + 1) % mq->q->maxsize;
+ index = (index + 1) % sq->maxsize;
}
/*
* Search pending events vector