diff options
Diffstat (limited to 'src/vnet/session/session_node.c')
-rw-r--r-- | src/vnet/session/session_node.c | 91 |
1 files changed, 33 insertions, 58 deletions
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 8f6503d2806..be00925bb00 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -224,23 +224,20 @@ static void session_mq_handle_connects_rpc (void *arg) { u32 max_connects = 32, n_connects = 0; - vlib_main_t *vm = vlib_get_main (); session_evt_elt_t *he, *elt, *next; - session_worker_t *fwrk, *wrk; + session_worker_t *fwrk; - ASSERT (vlib_get_thread_index () == 0); + ASSERT (session_vlib_thread_is_cl_thread ()); /* Pending connects on linked list pertaining to first worker */ - fwrk = session_main_get_worker (1); + fwrk = session_main_get_worker (transport_cl_thread ()); if (!fwrk->n_pending_connects) - goto update_state; - - vlib_worker_thread_barrier_sync (vm); + return; he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects); elt = clib_llist_next (fwrk->event_elts, evt_list, he); - /* Avoid holding the barrier for too long */ + /* Avoid holding the worker for too long */ while (n_connects < max_connects && elt != he) { next = clib_llist_next (fwrk->event_elts, evt_list, elt); @@ -254,45 +251,10 @@ session_mq_handle_connects_rpc (void *arg) /* Decrement with worker barrier */ fwrk->n_pending_connects -= n_connects; - - vlib_worker_thread_barrier_release (vm); - -update_state: - - /* Switch worker to poll mode if it was in interrupt mode and had work or - * back to interrupt if threshold of loops without a connect is passed. - * While in poll mode, reprogram connects rpc */ - wrk = session_main_get_worker (0); - if (wrk->state != SESSION_WRK_POLLING) - { - if (n_connects) - { - session_wrk_set_state (wrk, SESSION_WRK_POLLING); - vlib_node_set_state (vm, session_queue_node.index, - VLIB_NODE_STATE_POLLING); - wrk->no_connect_loops = 0; - } - } - else - { - if (!n_connects) - { - if (++wrk->no_connect_loops > 1e5) - { - session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT); - vlib_node_set_state (vm, session_queue_node.index, - VLIB_NODE_STATE_INTERRUPT); - } - } - else - wrk->no_connect_loops = 0; - } - - if (wrk->state == SESSION_WRK_POLLING) + if (fwrk->n_pending_connects > 0) { - elt = session_evt_alloc_ctrl (wrk); - elt->evt.event_type = SESSION_CTRL_EVT_RPC; - elt->evt.rpc_args.fp = session_mq_handle_connects_rpc; + session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index, + session_mq_handle_connects_rpc, 0); } } @@ -302,20 +264,28 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) u32 thread_index = wrk - session_main.wrk; session_evt_elt_t *he; - /* No workers, so just deal with the connect now */ - if (PREDICT_FALSE (!thread_index)) + if (PREDICT_FALSE (thread_index > transport_cl_thread ())) { - session_mq_connect_one (session_evt_ctrl_data (wrk, elt)); + clib_warning ("Connect on wrong thread. Dropping"); return; } - if (PREDICT_FALSE (thread_index != 1)) + /* If on worker, check if main has any pending messages. Avoids reordering + * with other control messages that need to be handled by main + */ + if (thread_index) { - clib_warning ("Connect on wrong thread. Dropping"); - return; + he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main); + + /* Events pending on main, postpone to avoid reordering */ + if (!clib_llist_is_empty (wrk->event_elts, evt_list, he)) + { + clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); + return; + } } - /* Add to pending list to be handled by main thread */ + /* Add to pending list to be handled by first worker */ he = clib_llist_elt (wrk->event_elts, wrk->pending_connects); clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); @@ -323,9 +293,8 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) wrk->n_pending_connects += 1; if (wrk->n_pending_connects == 1) { - vlib_node_set_interrupt_pending (vlib_get_main_by_index (0), - session_queue_node.index); - session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0); + session_send_rpc_evt_to_thread_force (thread_index, + session_mq_handle_connects_rpc, 0); } } @@ -812,6 +781,9 @@ session_wrk_handle_evts_main_rpc (void *args) case SESSION_CTRL_EVT_ACCEPTED_REPLY: session_mq_accepted_reply_handler (fwrk, elt); break; + case SESSION_CTRL_EVT_CONNECT: + session_mq_connect_handler (fwrk, elt); + break; default: clib_warning ("unhandled %u", elt->evt.event_type); ALWAYS_ASSERT (0); @@ -820,8 +792,11 @@ session_wrk_handle_evts_main_rpc (void *args) /* Regrab element in case pool moved */ elt = clib_llist_elt (fwrk->event_elts, ei); - session_evt_ctrl_data_free (fwrk, elt); - clib_llist_put (fwrk->event_elts, elt); + if (!clib_llist_elt_is_linked (elt, evt_list)) + { + session_evt_ctrl_data_free (fwrk, elt); + clib_llist_put (fwrk->event_elts, elt); + } ei = next_ei; } |