diff options
author | Florin Coras <fcoras@cisco.com> | 2022-03-18 08:33:08 -0700 |
---|---|---|
committer | Dave Barach <vpp@barachs.net> | 2022-12-02 22:59:13 +0000 |
commit | 309f7aac170767028a2e6e7e9424ec3d13304aff (patch) | |
tree | 84b4e26408e994f196c4e0029709c806be5bfef6 /src/vnet/session | |
parent | 06bbab0c45c805544c981b8765ea3d85760d66a8 (diff) |
session: move connects to first worker
Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I035e3fdbb52eca010ad7b2c20ca2930cb1645978
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/application.c | 2 | ||||
-rw-r--r-- | src/vnet/session/application_worker.c | 4 | ||||
-rw-r--r-- | src/vnet/session/session.c | 6 | ||||
-rw-r--r-- | src/vnet/session/session.h | 32 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 91 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 3 |
6 files changed, 54 insertions, 84 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 3b2c7cdb35a..ad4d447c543 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -1357,7 +1357,7 @@ vnet_connect (vnet_connect_args_t * a) app_worker_t *client_wrk; application_t *client; - ASSERT (vlib_thread_is_main_w_barrier ()); + ASSERT (session_vlib_thread_is_cl_thread ()); if (session_endpoint_is_zero (&a->sep)) return SESSION_E_INVALID_RMT_IP; diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index 844e78f7fa9..0cb179161b8 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -393,7 +393,7 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh) { session_handle_t *shp; - ASSERT (vlib_get_thread_index () == 0); + ASSERT (session_vlib_thread_is_cl_thread ()); pool_get (app_wrk->half_open_table, shp); *shp = sh; @@ -404,7 +404,7 @@ int app_worker_del_half_open (app_worker_t *app_wrk, session_t *s) { application_t *app = application_get (app_wrk->app_index); - ASSERT (vlib_get_thread_index () <= 1); + ASSERT (session_vlib_thread_is_cl_thread ()); pool_put_index (app_wrk->half_open_table, s->ho_index); if (app->cb_fns.half_open_cleanup_callback) app->cb_fns.half_open_cleanup_callback (s); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 91e9ed5451d..eaba80f24e2 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -334,15 +334,15 @@ void session_half_open_delete_notify (transport_connection_t *tc) { /* Notification from ctrl thread accepted without rpc */ - if (!tc->thread_index) + if (tc->thread_index == transport_cl_thread ()) { session_half_open_free (ho_session_get (tc->s_index)); } else { void *args = uword_to_pointer ((uword) tc->s_index, void *); - session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc, - args); + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + session_half_open_free_rpc, args); } } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 16000e62fa8..b8cc1c383c9 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -157,9 +157,6 @@ typedef struct session_worker_ /** Flag that is set if main thread signaled to handle connects */ u32 n_pending_connects; - /** Main thread loops in poll mode without a connect */ - u32 no_connect_loops; - /** List head for first worker evts pending handling on main */ clib_llist_index_t evts_pending_main; @@ -212,7 +209,9 @@ typedef struct session_main_ * Trade memory for speed, for now */ u32 *session_type_to_next; - /** Thread for cl and ho that rely on cl allocs */ + /** Thread used for allocating active open connections, i.e., half-opens + * for transports like tcp, and sessions that will be migrated for cl + * transports like udp. If vpp has workers, this will be first worker. */ u32 transport_cl_thread; transport_proto_t last_transport_proto_type; @@ -616,6 +615,13 @@ transport_cl_thread (void) return session_main.transport_cl_thread; } +always_inline u32 +session_vlib_thread_is_cl_thread (void) +{ + return (vlib_get_thread_index () == transport_cl_thread () || + vlib_thread_is_main_w_barrier ()); +} + /* * Listen sessions */ @@ -668,29 +674,17 @@ always_inline session_t * ho_session_alloc (void) { session_t *s; - ASSERT (vlib_get_thread_index () == 0); - s = session_alloc (0); + ASSERT (session_vlib_thread_is_cl_thread ()); + s = session_alloc (transport_cl_thread ()); s->session_state = SESSION_STATE_CONNECTING; s->flags |= SESSION_F_HALF_OPEN; - /* Not ideal. Half-opens are only allocated from main with worker barrier - * but can be cleaned up, i.e., session_half_open_free, from main without - * a barrier. In debug images, the free_bitmap can grow while workers peek - * the sessions pool, e.g., session_half_open_migrate_notify, and as a - * result crash while validating the session. To avoid this, grow the bitmap - * now. */ - if (CLIB_DEBUG) - { - session_t *sp = session_main.wrk[0].sessions; - clib_bitmap_validate (pool_header (sp)->free_bitmap, - s->session_index + 1); - } return s; } always_inline session_t * ho_session_get (u32 ho_index) { - return session_get (ho_index, 0 /* half-open thread */); + return session_get (ho_index, transport_cl_thread ()); } always_inline void diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 8f6503d2806..be00925bb00 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -224,23 +224,20 @@ static void session_mq_handle_connects_rpc (void *arg) { u32 max_connects = 32, n_connects = 0; - vlib_main_t *vm = vlib_get_main (); session_evt_elt_t *he, *elt, *next; - session_worker_t *fwrk, *wrk; + session_worker_t *fwrk; - ASSERT (vlib_get_thread_index () == 0); + ASSERT (session_vlib_thread_is_cl_thread ()); /* Pending connects on linked list pertaining to first worker */ - fwrk = session_main_get_worker (1); + fwrk = session_main_get_worker (transport_cl_thread ()); if (!fwrk->n_pending_connects) - goto update_state; - - vlib_worker_thread_barrier_sync (vm); + return; he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects); elt = clib_llist_next (fwrk->event_elts, evt_list, he); - /* Avoid holding the barrier for too long */ + /* Avoid holding the worker for too long */ while (n_connects < max_connects && elt != he) { next = clib_llist_next (fwrk->event_elts, evt_list, elt); @@ -254,45 +251,10 @@ session_mq_handle_connects_rpc (void *arg) /* Decrement with worker barrier */ fwrk->n_pending_connects -= n_connects; - - vlib_worker_thread_barrier_release (vm); - -update_state: - - /* Switch worker to poll mode if it was in interrupt mode and had work or - * back to interrupt if threshold of loops without a connect is passed. - * While in poll mode, reprogram connects rpc */ - wrk = session_main_get_worker (0); - if (wrk->state != SESSION_WRK_POLLING) - { - if (n_connects) - { - session_wrk_set_state (wrk, SESSION_WRK_POLLING); - vlib_node_set_state (vm, session_queue_node.index, - VLIB_NODE_STATE_POLLING); - wrk->no_connect_loops = 0; - } - } - else - { - if (!n_connects) - { - if (++wrk->no_connect_loops > 1e5) - { - session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT); - vlib_node_set_state (vm, session_queue_node.index, - VLIB_NODE_STATE_INTERRUPT); - } - } - else - wrk->no_connect_loops = 0; - } - - if (wrk->state == SESSION_WRK_POLLING) + if (fwrk->n_pending_connects > 0) { - elt = session_evt_alloc_ctrl (wrk); - elt->evt.event_type = SESSION_CTRL_EVT_RPC; - elt->evt.rpc_args.fp = session_mq_handle_connects_rpc; + session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index, + session_mq_handle_connects_rpc, 0); } } @@ -302,20 +264,28 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) u32 thread_index = wrk - session_main.wrk; session_evt_elt_t *he; - /* No workers, so just deal with the connect now */ - if (PREDICT_FALSE (!thread_index)) + if (PREDICT_FALSE (thread_index > transport_cl_thread ())) { - session_mq_connect_one (session_evt_ctrl_data (wrk, elt)); + clib_warning ("Connect on wrong thread. Dropping"); return; } - if (PREDICT_FALSE (thread_index != 1)) + /* If on worker, check if main has any pending messages. Avoids reordering + * with other control messages that need to be handled by main + */ + if (thread_index) { - clib_warning ("Connect on wrong thread. Dropping"); - return; + he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main); + + /* Events pending on main, postpone to avoid reordering */ + if (!clib_llist_is_empty (wrk->event_elts, evt_list, he)) + { + clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); + return; + } } - /* Add to pending list to be handled by main thread */ + /* Add to pending list to be handled by first worker */ he = clib_llist_elt (wrk->event_elts, wrk->pending_connects); clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); @@ -323,9 +293,8 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) wrk->n_pending_connects += 1; if (wrk->n_pending_connects == 1) { - vlib_node_set_interrupt_pending (vlib_get_main_by_index (0), - session_queue_node.index); - session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0); + session_send_rpc_evt_to_thread_force (thread_index, + session_mq_handle_connects_rpc, 0); } } @@ -812,6 +781,9 @@ session_wrk_handle_evts_main_rpc (void *args) case SESSION_CTRL_EVT_ACCEPTED_REPLY: session_mq_accepted_reply_handler (fwrk, elt); break; + case SESSION_CTRL_EVT_CONNECT: + session_mq_connect_handler (fwrk, elt); + break; default: clib_warning ("unhandled %u", elt->evt.event_type); ALWAYS_ASSERT (0); @@ -820,8 +792,11 @@ session_wrk_handle_evts_main_rpc (void *args) /* Regrab element in case pool moved */ elt = clib_llist_elt (fwrk->event_elts, ei); - session_evt_ctrl_data_free (fwrk, elt); - clib_llist_put (fwrk->event_elts, elt); + if (!clib_llist_elt_is_linked (elt, evt_list)) + { + session_evt_ctrl_data_free (fwrk, elt); + clib_llist_put (fwrk->event_elts, elt); + } ei = next_ei; } diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index b13370bcffd..8554c730bda 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -486,7 +486,8 @@ transport_program_endpoint_cleanup (u32 lepi) clib_spinlock_unlock (&tm->local_endpoints_lock); if (flush_fl) - session_send_rpc_evt_to_thread_force (0, transport_cleanup_freelist, 0); + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + transport_cleanup_freelist, 0); } int |