diff options
author | Florin Coras <fcoras@cisco.com> | 2022-12-22 15:03:44 -0800 |
---|---|---|
committer | Dave Barach <vpp@barachs.net> | 2023-08-09 18:45:26 +0000 |
commit | 0242d30fc717aeacb758281dad8e5b2e56bf6709 (patch) | |
tree | eb7addb00bbe78061fa58442a6e9bdbd7f3e181c /src/vnet/session/application_worker.c | |
parent | 6d733a93b2eb9c16196ee17d5cdc77db21589571 (diff) |
session: async rx event notifications
Move from synchronous flushing of io and ctrl events from transports to
applications to an async model via a new session_input input node that
runs in interrupt mode. Events are coalesced per application worker.
On the one hand, this helps by minimizing message queue locking churn.
And on the other, it opens the possibility for further optimizations of
event message generation, obviates need for rx rescheduling rpcs and is
a first step towards a fully async data/io rx path.
Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id6bebcb65fc9feef8aa02ddf1af6d9ba6f6745ce
Diffstat (limited to 'src/vnet/session/application_worker.c')
-rw-r--r-- | src/vnet/session/application_worker.c | 406 |
1 files changed, 138 insertions, 268 deletions
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index c3941d1fd7b..127963a1eda 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -26,6 +26,7 @@ app_worker_t * app_worker_alloc (application_t * app) { app_worker_t *app_wrk; + pool_get (app_workers, app_wrk); clib_memset (app_wrk, 0, sizeof (*app_wrk)); app_wrk->wrk_index = app_wrk - app_workers; @@ -33,7 +34,8 @@ app_worker_alloc (application_t * app) app_wrk->wrk_map_index = ~0; app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; clib_spinlock_init (&app_wrk->detached_seg_managers_lock); - clib_spinlock_init (&app_wrk->postponed_mq_msgs_lock); + vec_validate (app_wrk->wrk_evts, vlib_num_workers ()); + vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ()); APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index); return app_wrk; } @@ -65,17 +67,25 @@ app_worker_free (app_worker_t * app_wrk) int i; /* + * Cleanup vpp wrk events + */ + app_worker_del_all_events (app_wrk); + for (i = 0; i < vec_len (app_wrk->wrk_evts); i++) + clib_fifo_free (app_wrk->wrk_evts[i]); + + vec_free (app_wrk->wrk_evts); + vec_free (app_wrk->wrk_mq_congested); + + /* * Listener cleanup */ - /* *INDENT-OFF* */ hash_foreach (handle, sm_index, app_wrk->listeners_table, ({ ls = listen_session_get_from_handle (handle); vec_add1 (handles, app_listen_session_handle (ls)); vec_add1 (sm_indices, sm_index); sm = segment_manager_get (sm_index); })); - /* *INDENT-ON* */ for (i = 0; i < vec_len (handles); i++) { @@ -127,7 +137,6 @@ app_worker_free (app_worker_t * app_wrk) } vec_free (app_wrk->detached_seg_managers); clib_spinlock_free (&app_wrk->detached_seg_managers_lock); - clib_spinlock_free (&app_wrk->postponed_mq_msgs_lock); if (CLIB_DEBUG) clib_memset (app_wrk, 0xfe, sizeof (*app_wrk)); @@ -339,7 +348,7 @@ app_worker_init_accepted (session_t * s) listener = listen_session_get_from_handle (s->listener_handle); app_wrk = application_listener_select_worker (listener); - if (PREDICT_FALSE (app_wrk->mq_congested)) + if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk))) return -1; s->app_wrk_index = app_wrk->wrk_index; @@ -355,10 +364,39 @@ app_worker_init_accepted (session_t * s) } int +app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh, + u32 opaque, session_error_t err) +{ + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_BOUND; + evt.session_handle = alsh; + evt.as_u64[1] = (u64) opaque << 32 | err; + + app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt); + + return 0; +} + +int +app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh, + u32 opaque, session_error_t err) +{ + session_event_t evt = {}; + + evt.event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY; + evt.session_handle = sh; + evt.as_u64[1] = (u64) opaque << 32 | (u32) err; + + app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt); + return 0; +} + +int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.session_accept_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED); + return 0; } int @@ -382,9 +420,16 @@ int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s, session_error_t err, u32 opaque) { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, - s, err); + session_event_t evt = {}; + u32 thread_index; + + evt.event_type = SESSION_CTRL_EVT_CONNECTED; + evt.session_index = s ? s->session_index : ~0; + evt.as_u64[1] = (u64) opaque << 32 | (u32) err; + thread_index = s ? s->thread_index : vlib_get_thread_index (); + + app_worker_add_event_custom (app_wrk, thread_index, &evt); + return 0; } int @@ -402,36 +447,28 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh) int app_worker_del_half_open (app_worker_t *app_wrk, session_t *s) { - application_t *app = application_get (app_wrk->app_index); - ASSERT (session_vlib_thread_is_cl_thread ()); - pool_put_index (app_wrk->half_open_table, s->ho_index); - if (app->cb_fns.half_open_cleanup_callback) - app->cb_fns.half_open_cleanup_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP); return 0; } int app_worker_close_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.session_disconnect_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED); return 0; } int app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - if (app->cb_fns.session_transport_closed_callback) - app->cb_fns.session_transport_closed_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED); return 0; } int app_worker_reset_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.session_reset_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET); return 0; } @@ -439,29 +476,37 @@ int app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s, session_cleanup_ntf_t ntf) { - application_t *app = application_get (app_wrk->app_index); - if (app->cb_fns.session_cleanup_callback) - app->cb_fns.session_cleanup_callback (s, ntf); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_CLEANUP; + evt.as_u64[0] = (u64) ntf << 32 | s->session_index; + evt.as_u64[1] = pointer_to_uword (session_cleanup); + + app_worker_add_event_custom (app_wrk, s->thread_index, &evt); + return 0; } int -app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s) +app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s, + session_cleanup_ntf_t ntf, + void (*cleanup_cb) (session_t *s)) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.builtin_app_rx_callback (s); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_CLEANUP; + evt.as_u64[0] = (u64) ntf << 32 | s->session_index; + evt.as_u64[1] = pointer_to_uword (cleanup_cb); + + app_worker_add_event_custom (app_wrk, s->thread_index, &evt); + return 0; } int -app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s) +app_worker_rx_notify (app_worker_t *app_wrk, session_t *s) { - application_t *app = application_get (app_wrk->app_index); - - if (!app->cb_fns.builtin_app_tx_callback) - return 0; - - app->cb_fns.builtin_app_tx_callback (s); + app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX); return 0; } @@ -469,8 +514,13 @@ int app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s, session_handle_t new_sh) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.session_migrate_callback (s, new_sh); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_MIGRATED; + evt.session_index = s->session_index; + evt.as_u64[1] = new_sh; + + app_worker_add_event_custom (app_wrk, s->thread_index, &evt); return 0; } @@ -514,7 +564,7 @@ int app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep, session_handle_t *rsh) { - if (PREDICT_FALSE (app_wrk->mq_congested)) + if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk))) return SESSION_E_REFUSED; sep->app_wrk_index = app_wrk->wrk_index; @@ -601,24 +651,27 @@ app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto, int app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle) { - application_t *app = application_get (app_wrk->app_index); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT; + evt.as_u64[1] = segment_handle; - return app->cb_fns.add_segment_callback (app_wrk->wrk_index, - segment_handle); + app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt); + + return 0; } int app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle) { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.del_segment_callback (app_wrk->wrk_index, - segment_handle); -} + session_event_t evt; -static inline u8 -app_worker_application_is_builtin (app_worker_t * app_wrk) -{ - return app_wrk->app_is_builtin; + evt.event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT; + evt.as_u64[1] = segment_handle; + + app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt); + + return 0; } static int @@ -677,126 +730,38 @@ app_wrk_send_fd (app_worker_t *app_wrk, int fd) return 0; } -static int -mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring, - svm_msg_q_msg_t *msg) -{ - int rv, n_try = 0; - - while (n_try < 75) - { - rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg); - if (!rv) - return 0; - /* - * Break the loop if mq is full, usually this is because the - * app has crashed or is hanging on somewhere. - */ - if (rv != -1) - break; - n_try += 1; - usleep (1); - } - - return -1; -} - -typedef union app_wrk_mq_rpc_args_ -{ - struct - { - u32 thread_index; - u32 app_wrk_index; - }; - uword as_uword; -} app_wrk_mq_rpc_ags_t; - -static int -app_wrk_handle_mq_postponed_msgs (void *arg) +void +app_worker_add_event (app_worker_t *app_wrk, session_t *s, + session_evt_type_t evt_type) { - svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg; - app_wrk_postponed_msg_t *pm; - app_wrk_mq_rpc_ags_t args; - u32 max_msg, n_msg = 0; - app_worker_t *app_wrk; session_event_t *evt; - svm_msg_q_t *mq; - - args.as_uword = pointer_to_uword (arg); - app_wrk = app_worker_get_if_valid (args.app_wrk_index); - if (!app_wrk) - return 0; - - mq = app_wrk->event_queue; - - clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock); - - max_msg = clib_min (32, clib_fifo_elts (app_wrk->postponed_mq_msgs)); - - while (n_msg < max_msg) - { - pm = clib_fifo_head (app_wrk->postponed_mq_msgs); - if (mq_try_lock_and_alloc_msg (mq, pm->ring, mq_msg)) - break; - evt = svm_msg_q_msg_data (mq, mq_msg); - clib_memset (evt, 0, sizeof (*evt)); - evt->event_type = pm->event_type; - clib_memcpy_fast (evt->data, pm->data, pm->len); - - if (pm->fd != -1) - app_wrk_send_fd (app_wrk, pm->fd); - - svm_msg_q_add_and_unlock (mq, mq_msg); - - clib_fifo_advance_head (app_wrk->postponed_mq_msgs, 1); - n_msg += 1; - } + ASSERT (s->thread_index == vlib_get_thread_index ()); + clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt); + evt->session_index = s->session_index; + evt->event_type = evt_type; + evt->postponed = 0; - if (!clib_fifo_elts (app_wrk->postponed_mq_msgs)) + /* First event for this app_wrk. Schedule it for handling in session input */ + if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1) { - app_wrk->mq_congested = 0; + session_worker_t *wrk = session_main_get_worker (s->thread_index); + session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index); } - else - { - session_send_rpc_evt_to_thread_force ( - args.thread_index, app_wrk_handle_mq_postponed_msgs, - uword_to_pointer (args.as_uword, void *)); - } - - clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock); - - return 0; } -static void -app_wrk_add_mq_postponed_msg (app_worker_t *app_wrk, session_mq_rings_e ring, - u8 evt_type, void *msg, u32 msg_len, int fd) +void +app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index, + session_event_t *evt) { - app_wrk_postponed_msg_t *pm; - - clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock); + clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt); - app_wrk->mq_congested = 1; - - clib_fifo_add2 (app_wrk->postponed_mq_msgs, pm); - clib_memcpy_fast (pm->data, msg, msg_len); - pm->event_type = evt_type; - pm->ring = ring; - pm->len = msg_len; - pm->fd = fd; - - if (clib_fifo_elts (app_wrk->postponed_mq_msgs) == 1) + /* First event for this app_wrk. Schedule it for handling in session input */ + if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1) { - app_wrk_mq_rpc_ags_t args = { .thread_index = vlib_get_thread_index (), - .app_wrk_index = app_wrk->wrk_index }; - - session_send_rpc_evt_to_thread_force ( - args.thread_index, app_wrk_handle_mq_postponed_msgs, - uword_to_pointer (args.as_uword, void *)); + session_worker_t *wrk = session_main_get_worker (thread_index); + session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index); } - - clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock); } always_inline void @@ -806,14 +771,9 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg, svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg; svm_msg_q_t *mq = app_wrk->event_queue; session_event_t *evt; - int rv; - - if (PREDICT_FALSE (app_wrk->mq_congested)) - goto handle_congestion; - rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_CTRL_EVT_RING, mq_msg); - if (PREDICT_FALSE (rv)) - goto handle_congestion; + ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING)); + *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING); evt = svm_msg_q_msg_data (mq, mq_msg); clib_memset (evt, 0, sizeof (*evt)); @@ -823,14 +783,7 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg, if (fd != -1) app_wrk_send_fd (app_wrk, fd); - svm_msg_q_add_and_unlock (mq, mq_msg); - - return; - -handle_congestion: - - app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_CTRL_EVT_RING, evt_type, - msg, msg_len, fd); + svm_msg_q_add_raw (mq, mq_msg); } void @@ -847,109 +800,26 @@ app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg, app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1); } -static inline int -app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s) +u8 +app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index) { - svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg; - session_event_t *evt; - svm_msg_q_t *mq; - u32 app_session; - int rv; - - if (app_worker_application_is_builtin (app_wrk)) - return app_worker_builtin_rx (app_wrk, s); - - if (svm_fifo_has_event (s->rx_fifo)) - return 0; - - app_session = s->rx_fifo->shr->client_session_index; - mq = app_wrk->event_queue; - - if (PREDICT_FALSE (app_wrk->mq_congested)) - goto handle_congestion; - - rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg); - - if (PREDICT_FALSE (rv)) - goto handle_congestion; - - evt = svm_msg_q_msg_data (mq, mq_msg); - evt->event_type = SESSION_IO_EVT_RX; - evt->session_index = app_session; - - (void) svm_fifo_set_event (s->rx_fifo); - - svm_msg_q_add_and_unlock (mq, mq_msg); - - return 0; - -handle_congestion: - - app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING, - SESSION_IO_EVT_RX, &app_session, - sizeof (app_session), -1); - return -1; + return app_wrk->wrk_mq_congested[thread_index] > 0; } -static inline int -app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s) +void +app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index) { - svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg; - session_event_t *evt; - svm_msg_q_t *mq; - u32 app_session; - int rv; - - if (app_worker_application_is_builtin (app_wrk)) - return app_worker_builtin_tx (app_wrk, s); - - app_session = s->tx_fifo->shr->client_session_index; - mq = app_wrk->event_queue; - - if (PREDICT_FALSE (app_wrk->mq_congested)) - goto handle_congestion; - - rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg); - - if (PREDICT_FALSE (rv)) - goto handle_congestion; - - evt = svm_msg_q_msg_data (mq, mq_msg); - evt->event_type = SESSION_IO_EVT_TX; - evt->session_index = app_session; - - svm_msg_q_add_and_unlock (mq, mq_msg); - - return 0; - -handle_congestion: - - app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING, - SESSION_IO_EVT_TX, &app_session, - sizeof (app_session), -1); - return -1; + clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1); + ASSERT (thread_index == vlib_get_thread_index ()); + app_wrk->wrk_mq_congested[thread_index] = 1; } -/* *INDENT-OFF* */ -typedef int (app_send_evt_handler_fn) (app_worker_t *app, - session_t *s); -static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = { - app_send_io_evt_rx, - app_send_io_evt_tx, -}; -/* *INDENT-ON* */ - -/** - * Send event to application - * - * Logic from queue perspective is blocking. However, if queue is full, - * we return. - */ -int -app_worker_lock_and_send_event (app_worker_t * app, session_t * s, - u8 evt_type) +void +app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index) { - return app_send_evt_handler_fns[evt_type] (app, s); + clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1); + ASSERT (thread_index == vlib_get_thread_index ()); + app_wrk->wrk_mq_congested[thread_index] = 0; } u8 * |