summaryrefslogtreecommitdiffstats
path: root/src/svm/message_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/svm/message_queue.h')
-rw-r--r--src/svm/message_queue.h78
1 files changed, 63 insertions, 15 deletions
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 7716c6724d8..1ef773d9f0a 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -22,6 +22,7 @@
#include <vppinfra/clib.h>
#include <vppinfra/error.h>
+#include <vppinfra/lock.h>
#include <svm/queue.h>
typedef struct svm_msg_q_shr_queue_
@@ -41,6 +42,7 @@ typedef struct svm_msg_q_queue_
{
svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
int evtfd; /**< producer/consumer eventfd */
+ clib_spinlock_t lock; /**< private lock for multi-producer */
} svm_msg_q_queue_t;
typedef struct svm_msg_q_ring_shared_
@@ -99,6 +101,13 @@ typedef union
} svm_msg_q_msg_t;
#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
+
+typedef enum svm_msg_q_wait_type_
+{
+ SVM_MQ_WAIT_EMPTY,
+ SVM_MQ_WAIT_FULL
+} svm_msg_q_wait_type_t;
+
/**
* Allocate message queue
*
@@ -206,6 +215,7 @@ void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
* Consumer dequeue one message from queue
*
* This returns the message pointing to the data in the message rings.
+ * Should only be used in single consumer scenarios as no locks are grabbed.
* The consumer is expected to call @ref svm_msg_q_free_msg once it
* finishes processing/copies the message data.
*
@@ -219,18 +229,34 @@ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
svm_q_conditional_wait_t cond, u32 time);
/**
- * Consumer dequeue one message from queue with mutex held
+ * Consumer dequeue one message from queue
*
- * Returns the message pointing to the data in the message rings under the
- * assumption that the message queue lock is already held. The consumer is
- * expected to call @ref svm_msg_q_free_msg once it finishes
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
* processing/copies the message data.
*
* @param mq message queue
* @param msg pointer to structure where message is to be received
* @return success status
*/
-void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
+
+/**
+ * Consumer dequeue multiple messages from queue
+ *
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
+ * processing/copies the message data.
+ *
+ * @param mq message queue
+ * @param msg_buf pointer to array of messages to received
+ * @param n_msgs lengt of msg_buf array
+ * @return number of messages dequeued
+ */
+int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
+ u32 n_msgs);
/**
* Get data for message in queue
@@ -321,10 +347,17 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
static inline int
svm_msg_q_try_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
- if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q.shr->mutex);
- return rv;
+ if (mq->q.evtfd == -1)
+ {
+ int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+ return rv;
+ }
+ else
+ {
+ return !clib_spinlock_trylock (&mq->q.lock);
+ }
}
/**
@@ -333,10 +366,18 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
static inline int
svm_msg_q_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_lock (&mq->q.shr->mutex);
- if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q.shr->mutex);
- return rv;
+ if (mq->q.evtfd == -1)
+ {
+ int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+ return rv;
+ }
+ else
+ {
+ clib_spinlock_lock (&mq->q.lock);
+ return 0;
+ }
}
/**
@@ -345,7 +386,14 @@ svm_msg_q_lock (svm_msg_q_t * mq)
static inline void
svm_msg_q_unlock (svm_msg_q_t * mq)
{
- pthread_mutex_unlock (&mq->q.shr->mutex);
+ if (mq->q.evtfd == -1)
+ {
+ pthread_mutex_unlock (&mq->q.shr->mutex);
+ }
+ else
+ {
+ clib_spinlock_unlock (&mq->q.lock);
+ }
}
/**
@@ -354,7 +402,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
* Must be called with mutex held. The queue only works non-blocking
* with eventfds, so handle blocking calls as an exception here.
*/
-void svm_msg_q_wait (svm_msg_q_t *mq);
+int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
/**
* Timed wait for message queue event