aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/vnet/session/application_local.c51
1 files changed, 44 insertions, 7 deletions
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index 3a1d48b1a7e..db8d721b631 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -49,7 +49,9 @@ typedef struct ct_cleanup_req_
typedef struct ct_worker_
{
ct_connection_t *connections; /**< Per-worker connection pools */
+ u32 *pending_connects; /**< Fifo of pending ho indices */
ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
+ u8 have_connects; /**< Set if connect rpc pending */
u8 have_cleanups; /**< Set if cleanup rpc pending */
} ct_worker_t;
@@ -631,22 +633,20 @@ ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
}
static void
-ct_accept_rpc_wrk_handler (void *accept_args)
+ct_accept_one (u32 thread_index, u32 ho_index)
{
- u32 cct_index, ho_index, thread_index, ll_index;
ct_connection_t *sct, *cct, *ho;
transport_connection_t *ll_ct;
app_worker_t *server_wrk;
+ u32 cct_index, ll_index;
session_t *ss, *ll;
/*
* Alloc client ct and initialize from ho
*/
- thread_index = vlib_get_thread_index ();
cct = ct_connection_alloc (thread_index);
cct_index = cct->c_c_index;
- ho_index = pointer_to_uword (accept_args);
ho = ct_connection_get (ho_index, 0);
/* Unlikely but half-open session and transport could have been freed */
@@ -738,6 +738,34 @@ ct_accept_rpc_wrk_handler (void *accept_args)
}
}
+static void
+ct_accept_rpc_wrk_handler (void *rpc_args)
+{
+ u32 thread_index, ho_index, n_connects, i, n_pending;
+ const u32 max_connects = 32;
+ ct_worker_t *wrk;
+
+ thread_index = pointer_to_uword (rpc_args);
+ wrk = ct_worker_get (thread_index);
+
+ /* Sub without lock as main enqueues with worker barrier */
+ n_pending = clib_fifo_elts (wrk->pending_connects);
+ n_connects = clib_min (n_pending, max_connects);
+
+ for (i = 0; i < n_connects; i++)
+ {
+ clib_fifo_sub1 (wrk->pending_connects, ho_index);
+ ct_accept_one (thread_index, ho_index);
+ }
+
+ if (n_pending == n_connects)
+ wrk->have_connects = 0;
+ else
+ session_send_rpc_evt_to_thread_force (
+ thread_index, ct_accept_rpc_wrk_handler,
+ uword_to_pointer (thread_index, void *));
+}
+
static int
ct_connect (app_worker_t * client_wrk, session_t * ll,
session_endpoint_cfg_t * sep)
@@ -745,6 +773,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
u32 thread_index, ho_index;
ct_main_t *cm = &ct_main;
ct_connection_t *ho;
+ ct_worker_t *wrk;
/* Simple round-robin policy for spreading sessions over workers. We skip
* thread index 0, i.e., offset the index by 1, when we have workers as it
@@ -777,9 +806,17 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
* after server accepts the connection.
*/
- session_send_rpc_evt_to_thread_force (thread_index,
- ct_accept_rpc_wrk_handler,
- uword_to_pointer (ho_index, void *));
+ wrk = ct_worker_get (thread_index);
+
+ /* Worker barrier held, add without additional lock */
+ clib_fifo_add1 (wrk->pending_connects, ho_index);
+ if (!wrk->have_connects)
+ {
+ wrk->have_connects = 1;
+ session_send_rpc_evt_to_thread_force (
+ thread_index, ct_accept_rpc_wrk_handler,
+ uword_to_pointer (thread_index, void *));
+ }
return ho_index;
}