aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/plugins/hs_apps/proxy.c91
-rw-r--r--src/plugins/hs_apps/proxy.h15
2 files changed, 77 insertions, 29 deletions
diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c
index c7e7b2a653c..6b0b2465c56 100644
--- a/src/plugins/hs_apps/proxy.c
+++ b/src/plugins/hs_apps/proxy.c
@@ -24,45 +24,78 @@ proxy_main_t proxy_main;
#define TCP_MSS 1460
-typedef struct
+static void
+proxy_do_connect (vnet_connect_args_t *a)
{
- session_endpoint_cfg_t sep;
- u32 app_index;
- u32 api_context;
-} proxy_connect_args_t;
+ vnet_connect (a);
+ if (a->sep_ext.ext_cfg)
+ clib_mem_free (a->sep_ext.ext_cfg);
+}
static void
-proxy_cb_fn (void *data, u32 data_len)
+proxy_handle_connects_rpc (void *args)
{
- proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
- vnet_connect_args_t a;
+ u32 thread_index = pointer_to_uword (args), n_connects = 0, n_pending;
+ proxy_worker_t *wrk;
+ u32 max_connects;
+
+ wrk = proxy_worker_get (thread_index);
+
+ clib_spinlock_lock (&wrk->pending_connects_lock);
+
+ n_pending = clib_fifo_elts (wrk->pending_connects);
+ max_connects = clib_min (32, n_pending);
+ vec_validate (wrk->burst_connects, max_connects);
+
+ while (n_connects < max_connects)
+ clib_fifo_sub1 (wrk->pending_connects, wrk->burst_connects[n_connects++]);
+
+ clib_spinlock_unlock (&wrk->pending_connects_lock);
+
+ /* Do connects without locking pending_connects */
+ n_connects = 0;
+ while (n_connects < max_connects)
+ {
+ proxy_do_connect (&wrk->burst_connects[n_connects]);
+ n_connects += 1;
+ }
- clib_memset (&a, 0, sizeof (a));
- a.api_context = pa->api_context;
- a.app_index = pa->app_index;
- clib_memcpy (&a.sep_ext, &pa->sep, sizeof (pa->sep));
- vnet_connect (&a);
- if (a.sep_ext.ext_cfg)
- clib_mem_free (a.sep_ext.ext_cfg);
+ /* More work to do, program rpc */
+ if (max_connects < n_pending)
+ session_send_rpc_evt_to_thread_force (
+ transport_cl_thread (), proxy_handle_connects_rpc,
+ uword_to_pointer ((uword) thread_index, void *));
}
static void
-proxy_call_main_thread (vnet_connect_args_t * a)
+proxy_program_connect (vnet_connect_args_t *a)
{
- if (vlib_get_thread_index () == 0)
- {
- vnet_connect (a);
- if (a->sep_ext.ext_cfg)
- clib_mem_free (a->sep_ext.ext_cfg);
- }
- else
+ u32 connects_thread = transport_cl_thread (), thread_index, n_pending;
+ proxy_worker_t *wrk;
+
+ thread_index = vlib_get_thread_index ();
+
+ /* If already on first worker, handle request */
+ if (thread_index == connects_thread)
{
- proxy_connect_args_t args;
- args.api_context = a->api_context;
- args.app_index = a->app_index;
- clib_memcpy (&args.sep, &a->sep_ext, sizeof (a->sep_ext));
- vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
+ proxy_do_connect (a);
+ return;
}
+
+ /* If not on first worker, queue request */
+ wrk = proxy_worker_get (thread_index);
+
+ clib_spinlock_lock (&wrk->pending_connects_lock);
+
+ clib_fifo_add1 (wrk->pending_connects, *a);
+ n_pending = vec_len (wrk->pending_connects);
+
+ clib_spinlock_unlock (&wrk->pending_connects_lock);
+
+ if (n_pending == 1)
+ session_send_rpc_evt_to_thread_force (
+ connects_thread, proxy_handle_connects_rpc,
+ uword_to_pointer ((uword) thread_index, void *));
}
static proxy_session_t *
@@ -415,7 +448,7 @@ proxy_rx_callback (session_t * s)
a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
}
- proxy_call_main_thread (a);
+ proxy_program_connect (a);
}
return 0;
diff --git a/src/plugins/hs_apps/proxy.h b/src/plugins/hs_apps/proxy.h
index 26f4de2f729..66361566fa6 100644
--- a/src/plugins/hs_apps/proxy.h
+++ b/src/plugins/hs_apps/proxy.h
@@ -41,8 +41,16 @@ typedef struct
u32 po_thread_index;
} proxy_session_t;
+typedef struct proxy_worker_
+{
+ clib_spinlock_t pending_connects_lock;
+ vnet_connect_args_t *pending_connects;
+ vnet_connect_args_t *burst_connects;
+} proxy_worker_t;
+
typedef struct
{
+ proxy_worker_t *workers; /**< per-thread data */
proxy_session_t *sessions; /**< session pool, shared */
clib_spinlock_t sessions_lock; /**< lock for session pool */
u8 **rx_buf; /**< intermediate rx buffers */
@@ -75,6 +83,13 @@ typedef struct
extern proxy_main_t proxy_main;
+static inline proxy_worker_t *
+proxy_worker_get (u32 thread_index)
+{
+ proxy_main_t *pm = &proxy_main;
+ return vec_elt_at_index (pm->workers, thread_index);
+}
+
#endif /* __included_proxy_h__ */
/*