diff options
Diffstat (limited to 'src/svm/message_queue.h')
-rw-r--r-- | src/svm/message_queue.h | 107 |
1 files changed, 47 insertions, 60 deletions
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h index 4b314b837e9..7716c6724d8 100644 --- a/src/svm/message_queue.h +++ b/src/svm/message_queue.h @@ -24,6 +24,25 @@ #include <vppinfra/error.h> #include <svm/queue.h> +typedef struct svm_msg_q_shr_queue_ +{ + pthread_mutex_t mutex; /* 8 bytes */ + pthread_cond_t condvar; /* 8 bytes */ + u32 head; + u32 tail; + volatile u32 cursize; + u32 maxsize; + u32 elsize; + u32 pad; + u8 data[0]; +} svm_msg_q_shared_queue_t; + +typedef struct svm_msg_q_queue_ +{ + svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */ + int evtfd; /**< producer/consumer eventfd */ +} svm_msg_q_queue_t; + typedef struct svm_msg_q_ring_shared_ { volatile u32 cursize; /**< current size of the ring */ @@ -43,14 +62,14 @@ typedef struct svm_msg_q_ring_ typedef struct svm_msg_q_shared_ { - u32 n_rings; /**< number of rings after q */ - u32 pad; /**< 8 byte alignment for q */ - svm_queue_t q[0]; /**< queue for exchanging messages */ + u32 n_rings; /**< number of rings after q */ + u32 pad; /**< 8 byte alignment for q */ + svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */ } __clib_packed svm_msg_q_shared_t; typedef struct svm_msg_q_ { - svm_queue_t *q; /**< queue for exchanging messages */ + svm_msg_q_queue_t q; /**< queue for exchanging messages */ svm_msg_q_ring_t *rings; /**< rings with message data*/ } __clib_packed svm_msg_q_t; @@ -232,7 +251,7 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index); /** - * Set event fd for queue consumer + * Set event fd for queue * * If set, queue will exclusively use eventfds for signaling. Moreover, * afterwards, the queue should only be used in non-blocking mode. Waiting @@ -241,35 +260,26 @@ svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index); * @param mq message queue * @param fd consumer eventfd */ -void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd); +void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd); /** - * Set event fd for queue producer - * - * If set, queue will exclusively use eventfds for signaling. Moreover, - * afterwards, the queue should only be used in non-blocking mode. Waiting - * for events should be done externally using something like epoll. - * - * @param mq message queue - * @param fd producer eventfd + * Allocate event fd for queue */ -void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd); +int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq); /** - * Allocate event fd for queue consumer + * Format message queue, shows msg count for each ring */ -int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq); +u8 *format_svm_msg_q (u8 *s, va_list *args); /** - * Allocate event fd for queue consumer - */ -int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq); - - -/** - * Format message queue, shows msg count for each ring + * Check length of message queue */ -u8 *format_svm_msg_q (u8 * s, va_list * args); +static inline u32 +svm_msg_q_size (svm_msg_q_t *mq) +{ + return clib_atomic_load_relax_n (&mq->q.shr->cursize); +} /** * Check if message queue is full @@ -277,14 +287,14 @@ u8 *format_svm_msg_q (u8 * s, va_list * args); static inline u8 svm_msg_q_is_full (svm_msg_q_t * mq) { - return (mq->q->cursize == mq->q->maxsize); + return (svm_msg_q_size (mq) == mq->q.shr->maxsize); } static inline u8 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index) { svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index); - return (ring->shr->cursize >= ring->nitems); + return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems); } /** @@ -293,16 +303,7 @@ svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index) static inline u8 svm_msg_q_is_empty (svm_msg_q_t * mq) { - return (mq->q->cursize == 0); -} - -/** - * Check length of message queue - */ -static inline u32 -svm_msg_q_size (svm_msg_q_t * mq) -{ - return mq->q->cursize; + return (svm_msg_q_size (mq) == 0); } /** @@ -320,9 +321,9 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg) static inline int svm_msg_q_try_lock (svm_msg_q_t * mq) { - int rv = pthread_mutex_trylock (&mq->q->mutex); + int rv = pthread_mutex_trylock (&mq->q.shr->mutex); if (PREDICT_FALSE (rv == EOWNERDEAD)) - rv = pthread_mutex_consistent (&mq->q->mutex); + rv = pthread_mutex_consistent (&mq->q.shr->mutex); return rv; } @@ -332,9 +333,9 @@ svm_msg_q_try_lock (svm_msg_q_t * mq) static inline int svm_msg_q_lock (svm_msg_q_t * mq) { - int rv = pthread_mutex_lock (&mq->q->mutex); + int rv = pthread_mutex_lock (&mq->q.shr->mutex); if (PREDICT_FALSE (rv == EOWNERDEAD)) - rv = pthread_mutex_consistent (&mq->q->mutex); + rv = pthread_mutex_consistent (&mq->q.shr->mutex); return rv; } @@ -344,7 +345,7 @@ svm_msg_q_lock (svm_msg_q_t * mq) static inline void svm_msg_q_unlock (svm_msg_q_t * mq) { - pthread_mutex_unlock (&mq->q->mutex); + pthread_mutex_unlock (&mq->q.shr->mutex); } /** @@ -353,11 +354,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq) * Must be called with mutex held. The queue only works non-blocking * with eventfds, so handle blocking calls as an exception here. */ -static inline void -svm_msg_q_wait (svm_msg_q_t * mq) -{ - svm_queue_wait (mq->q); -} +void svm_msg_q_wait (svm_msg_q_t *mq); /** * Timed wait for message queue event @@ -367,22 +364,12 @@ svm_msg_q_wait (svm_msg_q_t * mq) * @param mq message queue * @param timeout time in seconds */ -static inline int -svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout) -{ - return svm_queue_timedwait (mq->q, timeout); -} - -static inline int -svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq) -{ - return mq->q->consumer_evtfd; -} +int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout); static inline int -svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq) +svm_msg_q_get_eventfd (svm_msg_q_t *mq) { - return mq->q->producer_evtfd; + return mq->q.evtfd; } #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */ |