diff options
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/hs_apps/echo_client.c | 6 | ||||
-rw-r--r-- | src/plugins/hs_apps/echo_server.c | 2 | ||||
-rw-r--r-- | src/plugins/http/http.c | 9 | ||||
-rw-r--r-- | src/plugins/quic/quic.c | 7 | ||||
-rw-r--r-- | src/plugins/srtp/srtp.c | 5 | ||||
-rw-r--r-- | src/plugins/unittest/session_test.c | 70 |
6 files changed, 82 insertions, 17 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c index a7358a61952..d449045cf6a 100644 --- a/src/plugins/hs_apps/echo_client.c +++ b/src/plugins/hs_apps/echo_client.c @@ -706,10 +706,8 @@ ec_session_rx_callback (session_t *s) receive_data_chunk (wrk, es); if (svm_fifo_max_dequeue_cons (s->rx_fifo)) - { - if (svm_fifo_set_event (s->rx_fifo)) - session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); - } + session_enqueue_notify (s); + return 0; } diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c index 6a291163bd1..178e9eef4fb 100644 --- a/src/plugins/hs_apps/echo_server.c +++ b/src/plugins/hs_apps/echo_server.c @@ -228,6 +228,8 @@ echo_server_rx_callback (session_t * s) /* Program self-tap to retry */ if (svm_fifo_set_event (rx_fifo)) { + /* TODO should be session_enqueue_notify(s) but quic tests seem + * to fail if that's the case */ if (session_send_io_evt_to_thread (rx_fifo, SESSION_IO_EVT_BUILTIN_RX)) clib_warning ("failed to enqueue self-tap"); diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index d25123a757d..503752c377e 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -503,7 +503,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) hc->http_state = HTTP_STATE_WAIT_APP; app_wrk = app_worker_get_if_valid (as->app_wrk_index); - app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, as); return HTTP_SM_STOP; @@ -777,7 +777,7 @@ state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp) } app_wrk = app_worker_get_if_valid (as->app_wrk_index); - app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, as); return HTTP_SM_STOP; } @@ -808,7 +808,7 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as) app_wrk = app_worker_get_if_valid (as->app_wrk_index); ASSERT (app_wrk); - app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, as); return 1; } @@ -864,8 +864,9 @@ maybe_reschedule: if (hc->rx_buf_offset < vec_len (hc->rx_buf) || svm_fifo_max_dequeue_cons (ts->rx_fifo)) { + /* TODO is the flag really needed? */ if (svm_fifo_set_event (ts->rx_fifo)) - session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); + session_enqueue_notify (ts); } return HTTP_SM_CONTINUE; } diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index c3c4540353b..61380b8048c 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -830,7 +830,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, size_t len) { QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off); - u32 max_enq, rv; + u32 max_enq; quic_ctx_t *sctx; session_t *stream_session; app_worker_t *app_wrk; @@ -895,10 +895,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index); if (PREDICT_TRUE (app_wrk != 0)) { - rv = app_worker_lock_and_send_event (app_wrk, stream_session, - SESSION_IO_EVT_RX); - if (rv) - QUIC_ERR ("Failed to ping app for RX"); + app_worker_rx_notify (app_wrk, stream_session); } quic_ack_rx_data (stream_session); } diff --git a/src/plugins/srtp/srtp.c b/src/plugins/srtp/srtp.c index aacadcee265..8b7c5b6374c 100644 --- a/src/plugins/srtp/srtp.c +++ b/src/plugins/srtp/srtp.c @@ -309,8 +309,7 @@ done: int srtp_add_vpp_q_builtin_rx_evt (session_t *s) { - if (svm_fifo_set_event (s->rx_fifo)) - session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); + session_enqueue_notify (s); return 0; } @@ -320,7 +319,7 @@ srtp_notify_app_enqueue (srtp_tc_t *ctx, session_t *app_session) app_worker_t *app_wrk; app_wrk = app_worker_get_if_valid (app_session->app_wrk_index); if (PREDICT_TRUE (app_wrk != 0)) - app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, app_session); } static inline int diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c index c4e41c34dd0..70b3b32a2e4 100644 --- a/src/plugins/unittest/session_test.c +++ b/src/plugins/unittest/session_test.c @@ -1771,6 +1771,74 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd) } } +/* Used to be part of application_worker.c prior to adding support for + * async rx + */ +static int +test_mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring, + svm_msg_q_msg_t *msg) +{ + int rv, n_try = 0; + + while (n_try < 75) + { + rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg); + if (!rv) + return 0; + /* + * Break the loop if mq is full, usually this is because the + * app has crashed or is hanging on somewhere. + */ + if (rv != -1) + break; + n_try += 1; + usleep (1); + } + + return -1; +} + +/* Used to be part of application_worker.c prior to adding support for + * async rx and was used for delivering io events over mq + * NB: removed handling of mq congestion + */ +static inline int +test_app_send_io_evt_rx (app_worker_t *app_wrk, session_t *s) +{ + svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg; + session_event_t *evt; + svm_msg_q_t *mq; + u32 app_session; + int rv; + + if (app_worker_application_is_builtin (app_wrk)) + return app_worker_rx_notify (app_wrk, s); + + if (svm_fifo_has_event (s->rx_fifo)) + return 0; + + app_session = s->rx_fifo->shr->client_session_index; + mq = app_wrk->event_queue; + + rv = test_mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg); + + if (PREDICT_FALSE (rv)) + { + clib_warning ("failed to alloc mq message"); + return -1; + } + + evt = svm_msg_q_msg_data (mq, mq_msg); + evt->event_type = SESSION_IO_EVT_RX; + evt->session_index = app_session; + + (void) svm_fifo_set_event (s->rx_fifo); + + svm_msg_q_add_and_unlock (mq, mq_msg); + + return 0; +} + static int session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input) { @@ -1885,7 +1953,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input) { while (svm_fifo_has_event (rx_fifo)) ; - app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX); + test_app_send_io_evt_rx (app_wrk, &s); } } |