diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/plugins/hs_apps/sapi/vpp_echo_bapi.c | 2 | ||||
-rw-r--r-- | src/plugins/unittest/session_test.c | 5 | ||||
-rw-r--r-- | src/svm/fifo_segment.c | 7 | ||||
-rw-r--r-- | src/svm/message_queue.c | 269 | ||||
-rw-r--r-- | src/svm/message_queue.h | 107 | ||||
-rw-r--r-- | src/vcl/vcl_bapi.c | 4 | ||||
-rw-r--r-- | src/vcl/vcl_private.c | 2 | ||||
-rw-r--r-- | src/vcl/vcl_sapi.c | 5 | ||||
-rw-r--r-- | src/vnet/session/segment_manager.c | 2 | ||||
-rw-r--r-- | src/vnet/session/session.c | 3 | ||||
-rw-r--r-- | src/vnet/session/session_api.c | 8 | ||||
-rw-r--r-- | src/vnet/session/session_debug.c | 20 |
12 files changed, 313 insertions, 121 deletions
diff --git a/src/plugins/hs_apps/sapi/vpp_echo_bapi.c b/src/plugins/hs_apps/sapi/vpp_echo_bapi.c index 807ec62d951..915448a0be4 100644 --- a/src/plugins/hs_apps/sapi/vpp_echo_bapi.c +++ b/src/plugins/hs_apps/sapi/vpp_echo_bapi.c @@ -437,7 +437,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp) echo_segment_attach_mq (segment_handle, mp->app_mq, 0, &em->app_mq); if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) - svm_msg_q_set_consumer_eventfd (em->app_mq, fds[n_fds++]); + svm_msg_q_set_eventfd (em->app_mq, fds[n_fds++]); vec_free (fds); } diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c index 68605b2cbd7..6496a99ba2c 100644 --- a/src/plugins/unittest/session_test.c +++ b/src/plugins/unittest/session_test.c @@ -1839,9 +1839,8 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input) mq = app_wrk->event_queue; if (use_eventfd) { - svm_msg_q_alloc_producer_eventfd (mq); - svm_msg_q_alloc_consumer_eventfd (mq); - prod_fd = svm_msg_q_get_producer_eventfd (mq); + svm_msg_q_alloc_eventfd (mq); + prod_fd = svm_msg_q_get_eventfd (mq); SESSION_TEST (prod_fd != -1, "mq producer eventd valid %u", prod_fd); } diff --git a/src/svm/fifo_segment.c b/src/svm/fifo_segment.c index d30932448c6..3e9aecb9dc8 100644 --- a/src/svm/fifo_segment.c +++ b/src/svm/fifo_segment.c @@ -1042,7 +1042,7 @@ fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset, u32 mq_index) mq = vec_elt_at_index (fs->mqs, mq_index); - if (!mq->q) + if (!mq->q.shr) { svm_msg_q_shared_t *smq; smq = (svm_msg_q_shared_t *) ((u8 *) fs->h + offset); @@ -1059,10 +1059,11 @@ fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index) { svm_msg_q_t *mq = vec_elt_at_index (fs->mqs, mq_index); - if (mq->q == 0) + if (mq->q.shr == 0) return ~0ULL; - return (uword) ((u8 *) mq->q - (u8 *) fs->h) - sizeof (svm_msg_q_shared_t); + return (uword) ((u8 *) mq->q.shr - (u8 *) fs->h) - + sizeof (svm_msg_q_shared_t); } int diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index 0ebce702430..fdf9293b18c 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -16,7 +16,9 @@ #include <svm/message_queue.h> #include <vppinfra/mem.h> #include <vppinfra/format.h> +#include <vppinfra/time.h> #include <sys/eventfd.h> +#include <sys/socket.h> static inline svm_msg_q_ring_t * svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index) @@ -37,19 +39,51 @@ svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index) return (ring->shr->data + elt_index * ring->elsize); } +static void +svm_msg_q_init_mutex (svm_msg_q_shared_queue_t *sq) +{ + pthread_mutexattr_t attr; + pthread_condattr_t cattr; + + clib_memset (&attr, 0, sizeof (attr)); + clib_memset (&cattr, 0, sizeof (cattr)); + + if (pthread_mutexattr_init (&attr)) + clib_unix_warning ("mutexattr_init"); + if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED)) + clib_unix_warning ("pthread_mutexattr_setpshared"); + if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST)) + clib_unix_warning ("setrobust"); + if (pthread_mutex_init (&sq->mutex, &attr)) + clib_unix_warning ("mutex_init"); + if (pthread_mutexattr_destroy (&attr)) + clib_unix_warning ("mutexattr_destroy"); + if (pthread_condattr_init (&cattr)) + clib_unix_warning ("condattr_init"); + if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED)) + clib_unix_warning ("condattr_setpshared"); + if (pthread_cond_init (&sq->condvar, &cattr)) + clib_unix_warning ("cond_init1"); + if (pthread_condattr_destroy (&cattr)) + clib_unix_warning ("cond_init2"); +} + svm_msg_q_shared_t * svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg) { svm_msg_q_ring_shared_t *ring; + svm_msg_q_shared_queue_t *sq; svm_msg_q_shared_t *smq; u32 q_sz, offset; int i; - q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t); + q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t); smq = (svm_msg_q_shared_t *) base; - svm_queue_init (&smq->q, cfg->q_nitems, sizeof (svm_msg_q_msg_t)); - smq->q->consumer_pid = cfg->consumer_pid; + sq = smq->q; + clib_memset (sq, 0, sizeof (*sq)); + sq->elsize = sizeof (svm_msg_q_msg_t); + sq->maxsize = cfg->q_nitems; smq->n_rings = cfg->n_rings; ring = (void *) ((u8 *) smq->q + q_sz); for (i = 0; i < cfg->n_rings; i++) @@ -61,6 +95,8 @@ svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg) ring = (void *) ((u8 *) ring + offset); } + svm_msg_q_init_mutex (sq); + return smq; } @@ -83,7 +119,8 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg) rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize; } - q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t); + q_sz = sizeof (svm_msg_q_shared_queue_t) + + cfg->q_nitems * sizeof (svm_msg_q_msg_t); mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz; return mq_sz; @@ -111,10 +148,12 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base) u32 i, n_rings, q_sz, offset; smq = (svm_msg_q_shared_t *) smq_base; - mq->q = smq->q; + mq->q.shr = smq->q; + mq->q.evtfd = -1; n_rings = smq->n_rings; vec_validate (mq->rings, n_rings - 1); - q_sz = sizeof (svm_queue_t) + mq->q->maxsize * sizeof (svm_msg_q_msg_t); + q_sz = sizeof (svm_msg_q_shared_queue_t) + + mq->q.shr->maxsize * sizeof (svm_msg_q_msg_t); ring = (void *) ((u8 *) smq->q + q_sz); for (i = 0; i < n_rings; i++) { @@ -129,10 +168,31 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base) void svm_msg_q_free (svm_msg_q_t * mq) { - svm_queue_free (mq->q); + clib_mem_free (mq->q.shr); clib_mem_free (mq); } +static void +svm_msg_q_send_signal (svm_msg_q_t *mq) +{ + if (mq->q.evtfd == -1) + { + (void) pthread_cond_broadcast (&mq->q.shr->condvar); + } + else + { + int __clib_unused rv; + u64 data = 1; + + if (mq->q.evtfd < 0) + return; + + rv = write (mq->q.evtfd, &data, sizeof (data)); + if (PREDICT_FALSE (rv < 0)) + clib_unix_warning ("signal write on %d returned %d", mq->q.evtfd, rv); + } +} + svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index) { @@ -147,7 +207,7 @@ svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index) msg.ring_index = ring - mq->rings; msg.elt_index = sr->tail; sr->tail = (sr->tail + 1) % ring->nitems; - clib_atomic_fetch_add (&sr->cursize, 1); + clib_atomic_fetch_add_rel (&sr->cursize, 1); return msg; } @@ -193,7 +253,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes) msg.ring_index = ring - mq->rings; msg.elt_index = sr->tail; sr->tail = (sr->tail + 1) % ring->nitems; - clib_atomic_fetch_add (&sr->cursize, 1); + clib_atomic_fetch_add_rel (&sr->cursize, 1); break; } return msg; @@ -228,10 +288,10 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) } need_signal = sr->cursize == ring->nitems; - clib_atomic_fetch_sub (&sr->cursize, 1); + clib_atomic_fetch_sub_rel (&sr->cursize, 1); if (PREDICT_FALSE (need_signal)) - svm_queue_send_signal (mq->q, 0); + svm_msg_q_send_signal (mq); } static int @@ -257,71 +317,216 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) return (dist1 < dist2); } +static void +svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem) +{ + svm_msg_q_shared_queue_t *sq = mq->q.shr; + i8 *tailp; + u32 sz; + + tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail); + clib_memcpy_fast (tailp, elem, sq->elsize); + + sq->tail = (sq->tail + 1) % sq->maxsize; + + sz = clib_atomic_fetch_add_rel (&sq->cursize, 1); + if (!sz) + svm_msg_q_send_signal (mq); +} + int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait) { ASSERT (svm_msq_q_msg_is_valid (mq, msg)); - return svm_queue_add (mq->q, (u8 *) msg, nowait); + + if (nowait) + { + /* zero on success */ + if (svm_msg_q_try_lock (mq)) + { + return (-1); + } + } + else + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_is_full (mq))) + { + if (nowait) + return (-2); + while (svm_msg_q_is_full (mq)) + svm_msg_q_wait (mq); + } + + svm_msg_q_add_raw (mq, (u8 *) msg); + + svm_msg_q_unlock (mq); + + return 0; } void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { ASSERT (svm_msq_q_msg_is_valid (mq, msg)); - svm_queue_add_raw (mq->q, (u8 *) msg); + svm_msg_q_add_raw (mq, (u8 *) msg); svm_msg_q_unlock (mq); } +static int +svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem) +{ + svm_msg_q_shared_queue_t *sq = mq->q.shr; + i8 *headp; + u32 sz; + + ASSERT (!svm_msg_q_is_empty (mq)); + + headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head); + clib_memcpy_fast (elem, headp, sq->elsize); + + sq->head = (sq->head + 1) % sq->maxsize; + + sz = clib_atomic_fetch_sub_rel (&sq->cursize, 1); + if (PREDICT_FALSE (sz == sq->maxsize)) + svm_msg_q_send_signal (mq); + + return 0; +} + int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, svm_q_conditional_wait_t cond, u32 time) { - return svm_queue_sub (mq->q, (u8 *) msg, cond, time); -} + int rc = 0; -void -svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) -{ - svm_queue_sub_raw (mq->q, (u8 *) msg); + if (cond == SVM_Q_NOWAIT) + { + /* zero on success */ + if (svm_msg_q_try_lock (mq)) + { + return (-1); + } + } + else + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_is_empty (mq))) + { + if (cond == SVM_Q_NOWAIT) + { + svm_msg_q_unlock (mq); + return (-2); + } + else if (cond == SVM_Q_TIMEDWAIT) + { + while (svm_msg_q_is_empty (mq) && rc == 0) + rc = svm_msg_q_timedwait (mq, time); + + if (rc == ETIMEDOUT) + { + svm_msg_q_unlock (mq); + return ETIMEDOUT; + } + } + else + { + while (svm_msg_q_is_empty (mq)) + svm_msg_q_wait (mq); + } + } + + svm_msg_q_sub_raw (mq, (u8 *) msg); + + svm_msg_q_unlock (mq); + + return 0; } void -svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd) +svm_msg_q_sub_w_lock (svm_msg_q_t *mq, svm_msg_q_msg_t *msg) { - mq->q->consumer_evtfd = fd; + svm_msg_q_sub_raw (mq, (u8 *) msg); } void -svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd) +svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd) { - mq->q->producer_evtfd = fd; + mq->q.evtfd = fd; } int -svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq) +svm_msg_q_alloc_eventfd (svm_msg_q_t *mq) { int fd; if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) return -1; - svm_msg_q_set_consumer_eventfd (mq, fd); + svm_msg_q_set_eventfd (mq, fd); return 0; } +void +svm_msg_q_wait (svm_msg_q_t *mq) +{ + if (mq->q.evtfd == -1) + { + pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex); + } + else + { + u64 buf; + int rv; + + svm_msg_q_unlock (mq); + while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0) + { + if (errno != EAGAIN) + { + clib_unix_warning ("read error"); + return; + } + } + svm_msg_q_lock (mq); + } +} + int -svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq) +svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout) { - int fd; - if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) - return -1; - svm_msg_q_set_producer_eventfd (mq, fd); - return 0; + if (mq->q.evtfd == -1) + { + struct timespec ts; + ts.tv_sec = unix_time_now () + (u32) timeout; + ts.tv_nsec = (timeout - (u32) timeout) * 1e9; + return pthread_cond_timedwait (&mq->q.shr->condvar, &mq->q.shr->mutex, + &ts); + } + else + { + struct timeval tv; + u64 buf; + int rv; + + tv.tv_sec = (u64) timeout; + tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9; + setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv, + sizeof tv); + + svm_msg_q_unlock (mq); + rv = read (mq->q.evtfd, &buf, sizeof (buf)); + if (rv < 0) + clib_warning ("read %u", errno); + svm_msg_q_lock (mq); + + return rv < 0 ? errno : 0; + } } u8 * format_svm_msg_q (u8 * s, va_list * args) { svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *); - s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize); + s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize); for (u32 i = 0; i < vec_len (mq->rings); i++) { s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize, diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h index 4b314b837e9..7716c6724d8 100644 --- a/src/svm/message_queue.h +++ b/src/svm/message_queue.h @@ -24,6 +24,25 @@ #include <vppinfra/error.h> #include <svm/queue.h> +typedef struct svm_msg_q_shr_queue_ +{ + pthread_mutex_t mutex; /* 8 bytes */ + pthread_cond_t condvar; /* 8 bytes */ + u32 head; + u32 tail; + volatile u32 cursize; + u32 maxsize; + u32 elsize; + u32 pad; + u8 data[0]; +} svm_msg_q_shared_queue_t; + +typedef struct svm_msg_q_queue_ +{ + svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */ + int evtfd; /**< producer/consumer eventfd */ +} svm_msg_q_queue_t; + typedef struct svm_msg_q_ring_shared_ { volatile u32 cursize; /**< current size of the ring */ @@ -43,14 +62,14 @@ typedef struct svm_msg_q_ring_ typedef struct svm_msg_q_shared_ { - u32 n_rings; /**< number of rings after q */ - u32 pad; /**< 8 byte alignment for q */ - svm_queue_t q[0]; /**< queue for exchanging messages */ + u32 n_rings; /**< number of rings after q */ + u32 pad; /**< 8 byte alignment for q */ + svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */ } __clib_packed svm_msg_q_shared_t; typedef struct svm_msg_q_ { - svm_queue_t *q; /**< queue for exchanging messages */ + svm_msg_q_queue_t q; /**< queue for exchanging messages */ svm_msg_q_ring_t *rings; /**< rings with message data*/ } __clib_packed svm_msg_q_t; @@ -232,7 +251,7 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index); /** - * Set event fd for queue consumer + * Set event fd for queue * * If set, queue will exclusively use eventfds for signaling. Moreover, * afterwards, the queue should only be used in non-blocking mode. Waiting @@ -241,35 +260,26 @@ svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index); * @param mq message queue * @param fd consumer eventfd */ -void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd); +void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd); /** - * Set event fd for queue producer - * - * If set, queue will exclusively use eventfds for signaling. Moreover, - * afterwards, the queue should only be used in non-blocking mode. Waiting - * for events should be done externally using something like epoll. - * - * @param mq message queue - * @param fd producer eventfd + * Allocate event fd for queue */ -void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd); +int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq); /** - * Allocate event fd for queue consumer + * Format message queue, shows msg count for each ring */ -int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq); +u8 *format_svm_msg_q (u8 *s, va_list *args); /** - * Allocate event fd for queue consumer - */ -int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq); - - -/** - * Format message queue, shows msg count for each ring + * Check length of message queue */ -u8 *format_svm_msg_q (u8 * s, va_list * args); +static inline u32 +svm_msg_q_size (svm_msg_q_t *mq) +{ + return clib_atomic_load_relax_n (&mq->q.shr->cursize); +} /** * Check if message queue is full @@ -277,14 +287,14 @@ u8 *format_svm_msg_q (u8 * s, va_list * args); static inline u8 svm_msg_q_is_full (svm_msg_q_t * mq) { - return (mq->q->cursize == mq->q->maxsize); + return (svm_msg_q_size (mq) == mq->q.shr->maxsize); } static inline u8 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index) { svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index); - return (ring->shr->cursize >= ring->nitems); + return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems); } /** @@ -293,16 +303,7 @@ svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index) static inline u8 svm_msg_q_is_empty (svm_msg_q_t * mq) { - return (mq->q->cursize == 0); -} - -/** - * Check length of message queue - */ -static inline u32 -svm_msg_q_size (svm_msg_q_t * mq) -{ - return mq->q->cursize; + return (svm_msg_q_size (mq) == 0); } /** @@ -320,9 +321,9 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg) static inline int svm_msg_q_try_lock (svm_msg_q_t * mq) { - int rv = pthread_mutex_trylock (&mq->q->mutex); + int rv = pthread_mutex_trylock (&mq->q.shr->mutex); if (PREDICT_FALSE (rv == EOWNERDEAD)) - rv = pthread_mutex_consistent (&mq->q->mutex); + rv = pthread_mutex_consistent (&mq->q.shr->mutex); return rv; } @@ -332,9 +333,9 @@ svm_msg_q_try_lock (svm_msg_q_t * mq) static inline int svm_msg_q_lock (svm_msg_q_t * mq) { - int rv = pthread_mutex_lock (&mq->q->mutex); + int rv = pthread_mutex_lock (&mq->q.shr->mutex); if (PREDICT_FALSE (rv == EOWNERDEAD)) - rv = pthread_mutex_consistent (&mq->q->mutex); + rv = pthread_mutex_consistent (&mq->q.shr->mutex); return rv; } @@ -344,7 +345,7 @@ svm_msg_q_lock (svm_msg_q_t * mq) static inline void svm_msg_q_unlock (svm_msg_q_t * mq) { - pthread_mutex_unlock (&mq->q->mutex); + pthread_mutex_unlock (&mq->q.shr->mutex); } /** @@ -353,11 +354,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq) * Must be called with mutex held. The queue only works non-blocking * with eventfds, so handle blocking calls as an exception here. */ -static inline void -svm_msg_q_wait (svm_msg_q_t * mq) -{ - svm_queue_wait (mq->q); -} +void svm_msg_q_wait (svm_msg_q_t *mq); /** * Timed wait for message queue event @@ -367,22 +364,12 @@ svm_msg_q_wait (svm_msg_q_t * mq) * @param mq message queue * @param timeout time in seconds */ -static inline int -svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout) -{ - return svm_queue_timedwait (mq->q, timeout); -} - -static inline int -svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq) -{ - return mq->q->consumer_evtfd; -} +int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout); static inline int -svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq) +svm_msg_q_get_eventfd (svm_msg_q_t *mq) { - return mq->q->producer_evtfd; + return mq->q.evtfd; } #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */ diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index 7d241624d01..48695a31a3e 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -121,7 +121,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp) if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) { - svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]); + svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]); vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue); n_fds++; } @@ -215,7 +215,7 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t * if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) { - svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]); + svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]); vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue); n_fds++; } diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c index a140e5ea557..b9745d27e4e 100644 --- a/src/vcl/vcl_private.c +++ b/src/vcl/vcl_private.c @@ -46,7 +46,7 @@ vcl_mq_epoll_add_evfd (vcl_worker_t * wrk, svm_msg_q_t * mq) u32 mqc_index; int mq_fd; - mq_fd = svm_msg_q_get_consumer_eventfd (mq); + mq_fd = svm_msg_q_get_eventfd (mq); if (wrk->mqs_epfd < 0 || mq_fd == -1) return -1; diff --git a/src/vcl/vcl_sapi.c b/src/vcl/vcl_sapi.c index bc44272a5c9..1bab7eaba13 100644 --- a/src/vcl/vcl_sapi.c +++ b/src/vcl/vcl_sapi.c @@ -89,8 +89,7 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds) if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) { - svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, - fds[n_fds_used++]); + svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds_used++]); vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue); } @@ -236,7 +235,7 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t * if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) { - svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]); + svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]); vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue); n_fds++; } diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index 61108326abc..ffac4e0bc26 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -870,7 +870,7 @@ segment_manager_alloc_queue (fifo_segment_t * segment, if (props->use_mq_eventfd) { - if (svm_msg_q_alloc_producer_eventfd (q)) + if (svm_msg_q_alloc_eventfd (q)) clib_warning ("failed to alloc eventfd"); } return q; diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index d1f21dafdd6..169cca5efe8 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1548,9 +1548,6 @@ session_vpp_event_queues_allocate (session_main_t * smm) cfg->ring_cfgs = rc; smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg); - - if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue)) - clib_warning ("eventfd returned"); } } diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 2e215f76051..0116a7ece66 100644 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -667,7 +667,7 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp) if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD) { fd_flags |= SESSION_FD_F_MQ_EVENTFD; - fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q); + fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q); n_fds += 1; } @@ -751,7 +751,7 @@ vl_api_app_worker_add_del_t_handler (vl_api_app_worker_add_del_t * mp) if (application_segment_manager_properties (app)->use_mq_eventfd) { fd_flags |= SESSION_FD_F_MQ_EVENTFD; - fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q); + fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q); n_fds += 1; } @@ -1317,7 +1317,7 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs, if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD) { fd_flags |= SESSION_FD_F_MQ_EVENTFD; - fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q); + fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q); n_fds += 1; } @@ -1426,7 +1426,7 @@ sapi_add_del_worker_handler (app_namespace_t * app_ns, if (application_segment_manager_properties (app)->use_mq_eventfd) { fd_flags |= SESSION_FD_F_MQ_EVENTFD; - fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q); + fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q); n_fds += 1; } diff --git a/src/vnet/session/session_debug.c b/src/vnet/session/session_debug.c index cd4198c5da8..c042e9e53db 100644 --- a/src/vnet/session/session_debug.c +++ b/src/vnet/session/session_debug.c @@ -123,6 +123,7 @@ dump_thread_0_event_queue (void) vlib_main_t *vm = &vlib_global_main; u32 my_thread_index = vm->thread_index; session_event_t _e, *e = &_e; + svm_msg_q_shared_queue_t *sq; svm_msg_q_ring_t *ring; session_t *s0; svm_msg_q_msg_t *msg; @@ -130,11 +131,12 @@ dump_thread_0_event_queue (void) int i, index; mq = session_main_get_vpp_event_queue (my_thread_index); - index = mq->q->head; + sq = mq->q.shr; + index = sq->head; - for (i = 0; i < mq->q->cursize; i++) + for (i = 0; i < sq->cursize; i++) { - msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index); + msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index); ring = svm_msg_q_ring (mq, msg->ring_index); clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize); @@ -170,7 +172,7 @@ dump_thread_0_event_queue (void) index++; - if (index == mq->q->maxsize) + if (index == sq->maxsize) index = 0; } } @@ -210,6 +212,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f) u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) { + svm_msg_q_shared_queue_t *sq; session_evt_elt_t *elt; session_worker_t *wrk; int i, index, found = 0; @@ -226,16 +229,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) * Search evt queue */ mq = wrk->vpp_event_queue; - index = mq->q->head; - for (i = 0; i < mq->q->cursize; i++) + sq = mq->q.shr; + index = sq->head; + for (i = 0; i < sq->cursize; i++) { - msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index); + msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index); ring = svm_msg_q_ring (mq, msg->ring_index); clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize); found = session_node_cmp_event (e, f); if (found) return 1; - index = (index + 1) % mq->q->maxsize; + index = (index + 1) % sq->maxsize; } /* * Search pending events vector |