diff options
author | Florin Coras <fcoras@cisco.com> | 2018-07-04 04:15:05 -0700 |
---|---|---|
committer | Damjan Marion <dmarion@me.com> | 2018-07-17 09:02:17 +0000 |
commit | 3c2fed5145d9e40a9ecd178c2866c813eddc6203 (patch) | |
tree | 7ff2408f3b1c4a52fb6d7cd091508de1ce950e5f /src | |
parent | 5da96a77a84ae5414debbc46d390464d51010113 (diff) |
session: use msg queue for events
Change-Id: I3c58367eec2243fe19b75be78a175c5261863e9e
Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src')
27 files changed, 773 insertions, 400 deletions
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index 4f3e7642740..89411143c12 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -16,6 +16,25 @@ #include <svm/message_queue.h> #include <vppinfra/mem.h> +static inline svm_msg_q_ring_t * +svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index) +{ + return vec_elt_at_index (mq->rings, ring_index); +} + +svm_msg_q_ring_t * +svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index) +{ + return svm_msg_q_ring_inline (mq, ring_index); +} + +static inline void * +svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index) +{ + ASSERT (elt_index < ring->nitems); + return (ring->data + elt_index * ring->elsize); +} + svm_msg_q_t * svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) { @@ -63,6 +82,53 @@ svm_msg_q_free (svm_msg_q_t * mq) } svm_msg_q_msg_t +svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index) +{ + svm_msg_q_msg_t msg = {.as_u64 = ~0 }; + svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index); + + ASSERT (ring->cursize != ring->nitems); + msg.ring_index = ring - mq->rings; + msg.elt_index = ring->tail; + ring->tail = (ring->tail + 1) % ring->nitems; + __sync_fetch_and_add (&ring->cursize, 1); + return msg; +} + +int +svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index, + u8 noblock, svm_msg_q_msg_t * msg) +{ + if (noblock) + { + if (svm_msg_q_try_lock (mq)) + return -1; + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index))) + { + svm_msg_q_unlock (mq); + return -2; + } + *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); + if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg))) + { + svm_msg_q_unlock (mq); + return -2; + } + } + else + { + svm_msg_q_lock (mq); + *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); + while (svm_msg_q_msg_is_invalid (msg)) + { + svm_msg_q_wait (mq); + *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); + } + } + return 0; +} + +svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes) { svm_msg_q_msg_t msg = {.as_u64 = ~0 }; @@ -81,23 +147,10 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes) return msg; } -static inline svm_msg_q_ring_t * -svm_msg_q_get_ring (svm_msg_q_t * mq, u32 ring_index) -{ - return vec_elt_at_index (mq->rings, ring_index); -} - -static inline void * -svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index) -{ - ASSERT (elt_index < ring->nitems); - return (ring->data + elt_index * ring->elsize); -} - void * svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { - svm_msg_q_ring_t *ring = svm_msg_q_get_ring (mq, msg->ring_index); + svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, msg->ring_index); return svm_msg_q_ring_data (ring, msg->elt_index); } @@ -131,7 +184,7 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) return 0; ring = &mq->rings[msg->ring_index]; - dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems; + dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems; if (ring->tail == ring->head) dist2 = (ring->cursize == 0) ? 0 : ring->nitems; else @@ -140,10 +193,17 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) } int -svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait) +svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait) +{ + ASSERT (svm_msq_q_msg_is_valid (mq, msg)); + return svm_queue_add (mq->q, (u8 *) msg, nowait); +} + +void +svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { - ASSERT (svm_msq_q_msg_is_valid (mq, &msg)); - return svm_queue_add (mq->q, (u8 *) & msg, nowait); + ASSERT (svm_msq_q_msg_is_valid (mq, msg)); + svm_queue_add_raw (mq->q, (u8 *) msg); } int @@ -153,6 +213,12 @@ svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, return svm_queue_sub (mq->q, (u8 *) msg, cond, time); } +void +svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) +{ + svm_queue_sub_raw (mq->q, (u8 *) msg); +} + /* * fd.io coding-style-patch-verification: ON * diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h index 5ec8547016e..708a03d716e 100644 --- a/src/svm/message_queue.h +++ b/src/svm/message_queue.h @@ -21,14 +21,15 @@ #define SRC_SVM_MESSAGE_QUEUE_H_ #include <vppinfra/clib.h> +#include <vppinfra/error.h> #include <svm/queue.h> typedef struct svm_msg_q_ring_ { volatile u32 cursize; /**< current size of the ring */ u32 nitems; /**< max size of the ring */ - u32 head; /**< current head (for dequeue) */ - u32 tail; /**< current tail (for enqueue) */ + volatile u32 head; /**< current head (for dequeue) */ + volatile u32 tail; /**< current tail (for enqueue) */ u32 elsize; /**< size of an element */ u8 *data; /**< chunk of memory for msg data */ } svm_msg_q_ring_t; @@ -64,6 +65,7 @@ typedef union u64 as_u64; } svm_msg_q_msg_t; +#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 } /** * Allocate message queue * @@ -98,6 +100,36 @@ void svm_msg_q_free (svm_msg_q_t * mq); svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes); /** + * Allocate message buffer on ring + * + * Message is allocated, on requested ring. The caller MUST check that + * the ring is not full. + * + * @param mq message queue + * @param ring_index ring on which the allocation should occur + * @return message structure pointing to the ring and position + * allocated + */ +svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index); + +/** + * Lock message queue and allocate message buffer on ring + * + * This should be used when multiple writers/readers are expected to + * compete for the rings/queue. Message should be enqueued by calling + * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once + * the message in enqueued. + * + * @param mq message queue + * @param ring_index ring on which the allocation should occur + * @param noblock flag that indicates if request should block + * @param msg pointer to message to be filled in + * @return 0 on success, negative number otherwise + */ +int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index, + u8 noblock, svm_msg_q_msg_t * msg); + +/** * Free message buffer * * Marks message buffer on ring as free. @@ -106,6 +138,7 @@ svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes); * @param msg message to be freed */ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); + /** * Producer enqueue one message to queue * @@ -117,7 +150,20 @@ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); * @param nowait flag to indicate if request is blocking or not * @return success status */ -int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait); +int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait); + +/** + * Producer enqueue one message to queue with mutex held + * + * Prior to calling this, the producer should've obtained a message buffer + * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes + * the queue mutex is held. + * + * @param mq message queue + * @param msg message (pointer to ring position) to be enqueued + * @return success status + */ +void svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); /** * Consumer dequeue one message from queue @@ -129,13 +175,28 @@ int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait); * @param mq message queue * @param msg pointer to structure where message is to be received * @param cond flag that indicates if request should block or not + * @param time time to wait if condition it SVM_Q_TIMEDWAIT * @return success status */ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, svm_q_conditional_wait_t cond, u32 time); /** - * Get data for message in queu + * Consumer dequeue one message from queue with mutex held + * + * Returns the message pointing to the data in the message rings under the + * assumption that the message queue lock is already held. The consumer is + * expected to call @ref svm_msg_q_free_msg once it finishes + * processing/copies the message data. + * + * @param mq message queue + * @param msg pointer to structure where message is to be received + * @return success status + */ +void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); + +/** + * Get data for message in queue * * @param mq message queue * @param msg message for which the data is requested @@ -143,6 +204,94 @@ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, */ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); +/** + * Get message queue ring + * + * @param mq message queue + * @param ring_index index of ring + * @return pointer to ring + */ +svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index); + +/** + * Check if message queue is full + */ +static inline u8 +svm_msg_q_is_full (svm_msg_q_t * mq) +{ + return (mq->q->cursize == mq->q->maxsize); +} + +static inline u8 +svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index) +{ + ASSERT (ring_index < vec_len (mq->rings)); + return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems); +} + +/** + * Check if message queue is empty + */ +static inline u8 +svm_msg_q_is_empty (svm_msg_q_t * mq) +{ + return (mq->q->cursize == 0); +} + +/** + * Check length of message queue + */ +static inline u32 +svm_msg_q_size (svm_msg_q_t * mq) +{ + return mq->q->cursize; +} + +/** + * Check if message is invalid + */ +static inline u8 +svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg) +{ + return (msg->as_u64 == (u64) ~ 0); +} + +/** + * Try locking message queue + */ +static inline int +svm_msg_q_try_lock (svm_msg_q_t * mq) +{ + return pthread_mutex_trylock (&mq->q->mutex); +} + +/** + * Lock, or block trying, the message queue + */ +static inline int +svm_msg_q_lock (svm_msg_q_t * mq) +{ + return pthread_mutex_lock (&mq->q->mutex); +} + +static inline void +svm_msg_q_wait (svm_msg_q_t * mq) +{ + pthread_cond_wait (&mq->q->condvar, &mq->q->mutex); +} + +/** + * Unlock message queue + */ +static inline void +svm_msg_q_unlock (svm_msg_q_t * mq) +{ + /* The other side of the connection is not polling */ + if (mq->q->cursize < (mq->q->maxsize / 8)) + (void) pthread_cond_broadcast (&mq->q->condvar); + pthread_mutex_unlock (&mq->q->mutex); +} + #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */ /* diff --git a/src/svm/queue.c b/src/svm/queue.c index 96e40fc2aec..8e18f5832e3 100644 --- a/src/svm/queue.c +++ b/src/svm/queue.c @@ -154,26 +154,16 @@ svm_queue_add_nolock (svm_queue_t * q, u8 * elem) return 0; } -int +void svm_queue_add_raw (svm_queue_t * q, u8 * elem) { i8 *tailp; - if (PREDICT_FALSE (q->cursize == q->maxsize)) - { - while (q->cursize == q->maxsize) - ; - } - tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); clib_memcpy (tailp, elem, q->elsize); - q->tail++; + q->tail = (q->tail + 1) % q->maxsize; q->cursize++; - - if (q->tail == q->maxsize) - q->tail = 0; - return 0; } @@ -414,11 +404,9 @@ svm_queue_sub_raw (svm_queue_t * q, u8 * elem) headp = (i8 *) (&q->data[0] + q->elsize * q->head); clib_memcpy (elem, headp, q->elsize); - q->head++; + q->head = (q->head + 1) % q->maxsize; q->cursize--; - if (q->head == q->maxsize) - q->head = 0; return 0; } diff --git a/src/svm/queue.h b/src/svm/queue.h index 856c17237d1..68a63d769b6 100644 --- a/src/svm/queue.h +++ b/src/svm/queue.h @@ -69,7 +69,13 @@ void svm_queue_unlock (svm_queue_t * q); int svm_queue_is_full (svm_queue_t * q); int svm_queue_add_nolock (svm_queue_t * q, u8 * elem); int svm_queue_sub_raw (svm_queue_t * q, u8 * elem); -int svm_queue_add_raw (svm_queue_t * q, u8 * elem); + +/** + * Add element to queue with mutex held + * @param q queue + * @param elem pointer element data to add + */ +void svm_queue_add_raw (svm_queue_t * q, u8 * elem); /* * DEPRECATED please use svm_queue_t instead diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h index 0d5a08b86ae..40182901db5 100644 --- a/src/svm/svm_fifo.h +++ b/src/svm/svm_fifo.h @@ -107,6 +107,12 @@ svm_fifo_max_dequeue (svm_fifo_t * f) return f->cursize; } +static inline int +svm_fifo_is_full (svm_fifo_t * f) +{ + return (f->cursize == f->nitems); +} + static inline u32 svm_fifo_max_enqueue (svm_fifo_t * f) { diff --git a/src/svm/test_svm_message_queue.c b/src/svm/test_svm_message_queue.c index 69ffd131ac2..758163ffeab 100644 --- a/src/svm/test_svm_message_queue.c +++ b/src/svm/test_svm_message_queue.c @@ -88,9 +88,9 @@ test1 (int verbose) test1_error ("failed: msg alloc3"); *(u32 *)svm_msg_q_msg_data (mq, &msg2) = 123; - svm_msg_q_add (mq, msg2, SVM_Q_NOWAIT); + svm_msg_q_add (mq, &msg2, SVM_Q_NOWAIT); for (i = 0; i < 12; i++) - svm_msg_q_add (mq, msg[i], SVM_Q_NOWAIT); + svm_msg_q_add (mq, &msg[i], SVM_Q_NOWAIT); if (svm_msg_q_sub (mq, &msg2, SVM_Q_NOWAIT, 0)) test1_error ("failed: dequeue1"); diff --git a/src/tests/vnet/session/tcp_echo.c b/src/tests/vnet/session/tcp_echo.c index 59314f943f0..f8b75d95ac7 100644 --- a/src/tests/vnet/session/tcp_echo.c +++ b/src/tests/vnet/session/tcp_echo.c @@ -48,7 +48,7 @@ typedef struct svm_fifo_t *server_rx_fifo; svm_fifo_t *server_tx_fifo; - svm_queue_t *vpp_evt_q; + svm_msg_q_t *vpp_evt_q; u64 vpp_session_handle; u64 bytes_sent; @@ -99,7 +99,7 @@ typedef struct int no_return; /* Our event queue */ - svm_queue_t *our_event_queue; + svm_msg_q_t *our_event_queue; /* $$$ single thread only for the moment */ svm_queue_t *vpp_event_queue; @@ -426,7 +426,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * ASSERT (mp->app_event_queue_address); em->our_event_queue = uword_to_pointer (mp->app_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); em->state = STATE_ATTACHED; } @@ -677,7 +677,6 @@ send_test_chunk (echo_main_t * em, session_t * s) svm_fifo_t *tx_fifo = s->server_tx_fifo; u8 *test_data = em->connect_test_data; u32 enq_space, min_chunk = 16 << 10; - session_fifo_event_t evt; int written; test_buf_len = vec_len (test_data); @@ -698,12 +697,8 @@ send_test_chunk (echo_main_t * em, session_t * s) s->bytes_sent += written; if (svm_fifo_set_event (tx_fifo)) - { - /* Fabricate TX event, send to vpp */ - evt.fifo = tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - svm_queue_add (s->vpp_evt_q, (u8 *) & evt, 0 /* wait for mutex */ ); - } + app_send_io_evt_to_vpp (s->vpp_evt_q, tx_fifo, FIFO_EVENT_APP_TX, + 0 /* do wait for mutex */ ); } } @@ -751,6 +746,7 @@ client_rx_thread_fn (void *arg) session_fifo_event_t _e, *e = &_e; echo_main_t *em = &echo_main; static u8 *rx_buf = 0; + svm_msg_q_msg_t msg; vec_validate (rx_buf, 1 << 20); @@ -759,7 +755,8 @@ client_rx_thread_fn (void *arg) while (!em->time_to_stop) { - svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); + svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0); + e = svm_msg_q_msg_data (em->our_event_queue, &msg); switch (e->event_type) { case FIFO_EVENT_APP_RX: @@ -769,6 +766,7 @@ client_rx_thread_fn (void *arg) clib_warning ("unknown event type %d", e->event_type); break; } + svm_msg_q_free_msg (em->our_event_queue, &msg); } pthread_exit (0); } @@ -808,7 +806,7 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp) session->vpp_session_handle = mp->handle; session->start = clib_time_now (&em->clib_time); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); @@ -869,8 +867,8 @@ client_disconnect (echo_main_t * em, session_t * s) static void clients_run (echo_main_t * em) { - session_fifo_event_t _e, *e = &_e; f64 start_time, deltat, timeout = 100.0; + svm_msg_q_msg_t msg; session_t *s; int i; @@ -918,9 +916,15 @@ clients_run (echo_main_t * em) start_time = clib_time_now (&em->clib_time); em->state = STATE_READY; while (em->n_active_clients) - if (em->our_event_queue->cursize) - svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_NOWAIT, 0); - + if (!svm_msg_q_is_empty (em->our_event_queue)) + { + if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0)) + { + clib_warning ("svm msg q returned"); + } + else + svm_msg_q_free_msg (em->our_event_queue, &msg); + } for (i = 0; i < em->n_clients; i++) { @@ -1180,10 +1184,12 @@ void server_handle_event_queue (echo_main_t * em) { session_fifo_event_t _e, *e = &_e; + svm_msg_q_msg_t msg; while (1) { - svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); + svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0); + e = svm_msg_q_msg_data (em->our_event_queue, &msg); switch (e->event_type) { case FIFO_EVENT_APP_RX: @@ -1191,6 +1197,7 @@ server_handle_event_queue (echo_main_t * em) break; case FIFO_EVENT_DISCONNECT: + svm_msg_q_free_msg (em->our_event_queue, &msg); return; default: @@ -1204,6 +1211,7 @@ server_handle_event_queue (echo_main_t * em) em->time_to_print_stats = 0; fformat (stdout, "%d connections\n", pool_elts (em->sessions)); } + svm_msg_q_free_msg (em->our_event_queue, &msg); } } diff --git a/src/tests/vnet/session/udp_echo.c b/src/tests/vnet/session/udp_echo.c index 54e00b181bd..d796b6b8de0 100644 --- a/src/tests/vnet/session/udp_echo.c +++ b/src/tests/vnet/session/udp_echo.c @@ -87,10 +87,10 @@ typedef struct u8 is_connected; /* Our event queue */ - svm_queue_t *our_event_queue; + svm_msg_q_t *our_event_queue; /* $$$ single thread only for the moment */ - svm_queue_t *vpp_event_queue; + svm_msg_q_t *vpp_event_queue; /* $$$$ hack: cut-through session index */ volatile u32 cut_through_session_index; @@ -369,7 +369,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * } utm->our_event_queue = - uword_to_pointer (mp->app_event_queue_address, svm_queue_t *); + uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *); } static void @@ -736,7 +736,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) sizeof (ip46_address_t)); session->transport.is_ip4 = mp->lcl_is_ip4; session->transport.lcl_port = mp->lcl_port; - session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_queue_t *); + session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *); utm->state = utm->is_connected ? STATE_BOUND : STATE_READY; } @@ -896,7 +896,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) start_time = clib_time_now (&utm->clib_time); utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); @@ -909,7 +909,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) { clib_warning ("cut-through session"); utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); rx_fifo->master_session_index = session_index; tx_fifo->master_session_index = session_index; utm->cut_through_session_index = session_index; @@ -1012,23 +1012,23 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp) session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); /* Cut-through case */ if (mp->client_event_queue_address) { clib_warning ("cut-through session"); utm->cut_through_session_index = session - utm->sessions; utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); utm->do_echo = 1; } else { utm->connected_session = session - utm->sessions; utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip, sizeof (ip46_address_t)); @@ -1134,14 +1134,20 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, session_fifo_event_t * e) void server_handle_event_queue (udp_echo_main_t * utm) { - session_fifo_event_t _e, *e = &_e; + session_fifo_event_t *e; + svm_msg_q_msg_t msg; while (utm->state != STATE_READY) sleep (5); while (1) { - svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); + if (svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0)) + { + clib_warning ("svm msg q returned"); + continue; + } + e = svm_msg_q_msg_data (utm->our_event_queue, &msg); switch (e->event_type) { case FIFO_EVENT_APP_RX: @@ -1149,12 +1155,14 @@ server_handle_event_queue (udp_echo_main_t * utm) break; case FIFO_EVENT_DISCONNECT: - return; + utm->time_to_stop = 1; + break; default: clib_warning ("unknown event type %d", e->event_type); break; } + svm_msg_q_free_msg (utm->our_event_queue, &msg); if (PREDICT_FALSE (utm->time_to_stop == 1)) return; if (PREDICT_FALSE (utm->time_to_print_stats == 1)) diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index cc3179d860a..ca65782e9c6 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -99,7 +99,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * } vcm->app_event_queue = - uword_to_pointer (mp->app_event_queue_address, svm_queue_t *); + uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *); vcm->app_state = STATE_APP_ATTACHED; } @@ -291,7 +291,7 @@ done: VCL_IO_SESSIONS_UNLOCK (); } session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); rx_fifo->client_session_index = session_index; @@ -431,7 +431,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, - svm_queue_t *); + svm_msg_q_t *); session->session_state = STATE_ACCEPT; session->transport.rmt_port = mp->port; session->transport.is_ip4 = mp->is_ip4; diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 4283b6e1167..aba4839f129 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -194,7 +194,7 @@ typedef struct vppcom_main_t_ clib_bitmap_t *ex_bitmap; /* Our event queue */ - svm_queue_t *app_event_queue; + svm_msg_q_t *app_event_queue; /* unique segment name counter */ u32 unique_segment_index; diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 1fd138e6dd3..0d077caa797 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -1164,16 +1164,19 @@ vppcom_session_read_ready (vcl_session_t * session, u32 session_index) } rv = ready; - if (vcm->app_event_queue->cursize && - !pthread_mutex_trylock (&vcm->app_event_queue->mutex)) + if (!svm_msg_q_is_empty (vcm->app_event_queue) && + !pthread_mutex_trylock (&vcm->app_event_queue->q->mutex)) { - u32 i, n_to_dequeue = vcm->app_event_queue->cursize; - session_fifo_event_t e; + u32 i, n_to_dequeue = vcm->app_event_queue->q->cursize; + svm_msg_q_msg_t msg; for (i = 0; i < n_to_dequeue; i++) - svm_queue_sub_raw (vcm->app_event_queue, (u8 *) & e); + { + svm_queue_sub_raw (vcm->app_event_queue->q, (u8 *) & msg); + svm_msg_q_free_msg (vcm->app_event_queue, &msg); + } - pthread_mutex_unlock (&vcm->app_event_queue->mutex); + pthread_mutex_unlock (&vcm->app_event_queue->q->mutex); } done: return rv; @@ -1184,8 +1187,7 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n) { vcl_session_t *session = 0; svm_fifo_t *tx_fifo = 0; - svm_queue_t *q; - session_fifo_event_t evt; + svm_msg_q_t *mq; session_state_t state; int rv, n_write, is_nonblocking; u32 poll_et; @@ -1241,18 +1243,15 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n) if ((n_write > 0) && svm_fifo_set_event (tx_fifo)) { - /* Fabricate TX event, send to vpp */ - evt.fifo = tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - + /* Send TX event to vpp */ VCL_SESSION_LOCK_AND_GET (session_index, &session); - q = session->vpp_evt_q; - ASSERT (q); - svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ ); + mq = session->vpp_evt_q; + ASSERT (mq); + app_send_io_evt_to_vpp (mq, tx_fifo, FIFO_EVENT_APP_TX, SVM_Q_WAIT); VCL_SESSION_UNLOCK (); VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX " "to vpp_event_q %p, n_write %d", getpid (), - vpp_handle, session_index, q, n_write); + vpp_handle, session_index, mq, n_write); } if (n_write <= 0) diff --git a/src/vnet/session-apps/echo_client.c b/src/vnet/session-apps/echo_client.c index 6ee91f9c54b..3d1af676186 100644 --- a/src/vnet/session-apps/echo_client.c +++ b/src/vnet/session-apps/echo_client.c @@ -62,13 +62,8 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) svm_fifo_t *f = s->data.tx_fifo; rv = clib_min (svm_fifo_max_enqueue (f), bytes_this_chunk); svm_fifo_enqueue_nocopy (f, rv); - if (svm_fifo_set_event (f)) - { - session_fifo_event_t evt; - evt.fifo = f; - evt.event_type = FIFO_EVENT_APP_TX; - svm_queue_add (s->data.vpp_evt_q, (u8 *) & evt, 0); - } + session_send_io_evt_to_thread_custom (f, s->thread_index, + FIFO_EVENT_APP_TX); } else rv = app_send_stream (&s->data, test_data + test_buf_offset, @@ -98,13 +93,8 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) hdr.lcl_port = at->lcl_port; svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr); svm_fifo_enqueue_nocopy (f, rv); - if (svm_fifo_set_event (f)) - { - session_fifo_event_t evt; - evt.fifo = f; - evt.event_type = FIFO_EVENT_APP_TX; - svm_queue_add (s->data.vpp_evt_q, (u8 *) & evt, 0); - } + session_send_io_evt_to_thread_custom (f, s->thread_index, + FIFO_EVENT_APP_TX); } else rv = app_send_dgram (&s->data, test_data + test_buf_offset, @@ -462,18 +452,9 @@ echo_clients_rx_callback (stream_session_t * s) if (svm_fifo_max_dequeue (s->server_rx_fifo)) { - session_fifo_event_t evt; - svm_queue_t *q; if (svm_fifo_set_event (s->server_rx_fifo)) - { - evt.fifo = s->server_rx_fifo; - evt.event_type = FIFO_EVENT_BUILTIN_RX; - q = session_manager_get_vpp_event_queue (s->thread_index); - if (PREDICT_FALSE (q->cursize == q->maxsize)) - clib_warning ("out of event queue space"); - else if (svm_queue_add (q, (u8 *) & evt, 0)) - clib_warning ("failed to enqueue self-tap"); - } + session_send_io_evt_to_thread (s->server_rx_fifo, + FIFO_EVENT_BUILTIN_RX); } return 0; } diff --git a/src/vnet/session-apps/echo_client.h b/src/vnet/session-apps/echo_client.h index f3fc8d2b59e..db5ba163628 100644 --- a/src/vnet/session-apps/echo_client.h +++ b/src/vnet/session-apps/echo_client.h @@ -46,7 +46,7 @@ typedef struct * Application setup parameters */ svm_queue_t *vl_input_queue; /**< vpe input queue */ - svm_queue_t **vpp_event_queue; + svm_msg_q_t **vpp_event_queue; u32 cli_node_index; /**< cli process node index */ u32 my_client_index; /**< loopback API client handle */ diff --git a/src/vnet/session-apps/echo_server.c b/src/vnet/session-apps/echo_server.c index 7d1ae5a4d49..770f4ba7337 100644 --- a/src/vnet/session-apps/echo_server.c +++ b/src/vnet/session-apps/echo_server.c @@ -23,7 +23,7 @@ typedef struct /* * Server app parameters */ - svm_queue_t **vpp_queue; + svm_msg_q_t **vpp_queue; svm_queue_t *vl_input_queue; /**< Sever's event queue */ u32 app_index; /**< Server app index */ @@ -145,10 +145,8 @@ echo_server_rx_callback (stream_session_t * s) int actual_transfer; svm_fifo_t *tx_fifo, *rx_fifo; echo_server_main_t *esm = &echo_server_main; - session_fifo_event_t evt; u32 thread_index = vlib_get_thread_index (); app_session_transport_t at; - svm_queue_t *q; ASSERT (s->thread_index == thread_index); @@ -170,8 +168,9 @@ echo_server_rx_callback (stream_session_t * s) max_dequeue = ph.data_length - ph.data_offset; if (!esm->vpp_queue[s->thread_index]) { - q = session_manager_get_vpp_event_queue (s->thread_index); - esm->vpp_queue[s->thread_index] = q; + svm_msg_q_t *mq; + mq = session_manager_get_vpp_event_queue (s->thread_index); + esm->vpp_queue[s->thread_index] = mq; } max_enqueue -= sizeof (session_dgram_hdr_t); } @@ -191,13 +190,7 @@ echo_server_rx_callback (stream_session_t * s) /* Program self-tap to retry */ if (svm_fifo_set_event (rx_fifo)) { - evt.fifo = rx_fifo; - evt.event_type = FIFO_EVENT_BUILTIN_RX; - - q = esm->vpp_queue[s->thread_index]; - if (PREDICT_FALSE (q->cursize == q->maxsize)) - clib_warning ("out of event queue space"); - else if (svm_queue_add (q, (u8 *) & evt, 0)) + if (session_send_io_evt_to_thread (rx_fifo, FIFO_EVENT_BUILTIN_RX)) clib_warning ("failed to enqueue self-tap"); vec_validate (esm->rx_retries[s->thread_index], s->session_index); diff --git a/src/vnet/session-apps/http_server.c b/src/vnet/session-apps/http_server.c index 9ad1297b901..ef9f760c992 100644 --- a/src/vnet/session-apps/http_server.c +++ b/src/vnet/session-apps/http_server.c @@ -32,7 +32,7 @@ typedef struct typedef struct { u8 **rx_buf; - svm_queue_t **vpp_queue; + svm_msg_q_t **vpp_queue; u64 byte_index; uword *handler_by_get_request; @@ -140,7 +140,6 @@ http_cli_output (uword arg, u8 * buffer, uword buffer_bytes) void send_data (stream_session_t * s, u8 * data) { - session_fifo_event_t evt; u32 offset, bytes_to_send; f64 delay = 10e-3; http_server_main_t *hsm = &http_server_main; @@ -178,14 +177,8 @@ send_data (stream_session_t * s, u8 * data) bytes_to_send -= actual_transfer; if (svm_fifo_set_event (s->server_tx_fifo)) - { - /* Fabricate TX event, send to vpp */ - evt.fifo = s->server_tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - - svm_queue_add (hsm->vpp_queue[s->thread_index], - (u8 *) & evt, 0 /* do wait for mutex */ ); - } + session_send_io_evt_to_thread (s->server_tx_fifo, + FIFO_EVENT_APP_TX); delay = 10e-3; } } @@ -379,15 +372,7 @@ http_server_rx_callback (stream_session_t * s) /* Send an RPC request via the thread-0 input node */ if (vlib_get_thread_index () != 0) - { - session_fifo_event_t evt; - evt.rpc_args.fp = alloc_http_process_callback; - evt.rpc_args.arg = args; - evt.event_type = FIFO_EVENT_RPC; - svm_queue_add - (session_manager_get_vpp_event_queue (0 /* main thread */ ), - (u8 *) & evt, 0 /* do wait for mutex */ ); - } + session_send_rpc_evt_to_thread (0, alloc_http_process_callback, args); else alloc_http_process (args); return 0; diff --git a/src/vnet/session-apps/proxy.c b/src/vnet/session-apps/proxy.c index 78aa0de2840..6260ad350f0 100644 --- a/src/vnet/session-apps/proxy.c +++ b/src/vnet/session-apps/proxy.c @@ -194,7 +194,6 @@ proxy_rx_callback (stream_session_t * s) int proxy_index; uword *p; svm_fifo_t *active_open_tx_fifo; - session_fifo_event_t evt; ASSERT (s->thread_index == thread_index); @@ -212,10 +211,9 @@ proxy_rx_callback (stream_session_t * s) if (svm_fifo_set_event (active_open_tx_fifo)) { u32 ao_thread_index = active_open_tx_fifo->master_thread_index; - evt.fifo = active_open_tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - if (svm_queue_add (pm->active_open_event_queue[ao_thread_index], - (u8 *) & evt, 0 /* do wait for mutex */ )) + if (session_send_io_evt_to_thread_custom (active_open_tx_fifo, + ao_thread_index, + FIFO_EVENT_APP_TX)) clib_warning ("failed to enqueue tx evt"); } } @@ -278,7 +276,6 @@ active_open_connected_callback (u32 app_index, u32 opaque, proxy_main_t *pm = &proxy_main; proxy_session_t *ps; u8 thread_index = vlib_get_thread_index (); - session_fifo_event_t evt; if (is_fail) { @@ -320,15 +317,9 @@ active_open_connected_callback (u32 app_index, u32 opaque, /* * Send event for active open tx fifo */ + ASSERT (s->thread_index == thread_index); if (svm_fifo_set_event (s->server_tx_fifo)) - { - evt.fifo = s->server_tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - if (svm_queue_add - (pm->active_open_event_queue[thread_index], (u8 *) & evt, - 0 /* do wait for mutex */ )) - clib_warning ("failed to enqueue tx evt"); - } + session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX); return 0; } @@ -354,8 +345,6 @@ active_open_disconnect_callback (stream_session_t * s) static int active_open_rx_callback (stream_session_t * s) { - proxy_main_t *pm = &proxy_main; - session_fifo_event_t evt; svm_fifo_t *proxy_tx_fifo; proxy_tx_fifo = s->server_rx_fifo; @@ -365,12 +354,10 @@ active_open_rx_callback (stream_session_t * s) */ if (svm_fifo_set_event (proxy_tx_fifo)) { - u32 p_thread_index = proxy_tx_fifo->master_thread_index; - evt.fifo = proxy_tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - if (svm_queue_add (pm->server_event_queue[p_thread_index], (u8 *) & evt, - 0 /* do wait for mutex */ )) - clib_warning ("failed to enqueue server rx evt"); + u8 thread_index = proxy_tx_fifo->master_thread_index; + return session_send_io_evt_to_thread_custom (proxy_tx_fifo, + thread_index, + FIFO_EVENT_APP_TX); } return 0; diff --git a/src/vnet/session-apps/proxy.h b/src/vnet/session-apps/proxy.h index 4bca0a02cd8..c221a5e75f2 100644 --- a/src/vnet/session-apps/proxy.h +++ b/src/vnet/session-apps/proxy.h @@ -40,8 +40,8 @@ typedef struct { svm_queue_t *vl_input_queue; /**< vpe input queue */ /** per-thread vectors */ - svm_queue_t **server_event_queue; - svm_queue_t **active_open_event_queue; + svm_msg_q_t **server_event_queue; + svm_msg_q_t **active_open_event_queue; u8 **rx_buf; /**< intermediate rx buffers */ u32 cli_node_index; /**< cli process node index */ diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 60019dd7961..6041b49712d 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -807,6 +807,143 @@ application_get_segment_manager_properties (u32 app_index) return &app->sm_properties; } +static inline int +app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock) +{ + if (PREDICT_TRUE (!svm_msg_q_is_full (mq))) + { + if (lock) + { + svm_msg_q_add_w_lock (mq, msg); + svm_msg_q_unlock (mq); + } + else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT)) + { + clib_warning ("msg q add returned"); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + } + else + { + clib_warning ("evt q full"); + svm_msg_q_free_msg (mq, msg); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + return 0; +} + +static inline int +app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock) +{ + session_fifo_event_t *evt; + svm_msg_q_msg_t msg; + svm_msg_q_t *mq; + + if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED)) + { + /* Session is closed so app will never clean up. Flush rx fifo */ + svm_fifo_dequeue_drop_all (s->server_rx_fifo); + return 0; + } + + /* Built-in app? Hand event to the callback... */ + if (app->cb_fns.builtin_app_rx_callback) + return app->cb_fns.builtin_app_rx_callback (s); + + /* If no need for event, return */ + if (!svm_fifo_set_event (s->server_rx_fifo)) + return 0; + + mq = app->event_queue; + if (lock) + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + clib_warning ("evt q rings full"); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + ASSERT (!svm_msg_q_msg_is_invalid (&msg)); + + evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->fifo = s->server_rx_fifo; + evt->event_type = FIFO_EVENT_APP_RX; + + return app_enqueue_evt (mq, &msg, lock); +} + +static inline int +app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock) +{ + svm_msg_q_t *mq; + session_fifo_event_t *evt; + svm_msg_q_msg_t msg; + + if (application_is_builtin (app)) + return 0; + + mq = app->event_queue; + if (lock) + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + clib_warning ("evt q rings full"); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + ASSERT (!svm_msg_q_msg_is_invalid (&msg)); + + evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->event_type = FIFO_EVENT_APP_TX; + evt->fifo = s->server_tx_fifo; + + return app_enqueue_evt (mq, &msg, lock); +} + +/* *INDENT-OFF* */ +typedef int (app_send_evt_handler_fn) (application_t *app, + stream_session_t *s, + u8 lock); +static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = { + app_send_io_evt_rx, + app_send_io_evt_tx, +}; +/* *INDENT-ON* */ + +/** + * Send event to application + * + * Logic from queue perspective is non-blocking. That is, if there's + * not enough space to enqueue a message, we return. However, if the lock + * flag is set, we do wait for queue mutex. + */ +int +application_send_event (application_t * app, stream_session_t * s, + u8 evt_type) +{ + ASSERT (app && evt_type <= FIFO_EVENT_APP_TX); + return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ ); +} + +int +application_lock_and_send_event (application_t * app, stream_session_t * s, + u8 evt_type) +{ + return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ ); +} + local_session_t * application_alloc_local_session (application_t * app) { @@ -949,14 +1086,14 @@ application_local_session_connect (u32 table_index, application_t * client, svm_fifo_segment_private_t *seg; segment_manager_t *sm; local_session_t *ls; - svm_queue_t *sq, *cq; + svm_msg_q_t *sq, *cq; ls = application_alloc_local_session (server); props = application_segment_manager_properties (server); cprops = application_segment_manager_properties (client); evt_q_elts = props->evt_q_size + cprops->evt_q_size; - evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t); + evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts); seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin; has_transport = session_has_transport ((stream_session_t *) ll); diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index aad7089ba6f..f6c81275826 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -72,7 +72,7 @@ typedef struct _application u32 ns_index; /** Application listens for events on this svm queue */ - svm_queue_t *event_queue; + svm_msg_q_t *event_queue; /* * Callbacks: shoulder-taps for the server/client @@ -207,6 +207,11 @@ int application_local_session_disconnect_w_index (u32 app_index, u32 ls_index); void application_local_sessions_del (application_t * app); +int application_send_event (application_t * app, stream_session_t * s, + u8 evt); +int application_lock_and_send_event (application_t * app, + stream_session_t * s, u8 evt_type); + always_inline u32 local_session_id (local_session_t * ll) { diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 2c171487e70..50c043493f2 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -136,7 +136,7 @@ typedef enum _(IS_BUILTIN, "Application is builtin") \ _(IS_PROXY, "Application is proxying") \ _(USE_GLOBAL_SCOPE, "App can use global session scope") \ - _(USE_LOCAL_SCOPE, "App can use local session scope") + _(USE_LOCAL_SCOPE, "App can use local session scope") \ typedef enum _app_options { @@ -187,7 +187,7 @@ typedef struct app_session_transport_ _(volatile u8, session_state) /**< session state */ \ _(u32, session_index) /**< index in owning pool */ \ _(app_session_transport_t, transport) /**< transport info */ \ - _(svm_queue_t, *vpp_evt_q) /**< vpp event queue */ \ + _(svm_msg_q_t, *vpp_evt_q) /**< vpp event queue */ \ _(u8, is_dgram) /**< flag for dgram mode */ \ typedef struct @@ -197,13 +197,73 @@ typedef struct #undef _ } app_session_t; +/** + * Send fifo io event to vpp worker thread + * + * Because there may be multiple writers to one of vpp's queues, this + * protects message allocation and enqueueing. + * + * @param mq vpp message queue + * @param f fifo for which the event is sent + * @param evt_type type of event + * @param noblock flag to indicate is request is blocking or not + * @return 0 if success, negative integer otherwise + */ +static inline int +app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type, + u8 noblock) +{ + session_fifo_event_t *evt; + svm_msg_q_msg_t msg; + + if (noblock) + { + if (svm_msg_q_try_lock (mq)) + return -1; + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + svm_msg_q_unlock (mq); + return -2; + } + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg))) + { + svm_msg_q_unlock (mq); + return -2; + } + evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->fifo = f; + evt->event_type = evt_type; + svm_msg_q_add_w_lock (mq, &msg); + svm_msg_q_unlock (mq); + return 0; + } + else + { + svm_msg_q_lock (mq); + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + while (svm_msg_q_msg_is_invalid (&msg)) + { + svm_msg_q_wait (mq); + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + } + evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->fifo = f; + evt->event_type = evt_type; + if (svm_msg_q_is_full (mq)) + svm_msg_q_wait (mq); + svm_msg_q_add_w_lock (mq, &msg); + svm_msg_q_unlock (mq); + return 0; + } +} + always_inline int app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at, - svm_queue_t * vpp_evt_q, u8 * data, u32 len, u8 noblock) + svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock) { u32 max_enqueue, actual_write; session_dgram_hdr_t hdr; - session_fifo_event_t evt; int rv; max_enqueue = svm_fifo_max_enqueue (f); @@ -225,11 +285,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at, if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0) { if (svm_fifo_set_event (f)) - { - evt.fifo = f; - evt.event_type = FIFO_EVENT_APP_TX; - svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock); - } + app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock); } ASSERT (rv); return rv; @@ -243,20 +299,15 @@ app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock) } always_inline int -app_send_stream_raw (svm_fifo_t * f, svm_queue_t * vpp_evt_q, u8 * data, +app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock) { - session_fifo_event_t evt; int rv; if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0) { if (svm_fifo_set_event (f)) - { - evt.fifo = f; - evt.event_type = FIFO_EVENT_APP_TX; - svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock); - } + app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock); } return rv; } diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index 31bb0c3aab5..b00bcd5bbc8 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -599,25 +599,52 @@ segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo, segment_manager_segment_reader_unlock (sm); } +u32 +segment_manager_evt_q_expected_size (u32 q_len) +{ + u32 fifo_evt_size, notif_q_size, q_hdrs; + u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz; + + fifo_evt_size = 1 << max_log2 (sizeof (session_fifo_event_t)); + notif_q_size = clib_max (16, q_len >> 4); + + msg_q_sz = q_len * sizeof (svm_msg_q_msg_t); + fifo_evt_ring_sz = q_len * fifo_evt_size; + session_ntf_ring_sz = notif_q_size * 256; + q_hdrs = sizeof (svm_queue_t) + sizeof (svm_msg_q_t); + + return (msg_q_sz + fifo_evt_ring_sz + session_ntf_ring_sz + q_hdrs); +} + /** * Allocates shm queue in the first segment * * Must be called with lock held */ -svm_queue_t * +svm_msg_q_t * segment_manager_alloc_queue (svm_fifo_segment_private_t * segment, u32 queue_size) { - ssvm_shared_header_t *sh; - svm_queue_t *q; + u32 fifo_evt_size, session_evt_size = 256, notif_q_size; + svm_msg_q_cfg_t _cfg, *cfg = &_cfg; + svm_msg_q_t *q; void *oldheap; - sh = segment->ssvm.sh; + fifo_evt_size = sizeof (session_fifo_event_t); + notif_q_size = clib_max (16, queue_size >> 4); + /* *INDENT-OFF* */ + svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = { + {queue_size, fifo_evt_size, 0}, + {notif_q_size, session_evt_size, 0} + }; + /* *INDENT-ON* */ + cfg->consumer_pid = 0; + cfg->n_rings = 2; + cfg->q_nitems = queue_size; + cfg->ring_cfgs = rc; - oldheap = ssvm_push_heap (sh); - q = svm_queue_init (queue_size, sizeof (session_fifo_event_t), - 0 /* consumer pid */ , - 0 /* signal when queue non-empty */ ); + oldheap = ssvm_push_heap (segment->ssvm.sh); + q = svm_msg_q_alloc (cfg); ssvm_pop_heap (oldheap); return q; } diff --git a/src/vnet/session/segment_manager.h b/src/vnet/session/segment_manager.h index 62e5e97e703..73cb4827a8b 100644 --- a/src/vnet/session/segment_manager.h +++ b/src/vnet/session/segment_manager.h @@ -17,7 +17,7 @@ #include <vnet/vnet.h> #include <svm/svm_fifo_segment.h> -#include <svm/queue.h> +#include <svm/message_queue.h> #include <vlibmemory/api.h> #include <vppinfra/lock.h> #include <vppinfra/valloc.h> @@ -61,7 +61,7 @@ typedef struct _segment_manager /** * App event queue allocated in first segment */ - svm_queue_t *event_queue; + svm_msg_q_t *event_queue; } segment_manager_t; #define segment_manager_foreach_segment_w_lock(VAR, SM, BODY) \ @@ -114,7 +114,7 @@ segment_manager_index (segment_manager_t * sm) return sm - segment_manager_main.segment_managers; } -always_inline svm_queue_t * +always_inline svm_msg_q_t * segment_manager_event_queue (segment_manager_t * sm) { return sm->event_queue; @@ -152,7 +152,8 @@ int segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fs, svm_fifo_t ** tx_fifo); void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo); -svm_queue_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs, +u32 segment_manager_evt_q_expected_size (u32 q_size); +svm_msg_q_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs, u32 queue_size); void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q); void segment_manager_app_detach (segment_manager_t * sm); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 26bc70e6fd7..38a0521af94 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -27,49 +27,90 @@ session_manager_main_t session_manager_main; extern transport_proto_vft_t *tp_vfts; -static void -session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type, - u32 thread_index, void *fp, void *rpc_args) +static inline int +session_send_evt_to_thread (void *data, void *args, u32 thread_index, + session_evt_type_t evt_type) { - session_fifo_event_t evt = { {0}, }; - svm_queue_t *q; + session_fifo_event_t *evt; + svm_msg_q_msg_t msg; + svm_msg_q_t *mq; u32 tries = 0, max_tries; - evt.event_type = evt_type; - if (evt_type == FIFO_EVENT_RPC) - { - evt.rpc_args.fp = fp; - evt.rpc_args.arg = rpc_args; - } - else - evt.session_handle = session_handle; - - q = session_manager_get_vpp_event_queue (thread_index); - while (svm_queue_add (q, (u8 *) & evt, 1)) + mq = session_manager_get_vpp_event_queue (thread_index); + while (svm_msg_q_try_lock (mq)) { max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3; if (tries++ == max_tries) { SESSION_DBG ("failed to enqueue evt"); - break; + return -1; } } + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + svm_msg_q_unlock (mq); + return -2; + } + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg))) + { + svm_msg_q_unlock (mq); + return -2; + } + evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->event_type = evt_type; + switch (evt_type) + { + case FIFO_EVENT_RPC: + evt->rpc_args.fp = data; + evt->rpc_args.arg = args; + break; + case FIFO_EVENT_APP_TX: + case FIFO_EVENT_BUILTIN_RX: + evt->fifo = data; + break; + case FIFO_EVENT_DISCONNECT: + evt->session_handle = session_handle ((stream_session_t *) data); + break; + default: + clib_warning ("evt unhandled!"); + svm_msg_q_unlock (mq); + return -1; + } + + svm_msg_q_add_w_lock (mq, &msg); + svm_msg_q_unlock (mq); + return 0; } -void -session_send_session_evt_to_thread (u64 session_handle, - fifo_event_type_t evt_type, - u32 thread_index) +int +session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type) +{ + return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type); +} + +int +session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index, + session_evt_type_t evt_type) { - session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0); + return session_send_evt_to_thread (f, 0, thread_index, evt_type); +} + +int +session_send_ctrl_evt_to_thread (stream_session_t * s, + session_evt_type_t evt_type) +{ + /* only event supported for now is disconnect */ + ASSERT (evt_type == FIFO_EVENT_DISCONNECT); + return session_send_evt_to_thread (s, 0, s->thread_index, + FIFO_EVENT_DISCONNECT); } void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) { if (thread_index != vlib_get_thread_index ()) - session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp, - rpc_args); + session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC); else { void (*fnp) (void *) = fp; @@ -440,24 +481,15 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) /** * Notify session peer that new data has been enqueued. * - * @param s Stream session for which the event is to be generated. - * @param block Flag to indicate if call should block if event queue is full. + * @param s Stream session for which the event is to be generated. + * @param lock Flag to indicate if call should lock message queue. * - * @return 0 on succes or negative number if failed to send notification. + * @return 0 on success or negative number if failed to send notification. */ -static int -session_enqueue_notify (stream_session_t * s, u8 block) +static inline int +session_enqueue_notify (stream_session_t * s, u8 lock) { application_t *app; - session_fifo_event_t evt; - svm_queue_t *q; - - if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED)) - { - /* Session is closed so app will never clean up. Flush rx fifo */ - svm_fifo_dequeue_drop_all (s->server_rx_fifo); - return 0; - } app = application_get_if_valid (s->app_index); if (PREDICT_FALSE (app == 0)) @@ -466,68 +498,32 @@ session_enqueue_notify (stream_session_t * s, u8 block) return 0; } - /* Built-in app? Hand event to the callback... */ - if (app->cb_fns.builtin_app_rx_callback) - return app->cb_fns.builtin_app_rx_callback (s); - - /* If no event, send one */ - if (svm_fifo_set_event (s->server_rx_fifo)) - { - /* Fabricate event */ - evt.fifo = s->server_rx_fifo; - evt.event_type = FIFO_EVENT_APP_RX; - - /* Add event to server's event queue */ - q = app->event_queue; - - /* Based on request block (or not) for lack of space */ - if (block || PREDICT_TRUE (q->cursize < q->maxsize)) - svm_queue_add (app->event_queue, (u8 *) & evt, - 0 /* do wait for mutex */ ); - else - { - clib_warning ("fifo full"); - return -1; - } - } - /* *INDENT-OFF* */ SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({ - ed->data[0] = evt.event_type; + ed->data[0] = FIFO_EVENT_APP_RX; ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo); })); /* *INDENT-ON* */ - return 0; + if (lock) + return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); + + return application_send_event (app, s, FIFO_EVENT_APP_RX); } int session_dequeue_notify (stream_session_t * s) { application_t *app; - svm_queue_t *q; app = application_get_if_valid (s->app_index); if (PREDICT_FALSE (!app)) return -1; - if (application_is_builtin (app)) - return 0; + if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL) + return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); - q = app->event_queue; - if (PREDICT_TRUE (q->cursize < q->maxsize)) - { - session_fifo_event_t evt = { - .event_type = FIFO_EVENT_APP_TX, - .fifo = s->server_tx_fifo - }; - svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT); - } - else - { - return -1; - } - return 0; + return application_send_event (app, s, FIFO_EVENT_APP_TX); } /** @@ -542,16 +538,24 @@ int session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) { session_manager_main_t *smm = &session_manager_main; - u32 *indices; + transport_service_type_t tp_service; + int i, errors = 0, lock; stream_session_t *s; - int i, errors = 0; + u32 *indices; indices = smm->session_to_enqueue[transport_proto][thread_index]; + tp_service = transport_protocol_service_type (transport_proto); + lock = tp_service == TRANSPORT_SERVICE_CL; for (i = 0; i < vec_len (indices); i++) { s = session_get_if_valid (indices[i], thread_index); - if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ )) + if (PREDICT_FALSE (!s)) + { + errors++; + continue; + } + if (PREDICT_FALSE (session_enqueue_notify (s, lock))) errors++; } @@ -1118,9 +1122,7 @@ stream_session_disconnect (stream_session_t * s) evt->event_type = FIFO_EVENT_DISCONNECT; } else - session_send_session_evt_to_thread (session_handle (s), - FIFO_EVENT_DISCONNECT, - s->thread_index); + session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT); } /** @@ -1231,8 +1233,18 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm) for (i = 0; i < vec_len (smm->vpp_event_queues); i++) { - smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size, - vpp_pid, 0); + svm_msg_q_cfg_t _cfg, *cfg = &_cfg; + u32 notif_q_size = clib_max (16, evt_q_length >> 4); + svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = { + {evt_q_length, evt_size, 0} + , + {notif_q_size, 256, 0} + }; + cfg->consumer_pid = 0; + cfg->n_rings = 2; + cfg->q_nitems = evt_q_length; + cfg->ring_cfgs = rc; + smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg); } if (smm->evt_qs_use_memfd_seg) diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index fe3477b63e3..879b3823e5d 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -38,10 +38,10 @@ typedef enum FIFO_EVENT_DISCONNECT, FIFO_EVENT_BUILTIN_RX, FIFO_EVENT_RPC, -} fifo_event_type_t; +} session_evt_type_t; static inline const char * -fifo_event_type_str (fifo_event_type_t et) +fifo_event_type_str (session_evt_type_t et) { switch (et) { @@ -62,6 +62,13 @@ fifo_event_type_str (fifo_event_type_t et) } } +typedef enum +{ + SESSION_MQ_IO_EVT_RING, + SESSION_MQ_CTRL_EVT_RING, + SESSION_MQ_N_RINGS +} session_mq_rings_e; + #define foreach_session_input_error \ _(NO_SESSION, "No session drops") \ _(NO_LISTENER, "No listener for dst port drops") \ @@ -86,23 +93,30 @@ typedef struct { void *fp; void *arg; -} rpc_args_t; +} session_rpc_args_t; typedef u64 session_handle_t; /* *INDENT-OFF* */ -typedef CLIB_PACKED (struct { +typedef struct +{ + u8 event_type; + u8 postponed; union + { + svm_fifo_t *fifo; + session_handle_t session_handle; + session_rpc_args_t rpc_args; + struct { - svm_fifo_t * fifo; - session_handle_t session_handle; - rpc_args_t rpc_args; + u8 data[0]; }; - u8 event_type; - u8 postponed; -}) session_fifo_event_t; + }; +} __clib_packed session_fifo_event_t; /* *INDENT-ON* */ +#define SESSION_MSG_NULL { } + typedef struct session_dgram_pre_hdr_ { u32 data_length; @@ -193,7 +207,7 @@ struct _session_manager_main session_tx_context_t *ctx; /** vpp fifo event queue */ - svm_queue_t **vpp_event_queues; + svm_msg_q_t **vpp_event_queues; /** Event queues memfd segment initialized only if so configured */ ssvm_private_t evt_qs_segment; @@ -533,9 +547,12 @@ int stream_session_stop_listen (stream_session_t * s); void stream_session_disconnect (stream_session_t * s); void stream_session_disconnect_transport (stream_session_t * s); void stream_session_cleanup (stream_session_t * s); -void session_send_session_evt_to_thread (u64 session_handle, - fifo_event_type_t evt_type, - u32 thread_index); +int session_send_io_evt_to_thread (svm_fifo_t * f, + session_evt_type_t evt_type); +int session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index, + session_evt_type_t evt_type); +void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, + void *rpc_args); ssvm_private_t *session_manager_get_evt_q_segment (void); u8 *format_stream_session (u8 * s, va_list * args); @@ -549,7 +566,7 @@ void session_register_transport (transport_proto_t transport_proto, clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en); -always_inline svm_queue_t * +always_inline svm_msg_q_t * session_manager_get_vpp_event_queue (u32 thread_index) { return session_manager_main.vpp_event_queues[thread_index]; diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 1a41dbd9e08..f9fddea6148 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -155,7 +155,7 @@ send_session_accept_callback (stream_session_t * s) vl_api_registration_t *reg; transport_connection_t *tc; stream_session_t *listener; - svm_queue_t *vpp_queue; + svm_msg_q_t *vpp_queue; reg = vl_mem_api_client_index_to_registration (server->api_client_index); if (!reg) @@ -300,7 +300,7 @@ send_session_connected_callback (u32 app_index, u32 api_context, vl_api_connect_session_reply_t *mp; transport_connection_t *tc; vl_api_registration_t *reg; - svm_queue_t *vpp_queue; + svm_msg_q_t *vpp_queue; application_t *app; app = application_get (app_index); @@ -485,7 +485,7 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp) vl_api_bind_uri_reply_t *rmp; stream_session_t *s; application_t *app = 0; - svm_queue_t *vpp_evt_q; + svm_msg_q_t *vpp_evt_q; int rv; if (session_manager_is_enabled () == 0) @@ -759,7 +759,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp) stream_session_t *s; transport_connection_t *tc = 0; ip46_address_t *ip46; - svm_queue_t *vpp_evt_q; + svm_msg_q_t *vpp_evt_q; if (session_manager_is_enabled () == 0) { diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 85fd28db7a9..350282bd902 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -575,15 +575,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) { session_manager_main_t *smm = vnet_get_session_manager_main (); + u32 thread_index = vm->thread_index, n_to_dequeue, n_events; session_fifo_event_t *pending_events, *e; session_fifo_event_t *fifo_events; - u32 n_to_dequeue, n_events; - svm_queue_t *q; - application_t *app; - int n_tx_packets = 0; - u32 thread_index = vm->thread_index; - int i, rv; + svm_msg_q_msg_t _msg, *msg = &_msg; f64 now = vlib_time_now (vm); + int n_tx_packets = 0, i, rv; + application_t *app; + svm_msg_q_t *mq; void (*fp) (void *); SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index); @@ -594,16 +593,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, transport_update_time (now, thread_index); /* - * Get vpp queue events + * Get vpp queue events that we can dequeue without blocking */ - q = smm->vpp_event_queues[thread_index]; - if (PREDICT_FALSE (q == 0)) - return 0; - + mq = smm->vpp_event_queues[thread_index]; fifo_events = smm->free_event_vector[thread_index]; - - /* min number of events we can dequeue without blocking */ - n_to_dequeue = q->cursize; + n_to_dequeue = svm_msg_q_size (mq); pending_events = smm->pending_event_vector[thread_index]; if (!n_to_dequeue && !vec_len (pending_events) @@ -624,21 +618,19 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, } /* See you in the next life, don't be late - * XXX: we may need priorities here - */ - if (pthread_mutex_trylock (&q->mutex)) + * XXX: we may need priorities here */ + if (svm_msg_q_try_lock (mq)) return 0; for (i = 0; i < n_to_dequeue; i++) { vec_add2 (fifo_events, e, 1); - svm_queue_sub_raw (q, (u8 *) e); + svm_msg_q_sub_w_lock (mq, msg); + clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e)); + svm_msg_q_free_msg (mq, msg); } - /* The other side of the connection is not polling */ - if (q->cursize < (q->maxsize / 8)) - (void) pthread_cond_broadcast (&q->condvar); - pthread_mutex_unlock (&q->mutex); + svm_msg_q_unlock (mq); vec_append (fifo_events, pending_events); vec_append (fifo_events, smm->pending_disconnects[thread_index]); @@ -760,19 +752,20 @@ dump_thread_0_event_queue (void) vlib_main_t *vm = &vlib_global_main; u32 my_thread_index = vm->thread_index; session_fifo_event_t _e, *e = &_e; + svm_msg_q_ring_t *ring; stream_session_t *s0; + svm_msg_q_msg_t *msg; + svm_msg_q_t *mq; int i, index; - i8 *headp; - - svm_queue_t *q; - q = smm->vpp_event_queues[my_thread_index]; - index = q->head; + mq = smm->vpp_event_queues[my_thread_index]; + index = mq->q->head; - for (i = 0; i < q->cursize; i++) + for (i = 0; i < mq->q->cursize; i++) { - headp = (i8 *) (&q->data[0] + q->elsize * index); - clib_memcpy (e, headp, q->elsize); + msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index); + ring = svm_msg_q_ring (mq, msg->ring_index); + clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize); switch (e->event_type) { @@ -805,7 +798,7 @@ dump_thread_0_event_queue (void) index++; - if (index == q->maxsize) + if (index == mq->q->maxsize) index = 0; } } @@ -844,10 +837,11 @@ u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e) { session_manager_main_t *smm = vnet_get_session_manager_main (); - svm_queue_t *q; + svm_msg_q_t *mq; session_fifo_event_t *pending_event_vector, *evt; int i, index, found = 0; - i8 *headp; + svm_msg_q_msg_t *msg; + svm_msg_q_ring_t *ring; u8 thread_index; ASSERT (e); @@ -855,16 +849,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e) /* * Search evt queue */ - q = smm->vpp_event_queues[thread_index]; - index = q->head; - for (i = 0; i < q->cursize; i++) + mq = smm->vpp_event_queues[thread_index]; + index = mq->q->head; + for (i = 0; i < mq->q->cursize; i++) { - headp = (i8 *) (&q->data[0] + q->elsize * index); - clib_memcpy (e, headp, q->elsize); + msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index); + ring = svm_msg_q_ring (mq, msg->ring_index); + clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize); found = session_node_cmp_event (e, f); if (found) return 1; - if (++index == q->maxsize) + if (++index == mq->q->maxsize) index = 0; } /* diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c index 492b3cc7a2a..cea449163f8 100644 --- a/src/vnet/tls/tls.c +++ b/src/vnet/tls/tls.c @@ -41,63 +41,15 @@ tls_get_available_engine (void) int tls_add_vpp_q_evt (svm_fifo_t * f, u8 evt_type) { - session_fifo_event_t evt; - svm_queue_t *q; - if (svm_fifo_set_event (f)) - { - evt.fifo = f; - evt.event_type = evt_type; - - q = session_manager_get_vpp_event_queue (f->master_thread_index); - if (PREDICT_TRUE (q->cursize < q->maxsize)) - { - svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ ); - } - else - { - clib_warning ("vpp's evt q full"); - return -1; - } - } + session_send_io_evt_to_thread (f, evt_type); return 0; } static inline int tls_add_app_q_evt (application_t * app, stream_session_t * app_session) { - session_fifo_event_t evt; - svm_queue_t *q; - - if (PREDICT_FALSE (app_session->session_state == SESSION_STATE_CLOSED)) - { - /* Session is closed so app will never clean up. Flush rx fifo */ - u32 to_dequeue = svm_fifo_max_dequeue (app_session->server_rx_fifo); - if (to_dequeue) - svm_fifo_dequeue_drop (app_session->server_rx_fifo, to_dequeue); - return 0; - } - - if (app->cb_fns.builtin_app_rx_callback) - return app->cb_fns.builtin_app_rx_callback (app_session); - - if (svm_fifo_set_event (app_session->server_rx_fifo)) - { - evt.fifo = app_session->server_rx_fifo; - evt.event_type = FIFO_EVENT_APP_RX; - q = app->event_queue; - - if (PREDICT_TRUE (q->cursize < q->maxsize)) - { - svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ ); - } - else - { - clib_warning ("app evt q full"); - return -1; - } - } - return 0; + return application_send_event (app, app_session, FIFO_EVENT_APP_RX); } u32 |