summaryrefslogtreecommitdiffstats
path: root/src/svm
diff options
context:
space:
mode:
Diffstat (limited to 'src/svm')
-rw-r--r--src/svm/fifo_segment.c7
-rw-r--r--src/svm/message_queue.c269
-rw-r--r--src/svm/message_queue.h107
3 files changed, 288 insertions, 95 deletions
diff --git a/src/svm/fifo_segment.c b/src/svm/fifo_segment.c
index d30932448c6..3e9aecb9dc8 100644
--- a/src/svm/fifo_segment.c
+++ b/src/svm/fifo_segment.c
@@ -1042,7 +1042,7 @@ fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset, u32 mq_index)
mq = vec_elt_at_index (fs->mqs, mq_index);
- if (!mq->q)
+ if (!mq->q.shr)
{
svm_msg_q_shared_t *smq;
smq = (svm_msg_q_shared_t *) ((u8 *) fs->h + offset);
@@ -1059,10 +1059,11 @@ fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index)
{
svm_msg_q_t *mq = vec_elt_at_index (fs->mqs, mq_index);
- if (mq->q == 0)
+ if (mq->q.shr == 0)
return ~0ULL;
- return (uword) ((u8 *) mq->q - (u8 *) fs->h) - sizeof (svm_msg_q_shared_t);
+ return (uword) ((u8 *) mq->q.shr - (u8 *) fs->h) -
+ sizeof (svm_msg_q_shared_t);
}
int
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 0ebce702430..fdf9293b18c 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -16,7 +16,9 @@
#include <svm/message_queue.h>
#include <vppinfra/mem.h>
#include <vppinfra/format.h>
+#include <vppinfra/time.h>
#include <sys/eventfd.h>
+#include <sys/socket.h>
static inline svm_msg_q_ring_t *
svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
@@ -37,19 +39,51 @@ svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
return (ring->shr->data + elt_index * ring->elsize);
}
+static void
+svm_msg_q_init_mutex (svm_msg_q_shared_queue_t *sq)
+{
+ pthread_mutexattr_t attr;
+ pthread_condattr_t cattr;
+
+ clib_memset (&attr, 0, sizeof (attr));
+ clib_memset (&cattr, 0, sizeof (cattr));
+
+ if (pthread_mutexattr_init (&attr))
+ clib_unix_warning ("mutexattr_init");
+ if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("pthread_mutexattr_setpshared");
+ if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
+ clib_unix_warning ("setrobust");
+ if (pthread_mutex_init (&sq->mutex, &attr))
+ clib_unix_warning ("mutex_init");
+ if (pthread_mutexattr_destroy (&attr))
+ clib_unix_warning ("mutexattr_destroy");
+ if (pthread_condattr_init (&cattr))
+ clib_unix_warning ("condattr_init");
+ if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("condattr_setpshared");
+ if (pthread_cond_init (&sq->condvar, &cattr))
+ clib_unix_warning ("cond_init1");
+ if (pthread_condattr_destroy (&cattr))
+ clib_unix_warning ("cond_init2");
+}
+
svm_msg_q_shared_t *
svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
{
svm_msg_q_ring_shared_t *ring;
+ svm_msg_q_shared_queue_t *sq;
svm_msg_q_shared_t *smq;
u32 q_sz, offset;
int i;
- q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
smq = (svm_msg_q_shared_t *) base;
- svm_queue_init (&smq->q, cfg->q_nitems, sizeof (svm_msg_q_msg_t));
- smq->q->consumer_pid = cfg->consumer_pid;
+ sq = smq->q;
+ clib_memset (sq, 0, sizeof (*sq));
+ sq->elsize = sizeof (svm_msg_q_msg_t);
+ sq->maxsize = cfg->q_nitems;
smq->n_rings = cfg->n_rings;
ring = (void *) ((u8 *) smq->q + q_sz);
for (i = 0; i < cfg->n_rings; i++)
@@ -61,6 +95,8 @@ svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
ring = (void *) ((u8 *) ring + offset);
}
+ svm_msg_q_init_mutex (sq);
+
return smq;
}
@@ -83,7 +119,8 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
}
- q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (svm_msg_q_shared_queue_t) +
+ cfg->q_nitems * sizeof (svm_msg_q_msg_t);
mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
return mq_sz;
@@ -111,10 +148,12 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
u32 i, n_rings, q_sz, offset;
smq = (svm_msg_q_shared_t *) smq_base;
- mq->q = smq->q;
+ mq->q.shr = smq->q;
+ mq->q.evtfd = -1;
n_rings = smq->n_rings;
vec_validate (mq->rings, n_rings - 1);
- q_sz = sizeof (svm_queue_t) + mq->q->maxsize * sizeof (svm_msg_q_msg_t);
+ q_sz = sizeof (svm_msg_q_shared_queue_t) +
+ mq->q.shr->maxsize * sizeof (svm_msg_q_msg_t);
ring = (void *) ((u8 *) smq->q + q_sz);
for (i = 0; i < n_rings; i++)
{
@@ -129,10 +168,31 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
void
svm_msg_q_free (svm_msg_q_t * mq)
{
- svm_queue_free (mq->q);
+ clib_mem_free (mq->q.shr);
clib_mem_free (mq);
}
+static void
+svm_msg_q_send_signal (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ (void) pthread_cond_broadcast (&mq->q.shr->condvar);
+ }
+ else
+ {
+ int __clib_unused rv;
+ u64 data = 1;
+
+ if (mq->q.evtfd < 0)
+ return;
+
+ rv = write (mq->q.evtfd, &data, sizeof (data));
+ if (PREDICT_FALSE (rv < 0))
+ clib_unix_warning ("signal write on %d returned %d", mq->q.evtfd, rv);
+ }
+}
+
svm_msg_q_msg_t
svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
{
@@ -147,7 +207,7 @@ svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
msg.ring_index = ring - mq->rings;
msg.elt_index = sr->tail;
sr->tail = (sr->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&sr->cursize, 1);
+ clib_atomic_fetch_add_rel (&sr->cursize, 1);
return msg;
}
@@ -193,7 +253,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
msg.ring_index = ring - mq->rings;
msg.elt_index = sr->tail;
sr->tail = (sr->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&sr->cursize, 1);
+ clib_atomic_fetch_add_rel (&sr->cursize, 1);
break;
}
return msg;
@@ -228,10 +288,10 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
}
need_signal = sr->cursize == ring->nitems;
- clib_atomic_fetch_sub (&sr->cursize, 1);
+ clib_atomic_fetch_sub_rel (&sr->cursize, 1);
if (PREDICT_FALSE (need_signal))
- svm_queue_send_signal (mq->q, 0);
+ svm_msg_q_send_signal (mq);
}
static int
@@ -257,71 +317,216 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
return (dist1 < dist2);
}
+static void
+svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+{
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ i8 *tailp;
+ u32 sz;
+
+ tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
+ clib_memcpy_fast (tailp, elem, sq->elsize);
+
+ sq->tail = (sq->tail + 1) % sq->maxsize;
+
+ sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
+ if (!sz)
+ svm_msg_q_send_signal (mq);
+}
+
int
svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- return svm_queue_add (mq->q, (u8 *) msg, nowait);
+
+ if (nowait)
+ {
+ /* zero on success */
+ if (svm_msg_q_try_lock (mq))
+ {
+ return (-1);
+ }
+ }
+ else
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+ {
+ if (nowait)
+ return (-2);
+ while (svm_msg_q_is_full (mq))
+ svm_msg_q_wait (mq);
+ }
+
+ svm_msg_q_add_raw (mq, (u8 *) msg);
+
+ svm_msg_q_unlock (mq);
+
+ return 0;
}
void
svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- svm_queue_add_raw (mq->q, (u8 *) msg);
+ svm_msg_q_add_raw (mq, (u8 *) msg);
svm_msg_q_unlock (mq);
}
+static int
+svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem)
+{
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ i8 *headp;
+ u32 sz;
+
+ ASSERT (!svm_msg_q_is_empty (mq));
+
+ headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+ clib_memcpy_fast (elem, headp, sq->elsize);
+
+ sq->head = (sq->head + 1) % sq->maxsize;
+
+ sz = clib_atomic_fetch_sub_rel (&sq->cursize, 1);
+ if (PREDICT_FALSE (sz == sq->maxsize))
+ svm_msg_q_send_signal (mq);
+
+ return 0;
+}
+
int
svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
svm_q_conditional_wait_t cond, u32 time)
{
- return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
-}
+ int rc = 0;
-void
-svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
-{
- svm_queue_sub_raw (mq->q, (u8 *) msg);
+ if (cond == SVM_Q_NOWAIT)
+ {
+ /* zero on success */
+ if (svm_msg_q_try_lock (mq))
+ {
+ return (-1);
+ }
+ }
+ else
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_is_empty (mq)))
+ {
+ if (cond == SVM_Q_NOWAIT)
+ {
+ svm_msg_q_unlock (mq);
+ return (-2);
+ }
+ else if (cond == SVM_Q_TIMEDWAIT)
+ {
+ while (svm_msg_q_is_empty (mq) && rc == 0)
+ rc = svm_msg_q_timedwait (mq, time);
+
+ if (rc == ETIMEDOUT)
+ {
+ svm_msg_q_unlock (mq);
+ return ETIMEDOUT;
+ }
+ }
+ else
+ {
+ while (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
+ }
+ }
+
+ svm_msg_q_sub_raw (mq, (u8 *) msg);
+
+ svm_msg_q_unlock (mq);
+
+ return 0;
}
void
-svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_sub_w_lock (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
{
- mq->q->consumer_evtfd = fd;
+ svm_msg_q_sub_raw (mq, (u8 *) msg);
}
void
-svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd)
{
- mq->q->producer_evtfd = fd;
+ mq->q.evtfd = fd;
}
int
-svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
return -1;
- svm_msg_q_set_consumer_eventfd (mq, fd);
+ svm_msg_q_set_eventfd (mq, fd);
return 0;
}
+void
+svm_msg_q_wait (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ svm_msg_q_unlock (mq);
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return;
+ }
+ }
+ svm_msg_q_lock (mq);
+ }
+}
+
int
-svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
{
- int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
- return -1;
- svm_msg_q_set_producer_eventfd (mq, fd);
- return 0;
+ if (mq->q.evtfd == -1)
+ {
+ struct timespec ts;
+ ts.tv_sec = unix_time_now () + (u32) timeout;
+ ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
+ return pthread_cond_timedwait (&mq->q.shr->condvar, &mq->q.shr->mutex,
+ &ts);
+ }
+ else
+ {
+ struct timeval tv;
+ u64 buf;
+ int rv;
+
+ tv.tv_sec = (u64) timeout;
+ tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
+ setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
+ sizeof tv);
+
+ svm_msg_q_unlock (mq);
+ rv = read (mq->q.evtfd, &buf, sizeof (buf));
+ if (rv < 0)
+ clib_warning ("read %u", errno);
+ svm_msg_q_lock (mq);
+
+ return rv < 0 ? errno : 0;
+ }
}
u8 *
format_svm_msg_q (u8 * s, va_list * args)
{
svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
- s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize);
+ s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize);
for (u32 i = 0; i < vec_len (mq->rings); i++)
{
s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 4b314b837e9..7716c6724d8 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -24,6 +24,25 @@
#include <vppinfra/error.h>
#include <svm/queue.h>
+typedef struct svm_msg_q_shr_queue_
+{
+ pthread_mutex_t mutex; /* 8 bytes */
+ pthread_cond_t condvar; /* 8 bytes */
+ u32 head;
+ u32 tail;
+ volatile u32 cursize;
+ u32 maxsize;
+ u32 elsize;
+ u32 pad;
+ u8 data[0];
+} svm_msg_q_shared_queue_t;
+
+typedef struct svm_msg_q_queue_
+{
+ svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
+ int evtfd; /**< producer/consumer eventfd */
+} svm_msg_q_queue_t;
+
typedef struct svm_msg_q_ring_shared_
{
volatile u32 cursize; /**< current size of the ring */
@@ -43,14 +62,14 @@ typedef struct svm_msg_q_ring_
typedef struct svm_msg_q_shared_
{
- u32 n_rings; /**< number of rings after q */
- u32 pad; /**< 8 byte alignment for q */
- svm_queue_t q[0]; /**< queue for exchanging messages */
+ u32 n_rings; /**< number of rings after q */
+ u32 pad; /**< 8 byte alignment for q */
+ svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
} __clib_packed svm_msg_q_shared_t;
typedef struct svm_msg_q_
{
- svm_queue_t *q; /**< queue for exchanging messages */
+ svm_msg_q_queue_t q; /**< queue for exchanging messages */
svm_msg_q_ring_t *rings; /**< rings with message data*/
} __clib_packed svm_msg_q_t;
@@ -232,7 +251,7 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
/**
- * Set event fd for queue consumer
+ * Set event fd for queue
*
* If set, queue will exclusively use eventfds for signaling. Moreover,
* afterwards, the queue should only be used in non-blocking mode. Waiting
@@ -241,35 +260,26 @@ svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
* @param mq message queue
* @param fd consumer eventfd
*/
-void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
+void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
/**
- * Set event fd for queue producer
- *
- * If set, queue will exclusively use eventfds for signaling. Moreover,
- * afterwards, the queue should only be used in non-blocking mode. Waiting
- * for events should be done externally using something like epoll.
- *
- * @param mq message queue
- * @param fd producer eventfd
+ * Allocate event fd for queue
*/
-void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
+int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
/**
- * Allocate event fd for queue consumer
+ * Format message queue, shows msg count for each ring
*/
-int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
+u8 *format_svm_msg_q (u8 *s, va_list *args);
/**
- * Allocate event fd for queue consumer
- */
-int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
-
-
-/**
- * Format message queue, shows msg count for each ring
+ * Check length of message queue
*/
-u8 *format_svm_msg_q (u8 * s, va_list * args);
+static inline u32
+svm_msg_q_size (svm_msg_q_t *mq)
+{
+ return clib_atomic_load_relax_n (&mq->q.shr->cursize);
+}
/**
* Check if message queue is full
@@ -277,14 +287,14 @@ u8 *format_svm_msg_q (u8 * s, va_list * args);
static inline u8
svm_msg_q_is_full (svm_msg_q_t * mq)
{
- return (mq->q->cursize == mq->q->maxsize);
+ return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
}
static inline u8
svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
{
svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
- return (ring->shr->cursize >= ring->nitems);
+ return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
}
/**
@@ -293,16 +303,7 @@ svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
static inline u8
svm_msg_q_is_empty (svm_msg_q_t * mq)
{
- return (mq->q->cursize == 0);
-}
-
-/**
- * Check length of message queue
- */
-static inline u32
-svm_msg_q_size (svm_msg_q_t * mq)
-{
- return mq->q->cursize;
+ return (svm_msg_q_size (mq) == 0);
}
/**
@@ -320,9 +321,9 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
static inline int
svm_msg_q_try_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_trylock (&mq->q->mutex);
+ int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q->mutex);
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
return rv;
}
@@ -332,9 +333,9 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
static inline int
svm_msg_q_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_lock (&mq->q->mutex);
+ int rv = pthread_mutex_lock (&mq->q.shr->mutex);
if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q->mutex);
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
return rv;
}
@@ -344,7 +345,7 @@ svm_msg_q_lock (svm_msg_q_t * mq)
static inline void
svm_msg_q_unlock (svm_msg_q_t * mq)
{
- pthread_mutex_unlock (&mq->q->mutex);
+ pthread_mutex_unlock (&mq->q.shr->mutex);
}
/**
@@ -353,11 +354,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
* Must be called with mutex held. The queue only works non-blocking
* with eventfds, so handle blocking calls as an exception here.
*/
-static inline void
-svm_msg_q_wait (svm_msg_q_t * mq)
-{
- svm_queue_wait (mq->q);
-}
+void svm_msg_q_wait (svm_msg_q_t *mq);
/**
* Timed wait for message queue event
@@ -367,22 +364,12 @@ svm_msg_q_wait (svm_msg_q_t * mq)
* @param mq message queue
* @param timeout time in seconds
*/
-static inline int
-svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
-{
- return svm_queue_timedwait (mq->q, timeout);
-}
-
-static inline int
-svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
-{
- return mq->q->consumer_evtfd;
-}
+int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
static inline int
-svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_get_eventfd (svm_msg_q_t *mq)
{
- return mq->q->producer_evtfd;
+ return mq->q.evtfd;
}
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */