diff options
Diffstat (limited to 'src/svm')
-rw-r--r-- | src/svm/message_queue.c | 33 | ||||
-rw-r--r-- | src/svm/message_queue.h | 58 | ||||
-rw-r--r-- | src/svm/queue.c | 85 | ||||
-rw-r--r-- | src/svm/queue.h | 18 |
4 files changed, 167 insertions, 27 deletions
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index e97cab898e8..d6a77e783e3 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -15,6 +15,7 @@ #include <svm/message_queue.h> #include <vppinfra/mem.h> +#include <sys/eventfd.h> static inline svm_msg_q_ring_t * svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index) @@ -235,6 +236,38 @@ svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) svm_queue_sub_raw (mq->q, (u8 *) msg); } +void +svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd) +{ + mq->q->consumer_evtfd = fd; +} + +void +svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd) +{ + mq->q->producer_evtfd = fd; +} + +int +svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq) +{ + int fd; + if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) + return -1; + svm_msg_q_set_consumer_eventfd (mq, fd); + return 0; +} + +int +svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq) +{ + int fd; + if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) + return -1; + svm_msg_q_set_producer_eventfd (mq, fd); + return 0; +} + /* * fd.io coding-style-patch-verification: ON * diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h index 4c16c97ca7c..28bf14e545e 100644 --- a/src/svm/message_queue.h +++ b/src/svm/message_queue.h @@ -22,7 +22,6 @@ #include <vppinfra/clib.h> #include <vppinfra/error.h> -#include <vppinfra/time.h> #include <svm/queue.h> typedef struct svm_msg_q_ring_ @@ -215,6 +214,40 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index); /** + * Set event fd for queue consumer + * + * If set, queue will exclusively use eventfds for signaling. Moreover, + * afterwards, the queue should only be used in non-blocking mode. Waiting + * for events should be done externally using something like epoll. + * + * @param mq message queue + * @param fd consumer eventfd + */ +void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd); + +/** + * Set event fd for queue producer + * + * If set, queue will exclusively use eventfds for signaling. Moreover, + * afterwards, the queue should only be used in non-blocking mode. Waiting + * for events should be done externally using something like epoll. + * + * @param mq message queue + * @param fd producer eventfd + */ +void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd); + +/** + * Allocate event fd for queue consumer + */ +int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq); + +/** + * Allocate event fd for queue consumer + */ +int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq); + +/** * Check if message queue is full */ static inline u8 @@ -290,12 +323,13 @@ svm_msg_q_unlock (svm_msg_q_t * mq) /** * Wait for message queue event * - * Must be called with mutex held + * Must be called with mutex held. The queue only works non-blocking + * with eventfds, so handle blocking calls as an exception here. */ static inline void svm_msg_q_wait (svm_msg_q_t * mq) { - pthread_cond_wait (&mq->q->condvar, &mq->q->mutex); + svm_queue_wait (mq->q); } /** @@ -309,13 +343,19 @@ svm_msg_q_wait (svm_msg_q_t * mq) static inline int svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout) { - struct timespec ts; + return svm_queue_timedwait (mq->q, timeout); +} + +static inline int +svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq) +{ + return mq->q->consumer_evtfd; +} - ts.tv_sec = unix_time_now () + (u32) timeout; - ts.tv_nsec = (timeout - (u32) timeout) * 1e9; - if (pthread_cond_timedwait (&mq->q->condvar, &mq->q->mutex, &ts)) - return -1; - return 0; +static inline int +svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq) +{ + return mq->q->producer_evtfd; } #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */ diff --git a/src/svm/queue.c b/src/svm/queue.c index 0fa1fe9b230..771033d7d8a 100644 --- a/src/svm/queue.c +++ b/src/svm/queue.c @@ -27,6 +27,7 @@ #include <vppinfra/cache.h> #include <svm/queue.h> #include <vppinfra/time.h> +#include <vppinfra/lock.h> svm_queue_t * svm_queue_init (void *base, int nels, int elsize) @@ -127,6 +128,63 @@ svm_queue_send_signal (svm_queue_t * q, u8 is_prod) } } +static inline void +svm_queue_wait_inline (svm_queue_t * q) +{ + if (q->producer_evtfd == -1) + { + pthread_cond_wait (&q->condvar, &q->mutex); + } + else + { + /* Fake a wait for event. We could use epoll but that would mean + * using yet another fd. Should do for now */ + u32 cursize = q->cursize; + pthread_mutex_unlock (&q->mutex); + while (q->cursize == cursize) + CLIB_PAUSE (); + pthread_mutex_lock (&q->mutex); + } +} + +void +svm_queue_wait (svm_queue_t * q) +{ + svm_queue_wait_inline (q); +} + +static inline int +svm_queue_timedwait_inline (svm_queue_t * q, double timeout) +{ + struct timespec ts; + ts.tv_sec = unix_time_now () + (u32) timeout; + ts.tv_nsec = (timeout - (u32) timeout) * 1e9; + + if (q->producer_evtfd == -1) + { + return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts); + } + else + { + double max_time = unix_time_now () + timeout; + u32 cursize = q->cursize; + int rv; + + pthread_mutex_unlock (&q->mutex); + while (q->cursize == cursize && unix_time_now () < max_time) + CLIB_PAUSE (); + rv = unix_time_now () < max_time ? 0 : ETIMEDOUT; + pthread_mutex_lock (&q->mutex); + return rv; + } +} + +int +svm_queue_timedwait (svm_queue_t * q, double timeout) +{ + return svm_queue_timedwait_inline (q, timeout); +} + /* * svm_queue_add_nolock */ @@ -139,9 +197,7 @@ svm_queue_add_nolock (svm_queue_t * q, u8 * elem) if (PREDICT_FALSE (q->cursize == q->maxsize)) { while (q->cursize == q->maxsize) - { - (void) pthread_cond_wait (&q->condvar, &q->mutex); - } + svm_queue_wait_inline (q); } tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); @@ -170,6 +226,9 @@ svm_queue_add_raw (svm_queue_t * q, u8 * elem) q->tail = (q->tail + 1) % q->maxsize; q->cursize++; + + if (q->cursize == 1) + svm_queue_send_signal (q, 1); } @@ -201,9 +260,7 @@ svm_queue_add (svm_queue_t * q, u8 * elem, int nowait) return (-2); } while (q->cursize == q->maxsize) - { - (void) pthread_cond_wait (&q->condvar, &q->mutex); - } + svm_queue_wait_inline (q); } tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); @@ -253,9 +310,7 @@ svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait) return (-2); } while (q->cursize + 1 == q->maxsize) - { - (void) pthread_cond_wait (&q->condvar, &q->mutex); - } + svm_queue_wait_inline (q); } tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); @@ -317,13 +372,9 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond, } else if (cond == SVM_Q_TIMEDWAIT) { - struct timespec ts; - ts.tv_sec = unix_time_now () + time; - ts.tv_nsec = 0; while (q->cursize == 0 && rc == 0) - { - rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts); - } + rc = svm_queue_timedwait_inline (q, time); + if (rc == ETIMEDOUT) { pthread_mutex_unlock (&q->mutex); @@ -333,9 +384,7 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond, else { while (q->cursize == 0) - { - (void) pthread_cond_wait (&q->condvar, &q->mutex); - } + svm_queue_wait_inline (q); } } diff --git a/src/svm/queue.h b/src/svm/queue.h index 75e63a49319..3e8031e897b 100644 --- a/src/svm/queue.h +++ b/src/svm/queue.h @@ -82,6 +82,24 @@ int svm_queue_add_nolock (svm_queue_t * q, u8 * elem); int svm_queue_sub_raw (svm_queue_t * q, u8 * elem); /** + * Wait for queue event + * + * Must be called with mutex held. + */ +void svm_queue_wait (svm_queue_t * q); + +/** + * Timed wait for queue event + * + * Must be called with mutex held. + * + * @param q svm queue + * @param timeout time in seconds + * @return 0 on success, ETIMEDOUT on timeout or an error + */ +int svm_queue_timedwait (svm_queue_t * q, double timeout); + +/** * Add element to queue with mutex held * @param q queue * @param elem pointer element data to add |