diff options
Diffstat (limited to 'src/vnet/session/application_local.c')
-rw-r--r-- | src/vnet/session/application_local.c | 248 |
1 files changed, 186 insertions, 62 deletions
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 8590d041600..3cb743d10e0 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -53,6 +53,8 @@ typedef struct ct_worker_ ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */ u8 have_connects; /**< Set if connect rpc pending */ u8 have_cleanups; /**< Set if cleanup rpc pending */ + clib_spinlock_t pending_connects_lock; /**< Lock for pending connects */ + u32 *new_connects; /**< Burst of connects to be done */ } ct_worker_t; typedef struct ct_main_ @@ -65,6 +67,9 @@ typedef struct ct_main_ clib_rwlock_t app_segs_lock; /**< RW lock for seg contexts */ uword *app_segs_ctxs_table; /**< App handle to segment pool map */ ct_segments_ctx_t *app_seg_ctxs; /**< Pool of ct segment contexts */ + u32 **fwrk_pending_connects; /**< First wrk pending half-opens */ + u32 fwrk_thread; /**< First worker thread */ + u8 fwrk_have_flush; /**< Flag for connect flush rpc */ } ct_main_t; static ct_main_t ct_main; @@ -81,7 +86,8 @@ ct_connection_alloc (u32 thread_index) ct_worker_t *wrk = ct_worker_get (thread_index); ct_connection_t *ct; - pool_get_zero (wrk->connections, ct); + pool_get_aligned_safe (wrk->connections, ct, CLIB_CACHE_LINE_BYTES); + clib_memset (ct, 0, sizeof (*ct)); ct->c_c_index = ct - wrk->connections; ct->c_thread_index = thread_index; ct->client_wrk = ~0; @@ -123,11 +129,18 @@ ct_half_open_alloc (void) clib_spinlock_lock (&cm->ho_reuseable_lock); vec_foreach (hip, cm->ho_reusable) - pool_put_index (cm->wrk[0].connections, *hip); + pool_put_index (cm->wrk[cm->fwrk_thread].connections, *hip); vec_reset_length (cm->ho_reusable); clib_spinlock_unlock (&cm->ho_reuseable_lock); - return ct_connection_alloc (0); + return ct_connection_alloc (cm->fwrk_thread); +} + +static ct_connection_t * +ct_half_open_get (u32 ho_index) +{ + ct_main_t *cm = &ct_main; + return ct_connection_get (ho_index, cm->fwrk_thread); } void @@ -181,6 
+194,12 @@ ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client) } } +static inline u64 +ct_client_seg_handle (u64 server_sh, u32 client_wrk_index) +{ + return (((u64) client_wrk_index << 56) | server_sh); +} + static void ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo) @@ -301,7 +320,8 @@ ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo, segment_manager_t *csm; csm = app_worker_get_connect_segment_manager (app_wrk); if (!segment_manager_app_detached (csm)) - app_worker_del_segment_notify (app_wrk, ct->segment_handle); + app_worker_del_segment_notify ( + app_wrk, ct_client_seg_handle (ct->segment_handle, ct->client_wrk)); } /* Notify server app and free segment */ @@ -363,9 +383,10 @@ ct_session_connect_notify (session_t *ss, session_error_t err) ss = session_get (ss_index, thread_index); cs->session_type = ss->session_type; cs->listener_handle = SESSION_INVALID_HANDLE; - cs->session_state = SESSION_STATE_CONNECTING; + session_set_state (cs, SESSION_STATE_CONNECTING); cs->app_wrk_index = client_wrk->wrk_index; cs->connection_index = cct->c_c_index; + cs->opaque = opaque; cct->c_s_index = cs->session_index; /* This will allocate fifos for the session. 
They won't be used for @@ -379,7 +400,7 @@ ct_session_connect_notify (session_t *ss, session_error_t err) goto connect_error; } - cs->session_state = SESSION_STATE_CONNECTING; + session_set_state (cs, SESSION_STATE_CONNECTING); if (app_worker_connect_notify (client_wrk, cs, 0, opaque)) { @@ -390,7 +411,7 @@ ct_session_connect_notify (session_t *ss, session_error_t err) } cs = session_get (cct->c_s_index, cct->c_thread_index); - cs->session_state = SESSION_STATE_READY; + session_set_state (cs, SESSION_STATE_READY); return 0; @@ -441,11 +462,11 @@ ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle, segment_manager_t *sm, u32 client_wrk_index) { u32 seg_ctx_index = ~0, sm_index, pair_bytes; + u64 seg_size, seg_handle, client_seg_handle; segment_manager_props_t *props; const u32 margin = 16 << 10; ct_segments_ctx_t *seg_ctx; app_worker_t *client_wrk; - u64 seg_size, seg_handle; application_t *server; ct_segment_t *ct_seg; uword *spp; @@ -507,7 +528,11 @@ ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle, goto error; client_wrk = app_worker_get (client_wrk_index); - if (app_worker_add_segment_notify (client_wrk, seg_handle)) + /* Make sure client workers do not have overlapping segment handles. + * Ideally, we should attach fs to client worker segment manager and + * create a new handle but that's not currently possible. 
*/ + client_seg_handle = ct_client_seg_handle (seg_handle, client_wrk_index); + if (app_worker_add_segment_notify (client_wrk, client_seg_handle)) { app_worker_del_segment_notify (server_wrk, seg_handle); goto error; @@ -645,7 +670,7 @@ ct_accept_one (u32 thread_index, u32 ho_index) cct = ct_connection_alloc (thread_index); cct_index = cct->c_c_index; - ho = ct_connection_get (ho_index, 0); + ho = ct_half_open_get (ho_index); /* Unlikely but half-open session and transport could have been freed */ if (PREDICT_FALSE (!ho)) @@ -701,7 +726,7 @@ ct_accept_one (u32 thread_index, u32 ho_index) sct->c_is_ip4); ss->connection_index = sct->c_c_index; ss->listener_handle = listen_session_get_handle (ll); - ss->session_state = SESSION_STATE_CREATED; + session_set_state (ss, SESSION_STATE_CREATED); server_wrk = application_listener_select_worker (ll); ss->app_wrk_index = server_wrk->wrk_index; @@ -724,9 +749,10 @@ ct_accept_one (u32 thread_index, u32 ho_index) cct->client_tx_fifo = ss->rx_fifo; cct->client_rx_fifo->refcnt++; cct->client_tx_fifo->refcnt++; - cct->segment_handle = sct->segment_handle; + cct->segment_handle = + ct_client_seg_handle (sct->segment_handle, cct->client_wrk); - ss->session_state = SESSION_STATE_ACCEPTING; + session_set_state (ss, SESSION_STATE_ACCEPTING); if (app_worker_accept_notify (server_wrk, ss)) { ct_session_connect_notify (ss, SESSION_E_REFUSED); @@ -739,39 +765,90 @@ ct_accept_one (u32 thread_index, u32 ho_index) static void ct_accept_rpc_wrk_handler (void *rpc_args) { - u32 thread_index, ho_index, n_connects, i, n_pending; + u32 thread_index, n_connects, i, n_pending; const u32 max_connects = 32; ct_worker_t *wrk; + u8 need_rpc = 0; thread_index = pointer_to_uword (rpc_args); wrk = ct_worker_get (thread_index); - /* Sub without lock as main enqueues with worker barrier */ + /* Connects could be handled without worker barrier so grab lock */ + clib_spinlock_lock (&wrk->pending_connects_lock); + n_pending = clib_fifo_elts 
(wrk->pending_connects); n_connects = clib_min (n_pending, max_connects); + vec_validate (wrk->new_connects, n_connects); for (i = 0; i < n_connects; i++) - { - clib_fifo_sub1 (wrk->pending_connects, ho_index); - ct_accept_one (thread_index, ho_index); - } + clib_fifo_sub1 (wrk->pending_connects, wrk->new_connects[i]); if (n_pending == n_connects) wrk->have_connects = 0; else + need_rpc = 1; + + clib_spinlock_unlock (&wrk->pending_connects_lock); + + for (i = 0; i < n_connects; i++) + ct_accept_one (thread_index, wrk->new_connects[i]); + + if (need_rpc) session_send_rpc_evt_to_thread_force ( thread_index, ct_accept_rpc_wrk_handler, uword_to_pointer (thread_index, void *)); } -static int -ct_connect (app_worker_t * client_wrk, session_t * ll, - session_endpoint_cfg_t * sep) +static void +ct_fwrk_flush_connects (void *rpc_args) { - u32 thread_index, ho_index; + u32 thread_index, fwrk_index, n_workers; ct_main_t *cm = &ct_main; - ct_connection_t *ho; ct_worker_t *wrk; + u8 need_rpc; + + fwrk_index = cm->fwrk_thread; + n_workers = vec_len (cm->fwrk_pending_connects); + + for (thread_index = fwrk_index; thread_index < n_workers; thread_index++) + { + if (!vec_len (cm->fwrk_pending_connects[thread_index])) + continue; + + wrk = ct_worker_get (thread_index); + + /* Connects can be done without worker barrier, grab dst worker lock */ + if (thread_index != fwrk_index) + clib_spinlock_lock (&wrk->pending_connects_lock); + + clib_fifo_add (wrk->pending_connects, + cm->fwrk_pending_connects[thread_index], + vec_len (cm->fwrk_pending_connects[thread_index])); + if (!wrk->have_connects) + { + wrk->have_connects = 1; + need_rpc = 1; + } + + if (thread_index != fwrk_index) + clib_spinlock_unlock (&wrk->pending_connects_lock); + + vec_reset_length (cm->fwrk_pending_connects[thread_index]); + + if (need_rpc) + session_send_rpc_evt_to_thread_force ( + thread_index, ct_accept_rpc_wrk_handler, + uword_to_pointer (thread_index, void *)); + } + + cm->fwrk_have_flush = 0; +} + +static 
void +ct_program_connect_to_wrk (u32 ho_index) +{ + ct_main_t *cm = &ct_main; + u32 thread_index; /* Simple round-robin policy for spreading sessions over workers. We skip * thread index 0, i.e., offset the index by 1, when we have workers as it @@ -780,6 +857,25 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, cm->n_sessions += 1; thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0; + /* Postpone flushing of connect request to dst worker until after session + * layer fully initializes the half-open session. */ + vec_add1 (cm->fwrk_pending_connects[thread_index], ho_index); + if (!cm->fwrk_have_flush) + { + session_send_rpc_evt_to_thread_force ( + cm->fwrk_thread, ct_fwrk_flush_connects, + uword_to_pointer (thread_index, void *)); + cm->fwrk_have_flush = 1; + } +} + +static int +ct_connect (app_worker_t *client_wrk, session_t *ll, + session_endpoint_cfg_t *sep) +{ + ct_connection_t *ho; + u32 ho_index; + /* * Alloc and init client half-open transport */ @@ -800,21 +896,10 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, ho->actual_tp = sep->original_tp; /* - * Accept connection on thread selected above. Connected reply comes + * Program connect on a worker, connected reply comes * after server accepts the connection.
*/ - - wrk = ct_worker_get (thread_index); - - /* Worker barrier held, add without additional lock */ - clib_fifo_add1 (wrk->pending_connects, ho_index); - if (!wrk->have_connects) - { - wrk->have_connects = 1; - session_send_rpc_evt_to_thread_force ( - thread_index, ct_accept_rpc_wrk_handler, - uword_to_pointer (thread_index, void *)); - } + ct_program_connect_to_wrk (ho_index); return ho_index; } @@ -852,9 +937,9 @@ ct_listener_get (u32 ct_index) } static transport_connection_t * -ct_half_open_get (u32 ct_index) +ct_session_half_open_get (u32 ct_index) { - return (transport_connection_t *) ct_connection_get (ct_index, 0); + return (transport_connection_t *) ct_half_open_get (ct_index); } static void @@ -876,7 +961,10 @@ ct_session_cleanup (u32 conn_index, u32 thread_index) static void ct_cleanup_ho (u32 ho_index) { - ct_connection_free (ct_connection_get (ho_index, 0)); + ct_connection_t *ho; + + ho = ct_half_open_get (ho_index); + ct_connection_free (ho); } static int @@ -907,7 +995,7 @@ ct_session_connect (transport_endpoint_cfg_t * tep) goto global_scope; ll = listen_session_get_from_handle (lh); - al = app_listener_get_w_session (ll); + al = app_listener_get (ll->al_index); /* * Break loop if rule in local table points to connecting app. 
This @@ -936,8 +1024,12 @@ global_scope: ll = session_lookup_listener_wildcard (table_index, sep); /* Avoid connecting app to own listener */ - if (ll && ll->app_index != app->app_index) - return ct_connect (app_wrk, ll, sep_ext); + if (ll) + { + al = app_listener_get (ll->al_index); + if (al->app_index != app->app_index) + return ct_connect (app_wrk, ll, sep_ext); + } /* Failed to connect but no error */ return SESSION_E_LOCAL_CONNECT; @@ -946,6 +1038,8 @@ global_scope: static inline int ct_close_is_reset (ct_connection_t *ct, session_t *s) { + if (ct->flags & CT_CONN_F_RESET) + return 1; if (ct->flags & CT_CONN_F_CLIENT) return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0); else @@ -953,6 +1047,17 @@ ct_close_is_reset (ct_connection_t *ct, session_t *s) } static void +ct_session_cleanup_server_session (session_t *s) +{ + ct_connection_t *ct; + + ct = (ct_connection_t *) session_get_transport (s); + ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo); + session_free (s); + ct_connection_free (ct); +} + +static void ct_session_postponed_cleanup (ct_connection_t *ct) { ct_connection_t *peer_ct; @@ -972,33 +1077,38 @@ ct_session_postponed_cleanup (ct_connection_t *ct) } session_transport_closed_notify (&ct->connection); + /* It would be cleaner to call session_transport_delete_notify + * but then we can't control session cleanup lower */ + session_set_state (s, SESSION_STATE_TRANSPORT_DELETED); + if (app_wrk) + app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT); + if (ct->flags & CT_CONN_F_CLIENT) { - if (app_wrk) - app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT); - /* Normal free for client session as the fifos are allocated through * the connects segment manager in a segment that's not shared with * the server */ ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo); - session_free_w_fifos (s); + session_program_cleanup (s); + ct_connection_free (ct); } else { /* Manual session and fifo segment cleanup to avoid 
implicit * segment manager cleanups and notifications */ - app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (app_wrk) { - app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT); - app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION); + /* Remove custom cleanup notify infra when/if switching to normal + * session cleanup. Note that ct is freed in the cb function */ + app_worker_cleanup_notify_custom (app_wrk, s, + SESSION_CLEANUP_SESSION, + ct_session_cleanup_server_session); + } + else + { + ct_connection_free (ct); } - - ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo); - session_free (s); } - - ct_connection_free (ct); } static void @@ -1022,10 +1132,10 @@ ct_handle_cleanups (void *args) clib_fifo_sub2 (wrk->pending_cleanups, req); ct = ct_connection_get (req->ct_index, thread_index); s = session_get (ct->c_s_index, ct->c_thread_index); - if (!svm_fifo_has_event (s->tx_fifo)) - ct_session_postponed_cleanup (ct); - else + if (svm_fifo_has_event (s->tx_fifo) || (s->flags & SESSION_F_RX_EVT)) clib_fifo_add1 (wrk->pending_cleanups, *req); + else + ct_session_postponed_cleanup (ct); n_to_handle -= 1; } @@ -1090,6 +1200,15 @@ ct_session_close (u32 ct_index, u32 thread_index) ct_program_cleanup (ct); } +static void +ct_session_reset (u32 ct_index, u32 thread_index) +{ + ct_connection_t *ct; + ct = ct_connection_get (ct_index, thread_index); + ct->flags |= CT_CONN_F_RESET; + ct_session_close (ct_index, thread_index); +} + static transport_connection_t * ct_session_get (u32 ct_index, u32 thread_index) { @@ -1178,7 +1297,7 @@ format_ct_half_open (u8 *s, va_list *args) { u32 ho_index = va_arg (*args, u32); u32 verbose = va_arg (*args, u32); - ct_connection_t *ct = ct_connection_get (ho_index, 0); + ct_connection_t *ct = ct_half_open_get (ho_index); s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct); if (verbose) s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN"); @@ -1229,26 +1348,31 @@ ct_enable_disable 
(vlib_main_t * vm, u8 is_en) { vlib_thread_main_t *vtm = &vlib_thread_main; ct_main_t *cm = &ct_main; + ct_worker_t *wrk; cm->n_workers = vlib_num_workers (); + cm->fwrk_thread = transport_cl_thread (); vec_validate (cm->wrk, vtm->n_vlib_mains); + vec_foreach (wrk, cm->wrk) + clib_spinlock_init (&wrk->pending_connects_lock); clib_spinlock_init (&cm->ho_reuseable_lock); clib_rwlock_init (&cm->app_segs_lock); + vec_validate (cm->fwrk_pending_connects, cm->n_workers); return 0; } -/* *INDENT-OFF* */ static const transport_proto_vft_t cut_thru_proto = { .enable = ct_enable_disable, .start_listen = ct_start_listen, .stop_listen = ct_stop_listen, .get_connection = ct_session_get, .get_listener = ct_listener_get, - .get_half_open = ct_half_open_get, + .get_half_open = ct_session_half_open_get, .cleanup = ct_session_cleanup, .cleanup_ho = ct_cleanup_ho, .connect = ct_session_connect, .close = ct_session_close, + .reset = ct_session_reset, .custom_tx = ct_custom_tx, .app_rx_evt = ct_app_rx_evt, .format_listener = format_ct_listener, @@ -1261,7 +1385,6 @@ static const transport_proto_vft_t cut_thru_proto = { .service_type = TRANSPORT_SERVICE_VC, }, }; -/* *INDENT-ON* */ static inline int ct_session_can_tx (session_t *s) @@ -1286,6 +1409,7 @@ ct_session_tx (session_t * s) peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index); if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) return 0; + peer_s->flags |= SESSION_F_RX_EVT; return session_enqueue_notify (peer_s); } |