diff options
-rw-r--r-- | src/vnet/session/application_local.c | 51 |
1 files changed, 44 insertions, 7 deletions
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 3a1d48b1a7e..db8d721b631 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -49,7 +49,9 @@ typedef struct ct_cleanup_req_ typedef struct ct_worker_ { ct_connection_t *connections; /**< Per-worker connection pools */ + u32 *pending_connects; /**< Fifo of pending ho indices */ ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */ + u8 have_connects; /**< Set if connect rpc pending */ u8 have_cleanups; /**< Set if cleanup rpc pending */ } ct_worker_t; @@ -631,22 +633,20 @@ ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct, } static void -ct_accept_rpc_wrk_handler (void *accept_args) +ct_accept_one (u32 thread_index, u32 ho_index) { - u32 cct_index, ho_index, thread_index, ll_index; ct_connection_t *sct, *cct, *ho; transport_connection_t *ll_ct; app_worker_t *server_wrk; + u32 cct_index, ll_index; session_t *ss, *ll; /* * Alloc client ct and initialize from ho */ - thread_index = vlib_get_thread_index (); cct = ct_connection_alloc (thread_index); cct_index = cct->c_c_index; - ho_index = pointer_to_uword (accept_args); ho = ct_connection_get (ho_index, 0); /* Unlikely but half-open session and transport could have been freed */ @@ -738,6 +738,34 @@ ct_accept_rpc_wrk_handler (void *accept_args) } } +static void +ct_accept_rpc_wrk_handler (void *rpc_args) +{ + u32 thread_index, ho_index, n_connects, i, n_pending; + const u32 max_connects = 32; + ct_worker_t *wrk; + + thread_index = pointer_to_uword (rpc_args); + wrk = ct_worker_get (thread_index); + + /* Sub without lock as main enqueues with worker barrier */ + n_pending = clib_fifo_elts (wrk->pending_connects); + n_connects = clib_min (n_pending, max_connects); + + for (i = 0; i < n_connects; i++) + { + clib_fifo_sub1 (wrk->pending_connects, ho_index); + ct_accept_one (thread_index, ho_index); + } + + if (n_pending == n_connects) + wrk->have_connects = 0; + else + session_send_rpc_evt_to_thread_force ( + thread_index, ct_accept_rpc_wrk_handler, + uword_to_pointer (thread_index, void *)); +} + static int ct_connect (app_worker_t * client_wrk, session_t * ll, session_endpoint_cfg_t * sep) @@ -745,6 +773,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, u32 thread_index, ho_index; ct_main_t *cm = &ct_main; ct_connection_t *ho; + ct_worker_t *wrk; /* Simple round-robin policy for spreading sessions over workers. We skip * thread index 0, i.e., offset the index by 1, when we have workers as it @@ -777,9 +806,17 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, * after server accepts the connection. */ - session_send_rpc_evt_to_thread_force (thread_index, - ct_accept_rpc_wrk_handler, - uword_to_pointer (ho_index, void *)); + wrk = ct_worker_get (thread_index); + + /* Worker barrier held, add without additional lock */ + clib_fifo_add1 (wrk->pending_connects, ho_index); + if (!wrk->have_connects) + { + wrk->have_connects = 1; + session_send_rpc_evt_to_thread_force ( + thread_index, ct_accept_rpc_wrk_handler, + uword_to_pointer (thread_index, void *)); + } return ho_index; } |