diff options
author | Florin Coras <fcoras@cisco.com> | 2021-01-25 20:31:27 -0800 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2021-02-05 17:28:34 +0000 |
commit | 5398dfb2592d525018997a991a4f7bfde515adc4 (patch) | |
tree | d986e2bac410e8b630a0d5907b729550a28838da /src/vcl | |
parent | 2b97f597c6705809201ce6a6846d46c47c0933ba (diff) |
session svm: non blocking mq
Avoid synchronizing producers and the consumer. Instead, only use mutex
or spinlock (if eventfds are configured) to synchronize producers.
Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ie2aafbdc2e07fced5d5e46ee2df6b30a186faa2f
Diffstat (limited to 'src/vcl')
-rw-r--r-- | src/vcl/vppcom.c | 136 |
1 files changed, 33 insertions, 103 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index cf8bb72d451..21892438c37 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -25,15 +25,14 @@ __thread uword __vcl_worker_index = ~0; static inline int vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg) { - svm_msg_q_msg_t *msg; - u32 n_msgs; - int i; + u32 n_msgs = 0, sz, len; - n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg); - for (i = 0; i < n_msgs; i++) + while ((sz = svm_msg_q_size (mq))) { - vec_add2 (wrk->mq_msg_vector, msg, 1); - svm_msg_q_sub_w_lock (mq, msg); + len = vec_len (wrk->mq_msg_vector); + vec_validate (wrk->mq_msg_vector, len + sz - 1); + svm_msg_q_sub_raw_batch (mq, wrk->mq_msg_vector + len, sz); + n_msgs += sz; } return n_msgs; } @@ -1101,18 +1100,15 @@ vcl_handle_pending_wrk_updates (vcl_worker_t * wrk) } void -vcl_flush_mq_events (void) +vcl_worker_flush_mq_events (vcl_worker_t *wrk) { - vcl_worker_t *wrk = vcl_worker_get_current (); svm_msg_q_msg_t *msg; session_event_t *e; svm_msg_q_t *mq; int i; mq = wrk->app_event_queue; - svm_msg_q_lock (mq); vcl_mq_dequeue_batch (wrk, mq, ~0); - svm_msg_q_unlock (mq); for (i = 0; i < vec_len (wrk->mq_msg_vector); i++) { @@ -1125,6 +1121,12 @@ vcl_flush_mq_events (void) vcl_handle_pending_wrk_updates (wrk); } +void +vcl_flush_mq_events (void) +{ + vcl_worker_flush_mq_events (vcl_worker_get_current ()); +} + static int vppcom_session_unbind (u32 session_handle) { @@ -1539,11 +1541,11 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep, vcl_session_t *listen_session = 0; vcl_session_t *client_session = 0; vcl_session_msg_t *evt; - svm_msg_q_msg_t msg; - session_event_t *e; u8 is_nonblocking; int rv; +again: + listen_session = vcl_session_get_w_handle (wrk, listen_session_handle); if (!listen_session) return VPPCOM_EBADFD; @@ -1567,19 +1569,9 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep, if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking) return VPPCOM_EAGAIN; - if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0)) - return VPPCOM_EAGAIN; - - e = svm_msg_q_msg_data (wrk->app_event_queue, &msg); - if (e->event_type != SESSION_CTRL_EVT_ACCEPTED) - { - vcl_handle_mq_event (wrk, e); - svm_msg_q_free_msg (wrk->app_event_queue, &msg); - continue; - } - clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg)); - svm_msg_q_free_msg (wrk->app_event_queue, &msg); - break; + svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY); + vcl_worker_flush_mq_events (wrk); + goto again; } handle: @@ -1785,12 +1777,6 @@ vppcom_session_stream_connect (uint32_t session_handle, return rv; } -static u8 -vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct) -{ - return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid); -} - static inline int vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, u8 peek) @@ -1799,7 +1785,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, int rv, n_read = 0, is_nonblocking; vcl_session_t *s = 0; svm_fifo_t *rx_fifo; - svm_msg_q_msg_t msg; session_event_t *e; svm_msg_q_t *mq; u8 is_ct; @@ -1844,16 +1829,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, if (is_ct) svm_fifo_unset_event (s->rx_fifo); svm_fifo_unset_event (rx_fifo); - svm_msg_q_lock (mq); - if (svm_msg_q_is_empty (mq)) - svm_msg_q_wait (mq); - - svm_msg_q_sub_w_lock (mq, &msg); - e = svm_msg_q_msg_data (mq, &msg); - svm_msg_q_unlock (mq); - if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct)) - vcl_handle_mq_event (wrk, e); - svm_msg_q_free_msg (mq, &msg); + + svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); + vcl_worker_flush_mq_events (wrk); } } @@ -1924,8 +1902,6 @@ vppcom_session_read_segments (uint32_t session_handle, int n_read = 0, is_nonblocking; vcl_session_t *s = 0; svm_fifo_t *rx_fifo; - svm_msg_q_msg_t msg; - session_event_t *e; svm_msg_q_t *mq; u8 is_ct; @@ -1959,16 +1935,9 @@ vppcom_session_read_segments (uint32_t session_handle, if (is_ct) svm_fifo_unset_event (s->rx_fifo); svm_fifo_unset_event (rx_fifo); - svm_msg_q_lock (mq); - if (svm_msg_q_is_empty (mq)) - svm_msg_q_wait (mq); - - svm_msg_q_sub_w_lock (mq, &msg); - e = svm_msg_q_msg_data (mq, &msg); - svm_msg_q_unlock (mq); - if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct)) - vcl_handle_mq_event (wrk, e); - svm_msg_q_free_msg (mq, &msg); + + svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); + vcl_worker_flush_mq_events (wrk); } } @@ -2015,12 +1984,6 @@ vppcom_session_free_segments (uint32_t session_handle, uint32_t n_bytes) s->rx_bytes_pending -= n_bytes; } -static u8 -vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct) -{ - return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid); -} - always_inline u8 vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram) { @@ -2037,9 +2000,7 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf, { int n_write, is_nonblocking; session_evt_type_t et; - svm_msg_q_msg_t msg; svm_fifo_t *tx_fifo; - session_event_t *e; svm_msg_q_t *mq; u8 is_ct; @@ -2077,17 +2038,9 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf, svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); if (vcl_session_is_closing (s)) return vcl_session_closing_error (s); - svm_msg_q_lock (mq); - if (svm_msg_q_is_empty (mq)) - svm_msg_q_wait (mq); - svm_msg_q_sub_w_lock (mq, &msg); - e = svm_msg_q_msg_data (mq, &msg); - svm_msg_q_unlock (mq); - - if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct)) - vcl_handle_mq_event (wrk, e); - svm_msg_q_free_msg (mq, &msg); + svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); + vcl_worker_flush_mq_events (wrk); } } @@ -2290,35 +2243,22 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, session_event_t *e; u32 i; - svm_msg_q_lock (mq); if (svm_msg_q_is_empty (mq)) { if (*bits_set) - { - svm_msg_q_unlock (mq); - return 0; - } + return 0; if (!time_to_wait) - { - svm_msg_q_unlock (mq); - return 0; - } + return 0; else if (time_to_wait < 0) - { - svm_msg_q_wait (mq); - } + svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); else { if (svm_msg_q_timedwait (mq, time_to_wait)) - { - svm_msg_q_unlock (mq); - return 0; - } + return 0; } } vcl_mq_dequeue_batch (wrk, mq, ~0); - svm_msg_q_unlock (mq); for (i = 0; i < vec_len (wrk->mq_msg_vector); i++) { @@ -2939,30 +2879,20 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq)) goto handle_dequeued; - svm_msg_q_lock (mq); if (svm_msg_q_is_empty (mq)) { if (!wait_for_time) - { - svm_msg_q_unlock (mq); - return 0; - } + return 0; else if (wait_for_time < 0) - { - svm_msg_q_wait (mq); - } + svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); else { if (svm_msg_q_timedwait (mq, wait_for_time / 1e3)) - { - svm_msg_q_unlock (mq); - return 0; - } + return 0; } } ASSERT (maxevents > *num_ev); vcl_mq_dequeue_batch (wrk, mq, ~0); - svm_msg_q_unlock (mq); handle_dequeued: for (i = 0; i < vec_len (wrk->mq_msg_vector); i++) |