diff options
author | Florin Coras <fcoras@cisco.com> | 2022-03-18 08:33:08 -0700 |
---|---|---|
committer | Dave Barach <vpp@barachs.net> | 2022-12-02 22:59:13 +0000 |
commit | 309f7aac170767028a2e6e7e9424ec3d13304aff (patch) | |
tree | 84b4e26408e994f196c4e0029709c806be5bfef6 /src | |
parent | 06bbab0c45c805544c981b8765ea3d85760d66a8 (diff) |
session: move connects to first worker
Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I035e3fdbb52eca010ad7b2c20ca2930cb1645978
Diffstat (limited to 'src')
-rw-r--r-- | src/plugins/hs_apps/echo_client.c | 8 | ||||
-rw-r--r-- | src/plugins/unittest/session_test.c | 8 | ||||
-rw-r--r-- | src/vnet/session/application.c | 2 | ||||
-rw-r--r-- | src/vnet/session/application_worker.c | 4 | ||||
-rw-r--r-- | src/vnet/session/session.c | 6 | ||||
-rw-r--r-- | src/vnet/session/session.h | 32 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 91 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 3 | ||||
-rw-r--r-- | src/vnet/tcp/tcp.c | 6 | ||||
-rw-r--r-- | src/vnet/tcp/tcp_inlines.h | 2 | ||||
-rw-r--r-- | src/vnet/tcp/tcp_timer.h | 15 | ||||
-rw-r--r-- | src/vnet/tls/tls.c | 1 |
12 files changed, 72 insertions, 106 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c index c6c7ede2f92..14d47be7422 100644 --- a/src/plugins/hs_apps/echo_client.c +++ b/src/plugins/hs_apps/echo_client.c @@ -826,7 +826,6 @@ ec_connect_rpc (void *args) { ec_main_t *ecm = &ec_main; vnet_connect_args_t _a = {}, *a = &_a; - vlib_main_t *vm = vlib_get_main (); int rv, needs_crypto; u32 n_clients, ci; @@ -838,8 +837,6 @@ ec_connect_rpc (void *args) ci = ecm->connect_conn_index; - vlib_worker_thread_barrier_sync (vm); - while (ci < n_clients) { /* Crude pacing for call setups */ @@ -873,8 +870,6 @@ ec_connect_rpc (void *args) ci += 1; } - vlib_worker_thread_barrier_release (vm); - if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING) ec_program_connects (); @@ -884,7 +879,8 @@ ec_connect_rpc (void *args) void ec_program_connects (void) { - session_send_rpc_evt_to_thread_force (0, ec_connect_rpc, 0); + session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc, + 0); } #define ec_cli(_fmt, _args...) \ diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c index 6a292c783eb..70d9fd02048 100644 --- a/src/plugins/unittest/session_test.c +++ b/src/plugins/unittest/session_test.c @@ -404,14 +404,6 @@ session_test_endpoint_cfg (vlib_main_t * vm, unformat_input_t * input) SESSION_TEST ((tc->lcl_port == placeholder_client_port), "ports should be equal"); - /* These sessions, because of the way they're established are pinned to - * main thread, even when we have workers and we avoid polling main thread, - * i.e., we can't cleanup pending disconnects, so force cleanup for both - */ - session_transport_cleanup (s); - s = session_get (accepted_session_index, accepted_session_thread); - session_transport_cleanup (s); - vnet_app_detach_args_t detach_args = { .app_index = server_index, .api_client_index = ~0, diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 3b2c7cdb35a..ad4d447c543 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -1357,7 +1357,7 @@ vnet_connect (vnet_connect_args_t * a) app_worker_t *client_wrk; application_t *client; - ASSERT (vlib_thread_is_main_w_barrier ()); + ASSERT (session_vlib_thread_is_cl_thread ()); if (session_endpoint_is_zero (&a->sep)) return SESSION_E_INVALID_RMT_IP; diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index 844e78f7fa9..0cb179161b8 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -393,7 +393,7 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh) { session_handle_t *shp; - ASSERT (vlib_get_thread_index () == 0); + ASSERT (session_vlib_thread_is_cl_thread ()); pool_get (app_wrk->half_open_table, shp); *shp = sh; @@ -404,7 +404,7 @@ int app_worker_del_half_open (app_worker_t *app_wrk, session_t *s) { application_t *app = application_get (app_wrk->app_index); - ASSERT (vlib_get_thread_index () <= 1); + ASSERT (session_vlib_thread_is_cl_thread ()); pool_put_index (app_wrk->half_open_table, s->ho_index); if (app->cb_fns.half_open_cleanup_callback) app->cb_fns.half_open_cleanup_callback (s); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 91e9ed5451d..eaba80f24e2 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -334,15 +334,15 @@ void session_half_open_delete_notify (transport_connection_t *tc) { /* Notification from ctrl thread accepted without rpc */ - if (!tc->thread_index) + if (tc->thread_index == transport_cl_thread ()) { session_half_open_free (ho_session_get (tc->s_index)); } else { void *args = uword_to_pointer ((uword) tc->s_index, void *); - session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc, - args); + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + session_half_open_free_rpc, args); } } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 16000e62fa8..b8cc1c383c9 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -157,9 +157,6 @@ typedef struct session_worker_ /** Flag that is set if main thread signaled to handle connects */ u32 n_pending_connects; - /** Main thread loops in poll mode without a connect */ - u32 no_connect_loops; - /** List head for first worker evts pending handling on main */ clib_llist_index_t evts_pending_main; @@ -212,7 +209,9 @@ typedef struct session_main_ * Trade memory for speed, for now */ u32 *session_type_to_next; - /** Thread for cl and ho that rely on cl allocs */ + /** Thread used for allocating active open connections, i.e., half-opens + * for transports like tcp, and sessions that will be migrated for cl + * transports like udp. If vpp has workers, this will be first worker. */ u32 transport_cl_thread; transport_proto_t last_transport_proto_type; @@ -616,6 +615,13 @@ transport_cl_thread (void) return session_main.transport_cl_thread; } +always_inline u32 +session_vlib_thread_is_cl_thread (void) +{ + return (vlib_get_thread_index () == transport_cl_thread () || + vlib_thread_is_main_w_barrier ()); +} + /* * Listen sessions */ @@ -668,29 +674,17 @@ always_inline session_t * ho_session_alloc (void) { session_t *s; - ASSERT (vlib_get_thread_index () == 0); - s = session_alloc (0); + ASSERT (session_vlib_thread_is_cl_thread ()); + s = session_alloc (transport_cl_thread ()); s->session_state = SESSION_STATE_CONNECTING; s->flags |= SESSION_F_HALF_OPEN; - /* Not ideal. Half-opens are only allocated from main with worker barrier - * but can be cleaned up, i.e., session_half_open_free, from main without - * a barrier. In debug images, the free_bitmap can grow while workers peek - * the sessions pool, e.g., session_half_open_migrate_notify, and as a - * result crash while validating the session. To avoid this, grow the bitmap - * now. */ - if (CLIB_DEBUG) - { - session_t *sp = session_main.wrk[0].sessions; - clib_bitmap_validate (pool_header (sp)->free_bitmap, - s->session_index + 1); - } return s; } always_inline session_t * ho_session_get (u32 ho_index) { - return session_get (ho_index, 0 /* half-open thread */); + return session_get (ho_index, transport_cl_thread ()); } always_inline void diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 8f6503d2806..be00925bb00 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -224,23 +224,20 @@ static void session_mq_handle_connects_rpc (void *arg) { u32 max_connects = 32, n_connects = 0; - vlib_main_t *vm = vlib_get_main (); session_evt_elt_t *he, *elt, *next; - session_worker_t *fwrk, *wrk; + session_worker_t *fwrk; - ASSERT (vlib_get_thread_index () == 0); + ASSERT (session_vlib_thread_is_cl_thread ()); /* Pending connects on linked list pertaining to first worker */ - fwrk = session_main_get_worker (1); + fwrk = session_main_get_worker (transport_cl_thread ()); if (!fwrk->n_pending_connects) - goto update_state; - - vlib_worker_thread_barrier_sync (vm); + return; he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects); elt = clib_llist_next (fwrk->event_elts, evt_list, he); - /* Avoid holding the barrier for too long */ + /* Avoid holding the worker for too long */ while (n_connects < max_connects && elt != he) { next = clib_llist_next (fwrk->event_elts, evt_list, elt); @@ -254,45 +251,10 @@ session_mq_handle_connects_rpc (void *arg) /* Decrement with worker barrier */ fwrk->n_pending_connects -= n_connects; - - vlib_worker_thread_barrier_release (vm); - -update_state: - - /* Switch worker to poll mode if it was in interrupt mode and had work or - * back to interrupt if threshold of loops without a connect is passed. - * While in poll mode, reprogram connects rpc */ - wrk = session_main_get_worker (0); - if (wrk->state != SESSION_WRK_POLLING) - { - if (n_connects) - { - session_wrk_set_state (wrk, SESSION_WRK_POLLING); - vlib_node_set_state (vm, session_queue_node.index, - VLIB_NODE_STATE_POLLING); - wrk->no_connect_loops = 0; - } - } - else - { - if (!n_connects) - { - if (++wrk->no_connect_loops > 1e5) - { - session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT); - vlib_node_set_state (vm, session_queue_node.index, - VLIB_NODE_STATE_INTERRUPT); - } - } - else - wrk->no_connect_loops = 0; - } - - if (wrk->state == SESSION_WRK_POLLING) + if (fwrk->n_pending_connects > 0) { - elt = session_evt_alloc_ctrl (wrk); - elt->evt.event_type = SESSION_CTRL_EVT_RPC; - elt->evt.rpc_args.fp = session_mq_handle_connects_rpc; + session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index, + session_mq_handle_connects_rpc, 0); } } @@ -302,20 +264,28 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) u32 thread_index = wrk - session_main.wrk; session_evt_elt_t *he; - /* No workers, so just deal with the connect now */ - if (PREDICT_FALSE (!thread_index)) + if (PREDICT_FALSE (thread_index > transport_cl_thread ())) { - session_mq_connect_one (session_evt_ctrl_data (wrk, elt)); + clib_warning ("Connect on wrong thread. Dropping"); return; } - if (PREDICT_FALSE (thread_index != 1)) + /* If on worker, check if main has any pending messages. Avoids reordering + * with other control messages that need to be handled by main + */ + if (thread_index) { - clib_warning ("Connect on wrong thread. Dropping"); - return; + he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main); + + /* Events pending on main, postpone to avoid reordering */ + if (!clib_llist_is_empty (wrk->event_elts, evt_list, he)) + { + clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); + return; + } } - /* Add to pending list to be handled by main thread */ + /* Add to pending list to be handled by first worker */ he = clib_llist_elt (wrk->event_elts, wrk->pending_connects); clib_llist_add_tail (wrk->event_elts, evt_list, elt, he); @@ -323,9 +293,8 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) wrk->n_pending_connects += 1; if (wrk->n_pending_connects == 1) { - vlib_node_set_interrupt_pending (vlib_get_main_by_index (0), - session_queue_node.index); - session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0); + session_send_rpc_evt_to_thread_force (thread_index, + session_mq_handle_connects_rpc, 0); } } @@ -812,6 +781,9 @@ session_wrk_handle_evts_main_rpc (void *args) case SESSION_CTRL_EVT_ACCEPTED_REPLY: session_mq_accepted_reply_handler (fwrk, elt); break; + case SESSION_CTRL_EVT_CONNECT: + session_mq_connect_handler (fwrk, elt); + break; default: clib_warning ("unhandled %u", elt->evt.event_type); ALWAYS_ASSERT (0); @@ -820,8 +792,11 @@ session_wrk_handle_evts_main_rpc (void *args) /* Regrab element in case pool moved */ elt = clib_llist_elt (fwrk->event_elts, ei); - session_evt_ctrl_data_free (fwrk, elt); - clib_llist_put (fwrk->event_elts, elt); + if (!clib_llist_elt_is_linked (elt, evt_list)) + { + session_evt_ctrl_data_free (fwrk, elt); + clib_llist_put (fwrk->event_elts, elt); + } ei = next_ei; } diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index b13370bcffd..8554c730bda 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -486,7 +486,8 @@ transport_program_endpoint_cleanup (u32 lepi) clib_spinlock_unlock (&tm->local_endpoints_lock); if (flush_fl) - session_send_rpc_evt_to_thread_force (0, transport_cleanup_freelist, 0); + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + transport_cleanup_freelist, 0); } int diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index bdf1751ff14..c0445b9048b 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -188,8 +188,7 @@ tcp_session_get_listener (u32 listener_index) static tcp_connection_t * tcp_half_open_connection_alloc (void) { - ASSERT (vlib_get_thread_index () == 0); - return tcp_connection_alloc (0); + return tcp_connection_alloc (transport_cl_thread ()); } /** @@ -199,7 +198,8 @@ tcp_half_open_connection_alloc (void) static void tcp_half_open_connection_free (tcp_connection_t * tc) { - ASSERT (vlib_get_thread_index () == 0); + ASSERT (vlib_get_thread_index () == tc->c_thread_index || + vlib_thread_is_main_w_barrier ()); return tcp_connection_free (tc); } diff --git a/src/vnet/tcp/tcp_inlines.h b/src/vnet/tcp/tcp_inlines.h index 69f8ce7ff27..91c57577067 100644 --- a/src/vnet/tcp/tcp_inlines.h +++ b/src/vnet/tcp/tcp_inlines.h @@ -66,7 +66,7 @@ tcp_listener_get (u32 tli) always_inline tcp_connection_t * tcp_half_open_connection_get (u32 conn_index) { - return tcp_connection_get (conn_index, 0); + return tcp_connection_get (conn_index, transport_cl_thread ()); } /** diff --git a/src/vnet/tcp/tcp_timer.h b/src/vnet/tcp/tcp_timer.h index 7f7dbf193eb..c0907cae1cc 100644 --- a/src/vnet/tcp/tcp_timer.h +++ b/src/vnet/tcp/tcp_timer.h @@ -17,11 +17,18 @@ #include <vnet/tcp/tcp_types.h> +static inline u8 +tcp_timer_thread_is_valid (tcp_connection_t *tc) +{ + return ((tc->c_thread_index == vlib_get_thread_index ()) || + vlib_thread_is_main_w_barrier ()); +} + always_inline void -tcp_timer_set (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id, +tcp_timer_set (tcp_timer_wheel_t *tw, tcp_connection_t *tc, u8 timer_id, u32 interval) { - ASSERT (tc->c_thread_index == vlib_get_thread_index ()); + ASSERT (tcp_timer_thread_is_valid (tc)); ASSERT (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID); tc->timers[timer_id] = tw_timer_start_tcp_twsl (tw, tc->c_c_index, timer_id, interval); @@ -30,7 +37,7 @@ tcp_timer_set (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id, always_inline void tcp_timer_reset (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id) { - ASSERT (tc->c_thread_index == vlib_get_thread_index ()); + ASSERT (tcp_timer_thread_is_valid (tc)); tc->pending_timers &= ~(1 << timer_id); if (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID) return; @@ -43,7 +50,7 @@ always_inline void tcp_timer_update (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id, u32 interval) { - ASSERT (tc->c_thread_index == vlib_get_thread_index ()); + ASSERT (tcp_timer_thread_is_valid (tc)); if (tc->timers[timer_id] != TCP_TIMER_HANDLE_INVALID) tw_timer_update_tcp_twsl (tw, tc->timers[timer_id], interval); else diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c index 85ac7f8022c..b082467c95f 100644 --- a/src/vnet/tls/tls.c +++ b/src/vnet/tls/tls.c @@ -121,6 +121,7 @@ tls_ctx_half_open_alloc (void) clib_memset (ctx, 0, sizeof (*ctx)); ctx->c_c_index = ctx - tm->half_open_ctx_pool; + ctx->c_thread_index = transport_cl_thread (); return ctx->c_c_index; } |