diff options
author | Florin Coras <fcoras@cisco.com> | 2024-10-18 00:46:57 -0700 |
---|---|---|
committer | Dave Barach <vpp@barachs.net> | 2024-10-19 21:08:27 +0000 |
commit | 4c5ec738747c7e924af5d96a07e5d7c31cc2c998 (patch) | |
tree | 078e32677dee244c5c23431b9ffbb60327f0baef /src/plugins/hs_apps/proxy.c | |
parent | e6d2b04e860db83a24b0add8c48672dab1a8b927 (diff) |
hsa: refactor proxy to minimize lock usage
Use per worker context to minimize proxy session lock usage for io
events.
Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ia0ea204a8b09f72300fd40745b299246d5d0ddb7
Diffstat (limited to 'src/plugins/hs_apps/proxy.c')
-rw-r--r-- | src/plugins/hs_apps/proxy.c | 437 |
1 files changed, 273 insertions, 164 deletions
diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c index 6b0b2465c56..a2776794c4a 100644 --- a/src/plugins/hs_apps/proxy.c +++ b/src/plugins/hs_apps/proxy.c @@ -24,9 +24,35 @@ proxy_main_t proxy_main; #define TCP_MSS 1460 +static proxy_session_side_ctx_t * +proxy_session_side_ctx_alloc (proxy_worker_t *wrk) +{ + proxy_session_side_ctx_t *ctx; + + pool_get_zero (wrk->ctx_pool, ctx); + ctx->sc_index = ctx - wrk->ctx_pool; + ctx->ps_index = ~0; + + return ctx; +} + +static void +proxy_session_side_ctx_free (proxy_worker_t *wrk, + proxy_session_side_ctx_t *ctx) +{ + pool_put (wrk->ctx_pool, ctx); +} + +static proxy_session_side_ctx_t * +proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index) +{ + return pool_elt_at_index (wrk->ctx_pool, ctx_index); +} + static void proxy_do_connect (vnet_connect_args_t *a) { + ASSERT (session_vlib_thread_is_cl_thread ()); vnet_connect (a); if (a->sep_ext.ext_cfg) clib_mem_free (a->sep_ext.ext_cfg); @@ -88,7 +114,7 @@ proxy_program_connect (vnet_connect_args_t *a) clib_spinlock_lock (&wrk->pending_connects_lock); clib_fifo_add1 (wrk->pending_connects, *a); - n_pending = vec_len (wrk->pending_connects); + n_pending = clib_fifo_elts (wrk->pending_connects); clib_spinlock_unlock (&wrk->pending_connects_lock); @@ -118,16 +144,6 @@ proxy_session_get (u32 ps_index) return pool_elt_at_index (pm->sessions, ps_index); } -static inline proxy_session_t * -proxy_session_get_if_valid (u32 ps_index) -{ - proxy_main_t *pm = &proxy_main; - - if (pool_is_free_index (pm->sessions, ps_index)) - return 0; - return pool_elt_at_index (pm->sessions, ps_index); -} - static void proxy_session_free (proxy_session_t *ps) { @@ -148,7 +164,7 @@ proxy_session_postponed_free_rpc (void *arg) clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (ps_index); - segment_manager_dealloc_fifos (ps->server_rx_fifo, ps->server_tx_fifo); + segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo); proxy_session_free (ps); clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -159,54 +175,79 @@ proxy_session_postponed_free_rpc (void *arg) static void proxy_session_postponed_free (proxy_session_t *ps) { - session_send_rpc_evt_to_thread (ps->po_thread_index, + /* Passive open session handle has been invalidated so we don't have thread + * index at this point */ + session_send_rpc_evt_to_thread (ps->po.rx_fifo->master_thread_index, proxy_session_postponed_free_rpc, uword_to_pointer (ps->ps_index, void *)); } static void +proxy_session_close_po (proxy_session_t *ps) +{ + vnet_disconnect_args_t _a = {}, *a = &_a; + proxy_main_t *pm = &proxy_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock)); + + a->handle = ps->po.session_handle; + a->app_index = pm->server_app_index; + vnet_disconnect_session (a); + + ps->po_disconnected = 1; +} + +static void +proxy_session_close_ao (proxy_session_t *ps) +{ + vnet_disconnect_args_t _a = {}, *a = &_a; + proxy_main_t *pm = &proxy_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock)); + + a->handle = ps->ao.session_handle; + a->app_index = pm->active_open_app_index; + vnet_disconnect_session (a); + + ps->ao_disconnected = 1; +} + +static void proxy_try_close_session (session_t * s, int is_active_open) { proxy_main_t *pm = &proxy_main; - proxy_session_t *ps = 0; - vnet_disconnect_args_t _a, *a = &_a; + proxy_session_side_ctx_t *sc; + proxy_session_t *ps; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (sc->ps_index); if (is_active_open) { - a->handle = ps->vpp_active_open_handle; - a->app_index = pm->active_open_app_index; - vnet_disconnect_session (a); - ps->ao_disconnected = 1; + proxy_session_close_ao (ps); if (!ps->po_disconnected) { - ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE); - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; + ASSERT (ps->po.session_handle != SESSION_INVALID_HANDLE); + proxy_session_close_po (ps); } } else { - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; + proxy_session_close_po (ps); if (!ps->ao_disconnected && !ps->active_open_establishing) { /* Proxy session closed before active open */ - if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE) - { - a->handle = ps->vpp_active_open_handle; - a->app_index = pm->active_open_app_index; - vnet_disconnect_session (a); - } + if (ps->ao.session_handle != SESSION_INVALID_HANDLE) + proxy_session_close_ao (ps); ps->ao_disconnected = 1; } } @@ -214,29 +255,65 @@ proxy_try_close_session (session_t * s, int is_active_open) } static void +proxy_try_side_ctx_cleanup (session_t *s) +{ + proxy_main_t *pm = &proxy_main; + proxy_session_t *ps; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + if (sc->state == PROXY_SC_S_CREATED) + return; + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (sc->ps_index); + + if (!ps->po_disconnected) + proxy_session_close_po (ps); + + if (!ps->ao_disconnected) + proxy_session_close_ao (ps); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); +} + +static void proxy_try_delete_session (session_t * s, u8 is_active_open) { proxy_main_t *pm = &proxy_main; proxy_session_t *ps = 0; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + u32 ps_index; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + ps_index = sc->ps_index; + + proxy_session_side_ctx_free (wrk, sc); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (ps_index); if (is_active_open) { - ps->vpp_active_open_handle = SESSION_INVALID_HANDLE; + ps->ao.session_handle = SESSION_INVALID_HANDLE; /* Revert master thread index change on connect notification */ - ps->server_rx_fifo->master_thread_index = ps->po_thread_index; + ps->po.rx_fifo->master_thread_index = + ps->po.tx_fifo->master_thread_index; /* Passive open already cleaned up */ - if (ps->vpp_server_handle == SESSION_INVALID_HANDLE) + if (ps->po.session_handle == SESSION_INVALID_HANDLE) { ASSERT (s->rx_fifo->refcnt == 1); /* The two sides of the proxy on different threads */ - if (ps->po_thread_index != s->thread_index) + if (ps->po.tx_fifo->master_thread_index != s->thread_index) { /* This is not the right thread to delete the fifos */ s->rx_fifo = 0; @@ -249,9 +326,9 @@ proxy_try_delete_session (session_t * s, u8 is_active_open) } else { - ps->vpp_server_handle = SESSION_INVALID_HANDLE; + ps->po.session_handle = SESSION_INVALID_HANDLE; - if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE) + if (ps->ao.session_handle == SESSION_INVALID_HANDLE) { if (!ps->active_open_establishing) proxy_session_free (ps); @@ -308,16 +385,24 @@ static int proxy_accept_callback (session_t * s) { proxy_main_t *pm = &proxy_main; + proxy_session_side_ctx_t *sc; proxy_session_t *ps; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_alloc (wrk); + s->opaque = sc->sc_index; clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_alloc (); - ps->vpp_server_handle = session_handle (s); - ps->vpp_active_open_handle = SESSION_INVALID_HANDLE; - ps->po_thread_index = s->thread_index; - s->opaque = ps->ps_index; + ps->po.session_handle = session_handle (s); + ps->po.rx_fifo = s->rx_fifo; + ps->po.tx_fifo = s->tx_fifo; + + ps->ao.session_handle = SESSION_INVALID_HANDLE; + sc->ps_index = ps->ps_index; clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -358,98 +443,105 @@ proxy_transport_needs_crypto (transport_proto_t proto) return proto == TRANSPORT_PROTO_TLS; } -static int -proxy_rx_callback (session_t * s) +static void +proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s) { + int actual_transfer __attribute__ ((unused)); + vnet_connect_args_t _a = {}, *a = &_a; proxy_main_t *pm = &proxy_main; - u32 thread_index = vlib_get_thread_index (); - svm_fifo_t *ao_tx_fifo; + u32 max_dequeue, ps_index; proxy_session_t *ps; - ASSERT (s->thread_index == thread_index); - clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (sc->ps_index); - if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)) + /* maybe we were already here */ + if (ps->active_open_establishing) { clib_spinlock_unlock_if_init (&pm->sessions_lock); + return; + } - ao_tx_fifo = s->rx_fifo; + ps->active_open_establishing = 1; + ps_index = ps->ps_index; - /* - * Send event for active open tx fifo - */ - if (svm_fifo_set_event (ao_tx_fifo)) - { - u32 ao_thread_index = ao_tx_fifo->master_thread_index; - u32 ao_session_index = ao_tx_fifo->shr->master_session_index; - if (session_send_io_evt_to_thread_custom (&ao_session_index, - ao_thread_index, - SESSION_IO_EVT_TX)) - clib_warning ("failed to enqueue tx evt"); - } + clib_spinlock_unlock_if_init (&pm->sessions_lock); - if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS) - svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); - } - else + max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); + if (PREDICT_FALSE (max_dequeue == 0)) + return; + + max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); + actual_transfer = svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, + max_dequeue, pm->rx_buf[s->thread_index]); + + /* Expectation is that here actual data just received is parsed and based + * on its contents, the destination and parameters of the connect to the + * upstream are decided + */ + + clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); + a->api_context = ps_index; + a->app_index = pm->active_open_app_index; + + if (proxy_transport_needs_crypto (a->sep.transport_proto)) { - vnet_connect_args_t _a, *a = &_a; - svm_fifo_t *tx_fifo, *rx_fifo; - u32 max_dequeue, ps_index; - int actual_transfer __attribute__ ((unused)); + session_endpoint_alloc_ext_cfg (&a->sep_ext, + TRANSPORT_ENDPT_EXT_CFG_CRYPTO); + a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index; + } - /* maybe we were already here */ - if (ps->active_open_establishing) - { - clib_spinlock_unlock_if_init (&pm->sessions_lock); - return 0; - } + proxy_program_connect (a); +} + +static int +proxy_rx_callback (session_t *s) +{ + proxy_session_side_ctx_t *sc; + svm_fifo_t *ao_tx_fifo; + proxy_session_t *ps; + proxy_worker_t *wrk; - rx_fifo = s->rx_fifo; - tx_fifo = s->tx_fifo; + ASSERT (s->thread_index == vlib_get_thread_index ()); - ASSERT (rx_fifo->master_thread_index == thread_index); - ASSERT (tx_fifo->master_thread_index == thread_index); + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); - max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); + if (PREDICT_FALSE (sc->state < PROXY_SC_S_ESTABLISHED)) + { + proxy_main_t *pm = &proxy_main; - if (PREDICT_FALSE (max_dequeue == 0)) + if (sc->state == PROXY_SC_S_CREATED) { - clib_spinlock_unlock_if_init (&pm->sessions_lock); + proxy_session_start_connect (sc, s); + sc->state = PROXY_SC_S_CONNECTING; return 0; } - max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); - actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ , - max_dequeue, pm->rx_buf[thread_index]); + clib_spinlock_lock_if_init (&pm->sessions_lock); - /* $$$ your message in this space: parse url, etc. */ + ps = proxy_session_get (sc->ps_index); + sc->pair = ps->ao; - clib_memset (a, 0, sizeof (*a)); + clib_spinlock_unlock_if_init (&pm->sessions_lock); - ps->server_rx_fifo = rx_fifo; - ps->server_tx_fifo = tx_fifo; - ps->active_open_establishing = 1; - ps_index = ps->ps_index; + if (sc->pair.session_handle == SESSION_INVALID_HANDLE) + return 0; - clib_spinlock_unlock_if_init (&pm->sessions_lock); + sc->state = PROXY_SC_S_ESTABLISHED; + } - clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); - a->api_context = ps_index; - a->app_index = pm->active_open_app_index; + ao_tx_fifo = s->rx_fifo; - if (proxy_transport_needs_crypto (a->sep.transport_proto)) - { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index; - } + /* + * Send event for active open tx fifo + */ + if (svm_fifo_set_event (ao_tx_fifo)) + session_program_tx_io_evt (sc->pair.session_handle, SESSION_IO_EVT_TX); - proxy_program_connect (a); - } + if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS) + svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); return 0; } @@ -470,8 +562,8 @@ proxy_force_ack (void *handlep) static int proxy_tx_callback (session_t * proxy_s) { - proxy_main_t *pm = &proxy_main; - proxy_session_t *ps; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; u32 min_free; min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10); @@ -481,21 +573,17 @@ proxy_tx_callback (session_t * proxy_s) return 0; } - clib_spinlock_lock_if_init (&pm->sessions_lock); - - ps = proxy_session_get (proxy_s->opaque); - - if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE) - goto unlock; + wrk = proxy_worker_get (proxy_s->thread_index); + sc = proxy_session_side_ctx_get (wrk, proxy_s->opaque); + if (sc->state < PROXY_SC_S_ESTABLISHED) + return 0; /* Force ack on active open side to update rcv wnd. Make sure it's done on * the right thread */ - void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *); - session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index, - proxy_force_ack, arg); - -unlock: - clib_spinlock_unlock_if_init (&pm->sessions_lock); + void *arg = uword_to_pointer (sc->pair.session_handle, void *); + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, + arg); return 0; } @@ -504,7 +592,10 @@ static void proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf) { if (ntf == SESSION_CLEANUP_TRANSPORT) - return; + { + proxy_try_side_ctx_cleanup (s); + return; + } proxy_try_delete_session (s, 0 /* is_active_open */ ); } @@ -530,10 +621,17 @@ active_open_alloc_session_fifos (session_t *s) clib_spinlock_lock_if_init (&pm->sessions_lock); + /* Active open opaque is pointing at proxy session */ ps = proxy_session_get (s->opaque); - txf = ps->server_rx_fifo; - rxf = ps->server_tx_fifo; + if (ps->po_disconnected) + { + clib_spinlock_unlock_if_init (&pm->sessions_lock); + return SESSION_E_ALLOC; + } + + txf = ps->po.rx_fifo; + rxf = ps->po.tx_fifo; /* * Reset the active-open tx-fifo master indices so the active-open session @@ -564,31 +662,35 @@ active_open_connected_callback (u32 app_index, u32 opaque, { proxy_main_t *pm = &proxy_main; proxy_session_t *ps; - u8 thread_index = vlib_get_thread_index (); - - /* - * Setup proxy session handle. - */ - clib_spinlock_lock_if_init (&pm->sessions_lock); - - ps = proxy_session_get (opaque); + proxy_worker_t *wrk; + proxy_session_side_ctx_t *sc; /* Connection failed */ if (err) { - vnet_disconnect_args_t _a, *a = &_a; + clib_spinlock_lock_if_init (&pm->sessions_lock); - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; - } - else - { - ps->vpp_active_open_handle = session_handle (s); - ps->active_open_establishing = 0; + ps = proxy_session_get (opaque); + ps->ao_disconnected = 1; + proxy_session_close_po (ps); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + return 0; } + wrk = proxy_worker_get (s->thread_index); + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (opaque); + + ps->ao.rx_fifo = s->rx_fifo; + ps->ao.tx_fifo = s->tx_fifo; + ps->ao.session_handle = session_handle (s); + + ps->active_open_establishing = 0; + /* Passive open session was already closed! */ if (ps->po_disconnected) { @@ -598,16 +700,21 @@ active_open_connected_callback (u32 app_index, u32 opaque, return -1; } - s->opaque = opaque; + sc = proxy_session_side_ctx_alloc (wrk); + sc->pair = ps->po; + sc->ps_index = ps->ps_index; clib_spinlock_unlock_if_init (&pm->sessions_lock); + sc->state = PROXY_SC_S_ESTABLISHED; + s->opaque = sc->sc_index; + /* * Send event for active open tx fifo */ - ASSERT (s->thread_index == thread_index); + ASSERT (s->thread_index == vlib_get_thread_index ()); if (svm_fifo_set_event (s->tx_fifo)) - session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX); + session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX); return 0; } @@ -658,8 +765,8 @@ active_open_rx_callback (session_t * s) static int active_open_tx_callback (session_t * ao_s) { - proxy_main_t *pm = &proxy_main; - proxy_session_t *ps; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; u32 min_free; min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10); @@ -669,22 +776,17 @@ active_open_tx_callback (session_t * ao_s) return 0; } - clib_spinlock_lock_if_init (&pm->sessions_lock); - - ps = proxy_session_get_if_valid (ao_s->opaque); - if (!ps) - goto unlock; + wrk = proxy_worker_get (ao_s->thread_index); + sc = proxy_session_side_ctx_get (wrk, ao_s->opaque); - if (ps->vpp_server_handle == SESSION_INVALID_HANDLE) - goto unlock; + if (sc->state < PROXY_SC_S_ESTABLISHED) + return 0; /* Force ack on proxy side to update rcv wnd */ - void *arg = uword_to_pointer (ps->vpp_server_handle, void *); + void *arg = uword_to_pointer (sc->pair.session_handle, void *); session_send_rpc_evt_to_thread ( - session_thread_from_handle (ps->vpp_server_handle), proxy_force_ack, arg); - -unlock: - clib_spinlock_unlock_if_init (&pm->sessions_lock); + session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, + arg); return 0; } @@ -834,15 +936,25 @@ proxy_server_create (vlib_main_t * vm) { vlib_thread_main_t *vtm = vlib_get_thread_main (); proxy_main_t *pm = &proxy_main; + proxy_worker_t *wrk; u32 num_threads; int i; + if (vlib_num_workers ()) + clib_spinlock_init (&pm->sessions_lock); + num_threads = 1 /* main thread */ + vtm->n_threads; vec_validate (pm->rx_buf, num_threads - 1); for (i = 0; i < num_threads; i++) vec_validate (pm->rx_buf[i], pm->rcv_buffer_size); + vec_validate (pm->workers, vlib_num_workers ()); + vec_foreach (wrk, pm->workers) + { + clib_spinlock_init (&wrk->pending_connects_lock); + } + proxy_server_add_ckpair (); if (proxy_server_attach ()) @@ -886,9 +998,6 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, pm->private_segment_count = 0; pm->segment_size = 512 << 20; - if (vlib_num_workers ()) - clib_spinlock_init (&pm->sessions_lock); - if (!unformat_user (input, unformat_line_input, line_input)) return 0; |