diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/session.c | 2 | ||||
-rw-r--r-- | src/vnet/session/session.h | 4 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 157 |
3 files changed, 107 insertions, 56 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 60991cdd8d0..5d070240152 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1896,6 +1896,8 @@ session_manager_main_enable (vlib_main_t * vm) wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list); + wrk->evts_pending_main = + clib_llist_make_head (wrk->event_elts, evt_list); wrk->vm = vlib_get_main_by_index (i); wrk->last_vlib_time = vlib_time_now (vm); wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 0ea621aa385..71f6d35d43b 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -155,6 +155,9 @@ typedef struct session_worker_ /** Main thread loops in poll mode without a connect */ u32 no_connect_loops; + /** List head for first worker evts pending handling on main */ + clib_llist_index_t evts_pending_main; + #if SESSION_DEBUG /** last event poll time by thread */ clib_time_type_t last_event_poll; @@ -786,6 +789,7 @@ void session_wrk_enable_adaptive_mode (session_worker_t *wrk); fifo_segment_t *session_main_get_wrk_mqs_segment (void); void session_node_enable_disable (u8 is_en); clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en); +void session_wrk_handle_evts_main_rpc (); session_t *session_alloc_for_connection (transport_connection_t * tc); session_t *session_alloc_for_half_open (transport_connection_t *tc); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 2916c640c58..89c8ab0c891 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -26,12 +26,28 @@ #include <svm/queue.h> #include <sys/timerfd.h> -#define app_check_thread_and_barrier(_fn, _arg) \ - if (!vlib_thread_is_main_w_barrier ()) \ - { \ - vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg)); \ - return; \ - } +static inline void +session_wrk_send_evt_to_main (session_worker_t *wrk, session_evt_elt_t *elt) +{ + session_evt_elt_t *he; + u32 thread_index; + u8 is_empty; + + thread_index = wrk->vm->thread_index; + he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main); + is_empty = clib_llist_is_empty (wrk->event_elts, evt_list, he); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); + if (is_empty) + vlib_rpc_call_main_thread (session_wrk_handle_evts_main_rpc, + (u8 *) &thread_index, sizeof (thread_index)); +} + +#define app_check_thread_and_barrier(_wrk, _elt) \ + if (!vlib_thread_is_main_w_barrier ()) \ + { \ + session_wrk_send_evt_to_main (wrk, elt); \ + return; \ + } static void session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns) @@ -93,16 +109,17 @@ session_mq_free_ext_config (application_t *app, uword offset) } static void -session_mq_listen_handler (void *data) +session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt) { - session_listen_msg_t *mp = (session_listen_msg_t *) data; vnet_listen_args_t _a, *a = &_a; + session_listen_msg_t *mp; app_worker_t *app_wrk; application_t *app; int rv; - app_check_thread_and_barrier (session_mq_listen_handler, mp); + app_check_thread_and_barrier (wrk, elt); + mp = session_evt_ctrl_data (wrk, elt); app = application_lookup (mp->client_index); if (!app) return; @@ -132,16 +149,17 @@ session_mq_listen_handler (void *data) } static void -session_mq_listen_uri_handler (void *data) +session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt) { - session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data; vnet_listen_args_t _a, *a = &_a; + session_listen_uri_msg_t *mp; app_worker_t *app_wrk; application_t *app; int rv; - app_check_thread_and_barrier (session_mq_listen_uri_handler, mp); + app_check_thread_and_barrier (wrk, elt); + mp = session_evt_ctrl_data (wrk, elt); app = application_lookup (mp->client_index); if (!app) return; @@ -312,16 +330,17 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) } static void -session_mq_connect_uri_handler (void *data) +session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt) { - session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data; vnet_connect_args_t _a, *a = &_a; + session_connect_uri_msg_t *mp; app_worker_t *app_wrk; application_t *app; int rv; - app_check_thread_and_barrier (session_mq_connect_uri_handler, mp); + app_check_thread_and_barrier (wrk, elt); + mp = session_evt_ctrl_data (wrk, elt); app = application_lookup (mp->client_index); if (!app) return; @@ -371,14 +390,15 @@ session_mq_disconnect_handler (void *data) } static void -app_mq_detach_handler (void *data) +app_mq_detach_handler (session_worker_t *wrk, session_evt_elt_t *elt) { - session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data; vnet_app_detach_args_t _a, *a = &_a; + session_app_detach_msg_t *mp; application_t *app; - app_check_thread_and_barrier (app_mq_detach_handler, mp); + app_check_thread_and_barrier (wrk, elt); + mp = session_evt_ctrl_data (wrk, elt); app = application_lookup (mp->client_index); if (!app) return; @@ -389,18 +409,19 @@ app_mq_detach_handler (void *data) } static void -session_mq_unlisten_rpc (session_unlisten_msg_t *mp) +session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt) { - vlib_main_t *vm = vlib_get_main (); vnet_unlisten_args_t _a, *a = &_a; + session_unlisten_msg_t *mp; app_worker_t *app_wrk; session_handle_t sh; application_t *app; - u32 context; int rv; + app_check_thread_and_barrier (wrk, elt); + + mp = session_evt_ctrl_data (wrk, elt); sh = mp->handle; - context = mp->context; app = application_lookup (mp->client_index); if (!app) @@ -411,56 +432,34 @@ session_mq_unlisten_rpc (session_unlisten_msg_t *mp) a->handle = sh; a->wrk_map_index = mp->wrk_index; - vlib_worker_thread_barrier_sync (vm); - if ((rv = vnet_unlisten (a))) clib_warning ("unlisten returned: %d", rv); - vlib_worker_thread_barrier_release (vm); - app_wrk = application_get_worker (app, a->wrk_map_index); if (!app_wrk) return; - mq_send_unlisten_reply (app_wrk, sh, context, rv); - clib_mem_free (mp); -} - -static void -session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt) -{ - u32 thread_index = wrk - session_main.wrk; - session_unlisten_msg_t *mp, *arg; - - mp = session_evt_ctrl_data (wrk, elt); - arg = clib_mem_alloc (sizeof (session_unlisten_msg_t)); - clib_memcpy_fast (arg, mp, sizeof (*arg)); - - if (PREDICT_FALSE (!thread_index)) - { - session_mq_unlisten_rpc (arg); - return; - } - - session_send_rpc_evt_to_thread_force (0, session_mq_unlisten_rpc, arg); + mq_send_unlisten_reply (app_wrk, sh, mp->context, rv); } static void -session_mq_accepted_reply_handler (void *data) +session_mq_accepted_reply_handler (session_worker_t *wrk, + session_evt_elt_t *elt) { - session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data; vnet_disconnect_args_t _a = { 0 }, *a = &_a; + session_accepted_reply_msg_t *mp; session_state_t old_state; app_worker_t *app_wrk; session_t *s; + mp = session_evt_ctrl_data (wrk, elt); + /* Mail this back from the main thread. We're not polling in main * thread so we're using other workers for notifications. */ if (session_thread_from_handle (mp->handle) == 0 && vlib_num_workers () && vlib_get_thread_index () != 0) { - vlib_rpc_call_main_thread (session_mq_accepted_reply_handler, - (u8 *) mp, sizeof (*mp)); + session_wrk_send_evt_to_main (wrk, elt); return; } @@ -774,6 +773,52 @@ session_mq_transport_attr_handler (void *data) svm_msg_q_add_and_unlock (app_wrk->event_queue, msg); } +void +session_wrk_handle_evts_main_rpc (void *args) +{ + session_evt_elt_t *he, *elt, *next; + session_worker_t *fwrk; + u32 thread_index; + + ASSERT (vlib_thread_is_main_w_barrier ()); + thread_index = *(u32 *) args; + fwrk = session_main_get_worker (thread_index); + + he = clib_llist_elt (fwrk->event_elts, fwrk->evts_pending_main); + elt = clib_llist_next (fwrk->event_elts, evt_list, he); + + while (elt != he) + { + next = clib_llist_next (fwrk->event_elts, evt_list, elt); + clib_llist_remove (fwrk->event_elts, evt_list, elt); + switch (elt->evt.event_type) + { + case SESSION_CTRL_EVT_LISTEN: + session_mq_listen_handler (fwrk, elt); + break; + case SESSION_CTRL_EVT_UNLISTEN: + session_mq_unlisten_handler (fwrk, elt); + break; + case SESSION_CTRL_EVT_APP_DETACH: + app_mq_detach_handler (fwrk, elt); + break; + case SESSION_CTRL_EVT_CONNECT_URI: + session_mq_connect_uri_handler (fwrk, elt); + break; + case SESSION_CTRL_EVT_ACCEPTED_REPLY: + session_mq_accepted_reply_handler (fwrk, elt); + break; + default: + clib_warning ("unhandled %u", elt->evt.event_type); + ALWAYS_ASSERT (0); + break; + } + session_evt_ctrl_data_free (fwrk, elt); + clib_llist_put (fwrk->event_elts, elt); + elt = next; + } +} + vlib_node_registration_t session_queue_node; typedef struct @@ -1514,10 +1559,10 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) session_transport_reset (s); break; case SESSION_CTRL_EVT_LISTEN: - session_mq_listen_handler (session_evt_ctrl_data (wrk, elt)); + session_mq_listen_handler (wrk, elt); break; case SESSION_CTRL_EVT_LISTEN_URI: - session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt)); + session_mq_listen_uri_handler (wrk, elt); break; case SESSION_CTRL_EVT_UNLISTEN: session_mq_unlisten_handler (wrk, elt); @@ -1526,7 +1571,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) session_mq_connect_handler (wrk, elt); break; case SESSION_CTRL_EVT_CONNECT_URI: - session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt)); + session_mq_connect_uri_handler (wrk, elt); break; case SESSION_CTRL_EVT_SHUTDOWN: session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt)); @@ -1538,7 +1583,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt)); break; case SESSION_CTRL_EVT_ACCEPTED_REPLY: - session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt)); + session_mq_accepted_reply_handler (wrk, elt); break; case SESSION_CTRL_EVT_DISCONNECTED_REPLY: session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk, @@ -1551,7 +1596,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt)); break; case SESSION_CTRL_EVT_APP_DETACH: - app_mq_detach_handler (session_evt_ctrl_data (wrk, elt)); + app_mq_detach_handler (wrk, elt); break; case SESSION_CTRL_EVT_APP_WRK_RPC: session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt)); |