From af97221c6ae8f08716b3de212fa111b13282f241 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Wed, 5 May 2021 09:54:00 -0700 Subject: session: rpc for connects to main Type: improvement Signed-off-by: Florin Coras Change-Id: Ifa47e1500e5cfb3c717f87b1d21131b9531c9005 --- src/vnet/session/session_node.c | 86 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 5 deletions(-) (limited to 'src/vnet/session/session_node.c') diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index b68ff53dd7a..bd60bd7056b 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -119,16 +119,13 @@ session_mq_listen_uri_handler (void *data) } static void -session_mq_connect_handler (void *data) +session_mq_connect_one (session_connect_msg_t *mp) { - session_connect_msg_t *mp = (session_connect_msg_t *) data; vnet_connect_args_t _a, *a = &_a; app_worker_t *app_wrk; application_t *app; int rv; - app_check_thread_and_barrier (session_mq_connect_handler, mp); - app = application_lookup (mp->client_index); if (!app) return; @@ -167,6 +164,85 @@ session_mq_connect_handler (void *data) session_mq_free_ext_config (app, mp->ext_config); } +static void +session_mq_handle_connects_rpc (void *arg) +{ + u32 max_connects = 32, n_connects = 0; + vlib_main_t *vm = vlib_get_main (); + session_evt_elt_t *he, *elt, *next; + session_worker_t *fwrk; + u8 need_reschedule = 1; + + ASSERT (vlib_get_thread_index () == 0); + + /* Pending connects on linked list pertaining to first worker */ + fwrk = session_main_get_worker (1); + + vlib_worker_thread_barrier_sync (vm); + + he = pool_elt_at_index (fwrk->event_elts, fwrk->pending_connects); + elt = clib_llist_next (fwrk->event_elts, evt_list, he); + + /* Avoid holding the barrier for too long */ + while (n_connects < max_connects && elt != he) + { + next = clib_llist_next (fwrk->event_elts, evt_list, elt); + clib_llist_remove (fwrk->event_elts, evt_list, elt); + session_mq_connect_one (session_evt_ctrl_data (fwrk, elt)); + session_evt_elt_free (fwrk, elt); + elt = next; + n_connects += 1; + } + + if (clib_llist_is_empty (fwrk->event_elts, evt_list, he)) + { + fwrk->pending_connects_ntf = 0; + need_reschedule = 0; + } + + vlib_worker_thread_barrier_release (vm); + + if (need_reschedule) + { + vlib_node_set_interrupt_pending (vm, session_queue_node.index); + elt = session_evt_alloc_ctrl (session_main_get_worker (0)); + elt->evt.event_type = SESSION_CTRL_EVT_RPC; + elt->evt.rpc_args.fp = session_mq_handle_connects_rpc; + } +} + +static void +session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) +{ + u32 thread_index = wrk - session_main.wrk; + session_evt_elt_t *he; + + /* No workers, so just deal with the connect now */ + if (PREDICT_FALSE (!thread_index)) + { + session_mq_connect_one (session_evt_ctrl_data (wrk, elt)); + return; + } + + if (PREDICT_FALSE (thread_index != 1)) + { + clib_warning ("Connect on wrong thread. Dropping"); + return; + } + + /* Add to pending list to be handled by main thread */ + he = pool_elt_at_index (wrk->event_elts, wrk->pending_connects); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); + + if (!wrk->pending_connects_ntf) + { + vlib_node_set_interrupt_pending (vlib_get_main_by_index (0), + session_queue_node.index); + session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0); + wrk->pending_connects_ntf = 1; + } +} + static void session_mq_connect_uri_handler (void *data) { @@ -1331,7 +1407,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt)); break; case SESSION_CTRL_EVT_CONNECT: - session_mq_connect_handler (session_evt_ctrl_data (wrk, elt)); + session_mq_connect_handler (wrk, elt); break; case SESSION_CTRL_EVT_CONNECT_URI: session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt)); -- cgit 1.2.3-korg