aboutsummaryrefslogtreecommitdiffstats
path: root/src/svm/message_queue.c
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2021-01-22 15:05:14 -0800
committerDave Barach <openvpp@barachs.net>2021-01-25 15:34:21 +0000
commit86f1232ddee5b1751c6ff683892072111d0e2dee (patch)
treebfa213f887f6c4142531b7585df7b6707a40feb9 /src/svm/message_queue.c
parent15036ad0bc0b41e42d924e6b1cd897cca8f98c3c (diff)
svm: add custom q implementation for mq
Add separate queue implementation for the message queue as it's custom tailored for fifo segments as opposed to binary api. Also move eventfds to the private data structures. Type: refactor Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I6df0c824ecd94c7904516373f92a9fffc6b04736
Diffstat (limited to 'src/svm/message_queue.c')
-rw-r--r--src/svm/message_queue.c269
1 files changed, 237 insertions, 32 deletions
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 0ebce702430..fdf9293b18c 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -16,7 +16,9 @@
#include <svm/message_queue.h>
#include <vppinfra/mem.h>
#include <vppinfra/format.h>
+#include <vppinfra/time.h>
#include <sys/eventfd.h>
+#include <sys/socket.h>
static inline svm_msg_q_ring_t *
svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
@@ -37,19 +39,51 @@ svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
return (ring->shr->data + elt_index * ring->elsize);
}
+static void
+svm_msg_q_init_mutex (svm_msg_q_shared_queue_t *sq)
+{
+ pthread_mutexattr_t attr;
+ pthread_condattr_t cattr;
+
+ clib_memset (&attr, 0, sizeof (attr));
+ clib_memset (&cattr, 0, sizeof (cattr));
+
+ if (pthread_mutexattr_init (&attr))
+ clib_unix_warning ("mutexattr_init");
+ if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("pthread_mutexattr_setpshared");
+ if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
+ clib_unix_warning ("setrobust");
+ if (pthread_mutex_init (&sq->mutex, &attr))
+ clib_unix_warning ("mutex_init");
+ if (pthread_mutexattr_destroy (&attr))
+ clib_unix_warning ("mutexattr_destroy");
+ if (pthread_condattr_init (&cattr))
+ clib_unix_warning ("condattr_init");
+ if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("condattr_setpshared");
+ if (pthread_cond_init (&sq->condvar, &cattr))
+ clib_unix_warning ("cond_init1");
+ if (pthread_condattr_destroy (&cattr))
+ clib_unix_warning ("cond_init2");
+}
+
svm_msg_q_shared_t *
svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
{
svm_msg_q_ring_shared_t *ring;
+ svm_msg_q_shared_queue_t *sq;
svm_msg_q_shared_t *smq;
u32 q_sz, offset;
int i;
- q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
smq = (svm_msg_q_shared_t *) base;
- svm_queue_init (&smq->q, cfg->q_nitems, sizeof (svm_msg_q_msg_t));
- smq->q->consumer_pid = cfg->consumer_pid;
+ sq = smq->q;
+ clib_memset (sq, 0, sizeof (*sq));
+ sq->elsize = sizeof (svm_msg_q_msg_t);
+ sq->maxsize = cfg->q_nitems;
smq->n_rings = cfg->n_rings;
ring = (void *) ((u8 *) smq->q + q_sz);
for (i = 0; i < cfg->n_rings; i++)
@@ -61,6 +95,8 @@ svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
ring = (void *) ((u8 *) ring + offset);
}
+ svm_msg_q_init_mutex (sq);
+
return smq;
}
@@ -83,7 +119,8 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
}
- q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (svm_msg_q_shared_queue_t) +
+ cfg->q_nitems * sizeof (svm_msg_q_msg_t);
mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
return mq_sz;
@@ -111,10 +148,12 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
u32 i, n_rings, q_sz, offset;
smq = (svm_msg_q_shared_t *) smq_base;
- mq->q = smq->q;
+ mq->q.shr = smq->q;
+ mq->q.evtfd = -1;
n_rings = smq->n_rings;
vec_validate (mq->rings, n_rings - 1);
- q_sz = sizeof (svm_queue_t) + mq->q->maxsize * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (svm_msg_q_shared_queue_t) +
+ mq->q.shr->maxsize * sizeof (svm_msg_q_msg_t);
ring = (void *) ((u8 *) smq->q + q_sz);
for (i = 0; i < n_rings; i++)
{
@@ -129,10 +168,31 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
void
svm_msg_q_free (svm_msg_q_t * mq)
{
- svm_queue_free (mq->q);
+ clib_mem_free (mq->q.shr);
clib_mem_free (mq);
}
+static void
+svm_msg_q_send_signal (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ (void) pthread_cond_broadcast (&mq->q.shr->condvar);
+ }
+ else
+ {
+ int __clib_unused rv;
+ u64 data = 1;
+
+ if (mq->q.evtfd < 0)
+ return;
+
+ rv = write (mq->q.evtfd, &data, sizeof (data));
+ if (PREDICT_FALSE (rv < 0))
+ clib_unix_warning ("signal write on %d returned %d", mq->q.evtfd, rv);
+ }
+}
+
svm_msg_q_msg_t
svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
{
@@ -147,7 +207,7 @@ svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
msg.ring_index = ring - mq->rings;
msg.elt_index = sr->tail;
sr->tail = (sr->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&sr->cursize, 1);
+ clib_atomic_fetch_add_rel (&sr->cursize, 1);
return msg;
}
@@ -193,7 +253,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
msg.ring_index = ring - mq->rings;
msg.elt_index = sr->tail;
sr->tail = (sr->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&sr->cursize, 1);
+ clib_atomic_fetch_add_rel (&sr->cursize, 1);
break;
}
return msg;
@@ -228,10 +288,10 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
}
need_signal = sr->cursize == ring->nitems;
- clib_atomic_fetch_sub (&sr->cursize, 1);
+ clib_atomic_fetch_sub_rel (&sr->cursize, 1);
if (PREDICT_FALSE (need_signal))
- svm_queue_send_signal (mq->q, 0);
+ svm_msg_q_send_signal (mq);
}
static int
@@ -257,71 +317,216 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
return (dist1 < dist2);
}
+static void
+svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+{
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ i8 *tailp;
+ u32 sz;
+
+ tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
+ clib_memcpy_fast (tailp, elem, sq->elsize);
+
+ sq->tail = (sq->tail + 1) % sq->maxsize;
+
+ sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
+ if (!sz)
+ svm_msg_q_send_signal (mq);
+}
+
int
svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- return svm_queue_add (mq->q, (u8 *) msg, nowait);
+
+ if (nowait)
+ {
+ /* zero on success */
+ if (svm_msg_q_try_lock (mq))
+ {
+ return (-1);
+ }
+ }
+ else
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+ {
+ if (nowait)
+ return (-2);
+ while (svm_msg_q_is_full (mq))
+ svm_msg_q_wait (mq);
+ }
+
+ svm_msg_q_add_raw (mq, (u8 *) msg);
+
+ svm_msg_q_unlock (mq);
+
+ return 0;
}
void
svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- svm_queue_add_raw (mq->q, (u8 *) msg);
+ svm_msg_q_add_raw (mq, (u8 *) msg);
svm_msg_q_unlock (mq);
}
+static int
+svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem)
+{
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ i8 *headp;
+ u32 sz;
+
+ ASSERT (!svm_msg_q_is_empty (mq));
+
+ headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+ clib_memcpy_fast (elem, headp, sq->elsize);
+
+ sq->head = (sq->head + 1) % sq->maxsize;
+
+ sz = clib_atomic_fetch_sub_rel (&sq->cursize, 1);
+ if (PREDICT_FALSE (sz == sq->maxsize))
+ svm_msg_q_send_signal (mq);
+
+ return 0;
+}
+
int
svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
svm_q_conditional_wait_t cond, u32 time)
{
- return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
-}
+ int rc = 0;
-void
-svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
-{
- svm_queue_sub_raw (mq->q, (u8 *) msg);
+ if (cond == SVM_Q_NOWAIT)
+ {
+ /* zero on success */
+ if (svm_msg_q_try_lock (mq))
+ {
+ return (-1);
+ }
+ }
+ else
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_is_empty (mq)))
+ {
+ if (cond == SVM_Q_NOWAIT)
+ {
+ svm_msg_q_unlock (mq);
+ return (-2);
+ }
+ else if (cond == SVM_Q_TIMEDWAIT)
+ {
+ while (svm_msg_q_is_empty (mq) && rc == 0)
+ rc = svm_msg_q_timedwait (mq, time);
+
+ if (rc == ETIMEDOUT)
+ {
+ svm_msg_q_unlock (mq);
+ return ETIMEDOUT;
+ }
+ }
+ else
+ {
+ while (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
+ }
+ }
+
+ svm_msg_q_sub_raw (mq, (u8 *) msg);
+
+ svm_msg_q_unlock (mq);
+
+ return 0;
}
void
-svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_sub_w_lock (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
{
- mq->q->consumer_evtfd = fd;
+ svm_msg_q_sub_raw (mq, (u8 *) msg);
}
void
-svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd)
{
- mq->q->producer_evtfd = fd;
+ mq->q.evtfd = fd;
}
int
-svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
return -1;
- svm_msg_q_set_consumer_eventfd (mq, fd);
+ svm_msg_q_set_eventfd (mq, fd);
return 0;
}
+void
+svm_msg_q_wait (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ svm_msg_q_unlock (mq);
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return;
+ }
+ }
+ svm_msg_q_lock (mq);
+ }
+}
+
int
-svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
{
- int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
- return -1;
- svm_msg_q_set_producer_eventfd (mq, fd);
- return 0;
+ if (mq->q.evtfd == -1)
+ {
+ struct timespec ts;
+ ts.tv_sec = unix_time_now () + (u32) timeout;
+ ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
+ return pthread_cond_timedwait (&mq->q.shr->condvar, &mq->q.shr->mutex,
+ &ts);
+ }
+ else
+ {
+ struct timeval tv;
+ u64 buf;
+ int rv;
+
+ tv.tv_sec = (u64) timeout;
+ tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
+ setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
+ sizeof tv);
+
+ svm_msg_q_unlock (mq);
+ rv = read (mq->q.evtfd, &buf, sizeof (buf));
+ if (rv < 0)
+ clib_warning ("read %u", errno);
+ svm_msg_q_lock (mq);
+
+ return rv < 0 ? errno : 0;
+ }
}
u8 *
format_svm_msg_q (u8 * s, va_list * args)
{
svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
- s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize);
+ s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize);
for (u32 i = 0; i < vec_len (mq->rings); i++)
{
s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,