Diffstat (limited to 'src/svm')
-rw-r--r--  src/svm/message_queue.c  33
-rw-r--r--  src/svm/message_queue.h  58
-rw-r--r--  src/svm/queue.c          85
-rw-r--r--  src/svm/queue.h          18
4 files changed, 167 insertions, 27 deletions
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index e97cab898e8..d6a77e783e3 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -15,6 +15,7 @@
#include <svm/message_queue.h>
#include <vppinfra/mem.h>
+#include <sys/eventfd.h>
static inline svm_msg_q_ring_t *
svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
@@ -235,6 +236,38 @@ svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
svm_queue_sub_raw (mq->q, (u8 *) msg);
}
+void
+svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+{
+ mq->q->consumer_evtfd = fd;
+}
+
+void
+svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+{
+ mq->q->producer_evtfd = fd;
+}
+
+int
+svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+{
+ int fd;
+ if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+ return -1;
+ svm_msg_q_set_consumer_eventfd (mq, fd);
+ return 0;
+}
+
+int
+svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+{
+ int fd;
+ if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+ return -1;
+ svm_msg_q_set_producer_eventfd (mq, fd);
+ return 0;
+}
+
/*
* fd.io coding-style-patch-verification: ON
*
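
The allocation helpers above create non-blocking eventfds with eventfd (0, EFD_NONBLOCK) and store them on the queue. The standalone sketch below is not part of the patch; it only illustrates the plain Linux eventfd signal/drain cycle these helpers build on (an 8-byte write adds to the fd's counter, an 8-byte read returns the accumulated count and resets it).

/* Standalone sketch, not part of the patch: basic eventfd signal/drain. */
#include <sys/eventfd.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>

int
main (void)
{
  uint64_t one = 1, cnt = 0;
  int fd = eventfd (0, EFD_NONBLOCK);

  if (fd < 0)
    return 1;

  /* producer side: add 1 to the eventfd counter to signal an event */
  if (write (fd, &one, sizeof (one)) != sizeof (one))
    perror ("write");

  /* consumer side: read returns the accumulated count and resets it */
  if (read (fd, &cnt, sizeof (cnt)) == sizeof (cnt))
    printf ("events signaled: %llu\n", (unsigned long long) cnt);

  close (fd);
  return 0;
}
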
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 4c16c97ca7c..28bf14e545e 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -22,7 +22,6 @@
#include <vppinfra/clib.h>
#include <vppinfra/error.h>
-#include <vppinfra/time.h>
#include <svm/queue.h>
typedef struct svm_msg_q_ring_
@@ -215,6 +214,40 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
/**
+ * Set event fd for queue consumer
+ *
+ * If set, the queue will use eventfds exclusively for signaling and should
+ * thereafter only be used in non-blocking mode. Waiting for events should
+ * be done externally using something like epoll.
+ *
+ * @param mq message queue
+ * @param fd consumer eventfd
+ */
+void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
+
+/**
+ * Set event fd for queue producer
+ *
+ * If set, the queue will use eventfds exclusively for signaling and should
+ * thereafter only be used in non-blocking mode. Waiting for events should
+ * be done externally using something like epoll.
+ *
+ * @param mq message queue
+ * @param fd producer eventfd
+ */
+void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
+
+/**
+ * Allocate event fd for queue consumer
+ */
+int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
+
+/**
+ * Allocate event fd for queue producer
+ */
+int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
+
+/**
* Check if message queue is full
*/
static inline u8
@@ -290,12 +323,13 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
/**
* Wait for message queue event
*
- * Must be called with mutex held
+ * Must be called with mutex held. With eventfds the queue only works in
+ * non-blocking mode, so blocking calls are handled as an exception here.
*/
static inline void
svm_msg_q_wait (svm_msg_q_t * mq)
{
- pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
+ svm_queue_wait (mq->q);
}
/**
@@ -309,13 +343,19 @@ svm_msg_q_wait (svm_msg_q_t * mq)
static inline int
svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
{
- struct timespec ts;
+ return svm_queue_timedwait (mq->q, timeout);
+}
+
+static inline int
+svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
+{
+ return mq->q->consumer_evtfd;
+}
- ts.tv_sec = unix_time_now () + (u32) timeout;
- ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
- if (pthread_cond_timedwait (&mq->q->condvar, &mq->q->mutex, &ts))
- return -1;
- return 0;
+static inline int
+svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
+{
+ return mq->q->producer_evtfd;
}
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
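
Per the comments added above, once eventfds are configured the message queue should be driven in non-blocking mode, with event waiting done externally via something like epoll. The hypothetical consumer_wait_loop below (not part of the patch) sketches that pattern using the new accessors; the epoll and read calls are standard Linux APIs, error handling is trimmed, and the actual dequeue step is left as a placeholder.

/* Hypothetical consumer-side sketch: wait for queue events with epoll. */
#include <sys/epoll.h>
#include <unistd.h>
#include <stdint.h>
#include <svm/message_queue.h>

static void
consumer_wait_loop (svm_msg_q_t * mq)
{
  struct epoll_event ev = { 0 }, event;
  int epfd, mq_fd;
  uint64_t cnt;

  if ((epfd = epoll_create1 (0)) < 0)
    return;
  if (svm_msg_q_alloc_consumer_eventfd (mq) < 0)
    return;
  mq_fd = svm_msg_q_get_consumer_eventfd (mq);

  ev.events = EPOLLIN;
  ev.data.fd = mq_fd;
  epoll_ctl (epfd, EPOLL_CTL_ADD, mq_fd, &ev);

  while (1)
    {
      if (epoll_wait (epfd, &event, 1, -1 /* wait forever */ ) < 1)
	continue;
      /* drain the eventfd counter, then dequeue in non-blocking mode */
      (void) read (mq_fd, &cnt, sizeof (cnt));
      /* ... dequeue messages here with the non-blocking queue API ... */
    }
}

Keeping separate producer and consumer fds lets each side be multiplexed independently alongside other descriptors in the same epoll instance.
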
diff --git a/src/svm/queue.c b/src/svm/queue.c
index 0fa1fe9b230..771033d7d8a 100644
--- a/src/svm/queue.c
+++ b/src/svm/queue.c
@@ -27,6 +27,7 @@
#include <vppinfra/cache.h>
#include <svm/queue.h>
#include <vppinfra/time.h>
+#include <vppinfra/lock.h>
svm_queue_t *
svm_queue_init (void *base, int nels, int elsize)
@@ -127,6 +128,63 @@ svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
}
}
+static inline void
+svm_queue_wait_inline (svm_queue_t * q)
+{
+ if (q->producer_evtfd == -1)
+ {
+ pthread_cond_wait (&q->condvar, &q->mutex);
+ }
+ else
+ {
+ /* Fake a wait for an event. We could use epoll but that would mean
+ * using yet another fd; this should do for now. */
+ u32 cursize = q->cursize;
+ pthread_mutex_unlock (&q->mutex);
+ while (q->cursize == cursize)
+ CLIB_PAUSE ();
+ pthread_mutex_lock (&q->mutex);
+ }
+}
+
+void
+svm_queue_wait (svm_queue_t * q)
+{
+ svm_queue_wait_inline (q);
+}
+
+static inline int
+svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
+{
+ struct timespec ts;
+ ts.tv_sec = unix_time_now () + (u32) timeout;
+ ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
+
+ if (q->producer_evtfd == -1)
+ {
+ return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
+ }
+ else
+ {
+ double max_time = unix_time_now () + timeout;
+ u32 cursize = q->cursize;
+ int rv;
+
+ pthread_mutex_unlock (&q->mutex);
+ while (q->cursize == cursize && unix_time_now () < max_time)
+ CLIB_PAUSE ();
+ rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
+ pthread_mutex_lock (&q->mutex);
+ return rv;
+ }
+}
+
+int
+svm_queue_timedwait (svm_queue_t * q, double timeout)
+{
+ return svm_queue_timedwait_inline (q, timeout);
+}
+
/*
* svm_queue_add_nolock
*/
@@ -139,9 +197,7 @@ svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
if (PREDICT_FALSE (q->cursize == q->maxsize))
{
while (q->cursize == q->maxsize)
- {
- (void) pthread_cond_wait (&q->condvar, &q->mutex);
- }
+ svm_queue_wait_inline (q);
}
tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
@@ -170,6 +226,9 @@ svm_queue_add_raw (svm_queue_t * q, u8 * elem)
q->tail = (q->tail + 1) % q->maxsize;
q->cursize++;
+
+ if (q->cursize == 1)
+ svm_queue_send_signal (q, 1);
}
@@ -201,9 +260,7 @@ svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
return (-2);
}
while (q->cursize == q->maxsize)
- {
- (void) pthread_cond_wait (&q->condvar, &q->mutex);
- }
+ svm_queue_wait_inline (q);
}
tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
@@ -253,9 +310,7 @@ svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
return (-2);
}
while (q->cursize + 1 == q->maxsize)
- {
- (void) pthread_cond_wait (&q->condvar, &q->mutex);
- }
+ svm_queue_wait_inline (q);
}
tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
@@ -317,13 +372,9 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
}
else if (cond == SVM_Q_TIMEDWAIT)
{
- struct timespec ts;
- ts.tv_sec = unix_time_now () + time;
- ts.tv_nsec = 0;
while (q->cursize == 0 && rc == 0)
- {
- rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
- }
+ rc = svm_queue_timedwait_inline (q, time);
+
if (rc == ETIMEDOUT)
{
pthread_mutex_unlock (&q->mutex);
@@ -333,9 +384,7 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
else
{
while (q->cursize == 0)
- {
- (void) pthread_cond_wait (&q->condvar, &q->mutex);
- }
+ svm_queue_wait_inline (q);
}
}
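
svm_queue_timedwait_inline above converts a relative timeout, given as a double number of seconds, into the absolute timespec that pthread_cond_timedwait expects. A minimal standalone illustration of that split follows; it uses time (NULL) in place of unix_time_now () so it compiles outside vppinfra.

/* Standalone illustration of the timeout split in svm_queue_timedwait_inline. */
#include <time.h>
#include <stdio.h>

int
main (void)
{
  double timeout = 2.25;	/* wait up to 2.25 seconds */
  struct timespec ts;

  ts.tv_sec = time (NULL) + (unsigned) timeout;	     /* whole seconds */
  ts.tv_nsec = (timeout - (unsigned) timeout) * 1e9; /* fractional part, ns */

  printf ("tv_sec=%ld tv_nsec=%ld\n", (long) ts.tv_sec, (long) ts.tv_nsec);
  return 0;
}
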
diff --git a/src/svm/queue.h b/src/svm/queue.h
index 75e63a49319..3e8031e897b 100644
--- a/src/svm/queue.h
+++ b/src/svm/queue.h
@@ -82,6 +82,24 @@ int svm_queue_add_nolock (svm_queue_t * q, u8 * elem);
int svm_queue_sub_raw (svm_queue_t * q, u8 * elem);
/**
+ * Wait for queue event
+ *
+ * Must be called with mutex held.
+ */
+void svm_queue_wait (svm_queue_t * q);
+
+/**
+ * Timed wait for queue event
+ *
+ * Must be called with mutex held.
+ *
+ * @param q svm queue
+ * @param timeout time in seconds
+ * @return 0 on success, ETIMEDOUT on timeout, or another error code
+ */
+int svm_queue_timedwait (svm_queue_t * q, double timeout);
+
+/**
* Add element to queue with mutex held
* @param q queue
* @param elem pointer element data to add
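
A hypothetical caller sketch for the new svm_queue_timedwait(): per the comment above, the mutex must be held across the call, and the svm_queue_sub changes earlier in this patch show the intended retry-until-ETIMEDOUT pattern. wait_for_element and its error handling are illustrative only, not part of the patch.

/* Hypothetical caller sketch: wait for an element with the mutex held. */
#include <errno.h>
#include <svm/queue.h>

static int
wait_for_element (svm_queue_t * q, double timeout)
{
  int rc = 0;

  pthread_mutex_lock (&q->mutex);
  while (q->cursize == 0 && rc == 0)
    rc = svm_queue_timedwait (q, timeout);	/* 0 or ETIMEDOUT */
  pthread_mutex_unlock (&q->mutex);

  return rc ? -1 : 0;		/* non-zero: timed out or condvar error */
}

Once the wait returns 0, the element could be pulled while the lock is still held, e.g. with svm_queue_sub_raw (), the same way svm_msg_q_sub_w_lock does in message_queue.c above.
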