diff options
-rw-r--r-- | src/vnet/session/session.c | 1 | ||||
-rw-r--r-- | src/vnet/session/session.h | 6 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 86 |
3 files changed, 88 insertions, 5 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index f6b61abc2e0..f38db777ae3 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1818,6 +1818,7 @@ session_manager_main_enable (vlib_main_t * vm) wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list); + wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list); wrk->vm = vlib_get_main_by_index (i); wrk->last_vlib_time = vlib_time_now (vm); wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index bf326811b24..245ec25f135 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -145,6 +145,12 @@ typedef struct session_worker_ /** Clib file for timerfd. Used only if adaptive mode is on */ uword timerfd_file; + /** List of pending connects for first worker */ + clib_llist_index_t pending_connects; + + /** Flag that is set if main thread signaled to handle connects */ + u32 pending_connects_ntf; + #if SESSION_DEBUG /** last event poll time by thread */ clib_time_type_t last_event_poll; diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index b68ff53dd7a..bd60bd7056b 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -119,16 +119,13 @@ session_mq_listen_uri_handler (void *data) } static void -session_mq_connect_handler (void *data) +session_mq_connect_one (session_connect_msg_t *mp) { - session_connect_msg_t *mp = (session_connect_msg_t *) data; vnet_connect_args_t _a, *a = &_a; app_worker_t *app_wrk; application_t *app; int rv; - app_check_thread_and_barrier (session_mq_connect_handler, mp); - app = application_lookup (mp->client_index); if (!app) return; @@ -168,6 +165,85 @@ session_mq_connect_handler (void *data) } static void +session_mq_handle_connects_rpc (void *arg) +{ + u32 max_connects = 32, n_connects = 0; + vlib_main_t *vm = vlib_get_main (); + session_evt_elt_t *he, *elt, *next; + session_worker_t *fwrk; + u8 need_reschedule = 1; + + ASSERT (vlib_get_thread_index () == 0); + + /* Pending connects on linked list pertaining to first worker */ + fwrk = session_main_get_worker (1); + + vlib_worker_thread_barrier_sync (vm); + + he = pool_elt_at_index (fwrk->event_elts, fwrk->pending_connects); + elt = clib_llist_next (fwrk->event_elts, evt_list, he); + + /* Avoid holding the barrier for too long */ + while (n_connects < max_connects && elt != he) + { + next = clib_llist_next (fwrk->event_elts, evt_list, elt); + clib_llist_remove (fwrk->event_elts, evt_list, elt); + session_mq_connect_one (session_evt_ctrl_data (fwrk, elt)); + session_evt_elt_free (fwrk, elt); + elt = next; + n_connects += 1; + } + + if (clib_llist_is_empty (fwrk->event_elts, evt_list, he)) + { + fwrk->pending_connects_ntf = 0; + need_reschedule = 0; + } + + vlib_worker_thread_barrier_release (vm); + + if (need_reschedule) + { + vlib_node_set_interrupt_pending (vm, session_queue_node.index); + elt = session_evt_alloc_ctrl (session_main_get_worker (0)); + elt->evt.event_type = SESSION_CTRL_EVT_RPC; + elt->evt.rpc_args.fp = session_mq_handle_connects_rpc; + } +} + +static void +session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) +{ + u32 thread_index = wrk - session_main.wrk; + session_evt_elt_t *he; + + /* No workers, so just deal with the connect now */ + if (PREDICT_FALSE (!thread_index)) + { + session_mq_connect_one (session_evt_ctrl_data (wrk, elt)); + return; + } + + if (PREDICT_FALSE (thread_index != 1)) + { + clib_warning ("Connect on wrong thread. Dropping"); + return; + } + + /* Add to pending list to be handled by main thread */ + he = pool_elt_at_index (wrk->event_elts, wrk->pending_connects); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); + + if (!wrk->pending_connects_ntf) + { + vlib_node_set_interrupt_pending (vlib_get_main_by_index (0), + session_queue_node.index); + session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0); + wrk->pending_connects_ntf = 1; + } +} + +static void session_mq_connect_uri_handler (void *data) { session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data; @@ -1331,7 +1407,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt)); break; case SESSION_CTRL_EVT_CONNECT: - session_mq_connect_handler (session_evt_ctrl_data (wrk, elt)); + session_mq_connect_handler (wrk, elt); break; case SESSION_CTRL_EVT_CONNECT_URI: session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt)); |