aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/vlib/main.c7
-rw-r--r--src/vlib/main.h4
-rw-r--r--src/vlib/threads.c3
-rw-r--r--src/vlibmemory/memory_api.c34
-rw-r--r--src/vlibmemory/memory_api.h2
-rw-r--r--src/vlibmemory/vlib_api.c43
6 files changed, 62 insertions, 31 deletions
diff --git a/src/vlib/main.c b/src/vlib/main.c
index a6ad4032dae..df7c2f1e007 100644
--- a/src/vlib/main.c
+++ b/src/vlib/main.c
@@ -1534,7 +1534,10 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
vlib_node_runtime_t *n;
if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
- vl_api_send_pending_rpc_requests (vm);
+ {
+ if (!is_main)
+ vl_api_send_pending_rpc_requests (vm);
+ }
if (!is_main)
{
@@ -1842,6 +1845,8 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
vec_validate (vm->pending_rpc_requests, 0);
_vec_len (vm->pending_rpc_requests) = 0;
+ vec_validate (vm->processing_rpc_requests, 0);
+ _vec_len (vm->processing_rpc_requests) = 0;
switch (clib_setjmp (&vm->main_loop_exit, VLIB_MAIN_LOOP_EXIT_NONE))
{
diff --git a/src/vlib/main.h b/src/vlib/main.h
index 7c34fb6528d..1cf8fe188b1 100644
--- a/src/vlib/main.h
+++ b/src/vlib/main.h
@@ -207,8 +207,10 @@ typedef struct vlib_main_t
/* Earliest barrier can be closed again */
f64 barrier_no_close_before;
- /* Vector of pending RPC requests */
+ /* RPC requests, main thread only */
uword *pending_rpc_requests;
+ uword *processing_rpc_requests;
+ clib_spinlock_t pending_rpc_lock;
} vlib_main_t;
diff --git a/src/vlib/threads.c b/src/vlib/threads.c
index c99458ddaec..7ecfa309378 100644
--- a/src/vlib/threads.c
+++ b/src/vlib/threads.c
@@ -699,6 +699,9 @@ start_workers (vlib_main_t * vm)
vlib_worker_threads->node_reforks_required =
clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
+ /* We'll need the rpc vector lock... */
+ clib_spinlock_init (&vm->pending_rpc_lock);
+
/* Ask for an initial barrier sync */
*vlib_worker_threads->workers_at_barrier = 0;
*vlib_worker_threads->wait_at_barrier = 1;
diff --git a/src/vlibmemory/memory_api.c b/src/vlibmemory/memory_api.c
index a444ec785a7..7a7644a0998 100644
--- a/src/vlibmemory/memory_api.c
+++ b/src/vlibmemory/memory_api.c
@@ -103,6 +103,14 @@ memclnt_queue_callback (vlib_main_t * vm)
break;
}
}
+ if (vec_len (vm->pending_rpc_requests))
+ {
+ vm->queue_signal_pending = 1;
+ vm->api_queue_nonempty = 1;
+ vlib_process_signal_event (vm, vl_api_clnt_node.index,
+ /* event_type */ QUEUE_SIGNAL_EVENT,
+ /* event_data */ 0);
+ }
}
/*
@@ -704,6 +712,32 @@ vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node)
}
int
+vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
+{
+ api_main_t *am = &api_main;
+ int i;
+ uword *tmp, mp;
+
+ /*
+ * Swap pending and processing vectors, then process the RPCs
+ * Avoid deadlock conditions by construction.
+ */
+ clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
+ tmp = vm->processing_rpc_requests;
+ vec_reset_length (tmp);
+ vm->processing_rpc_requests = vm->pending_rpc_requests;
+ vm->pending_rpc_requests = tmp;
+ clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);
+
+ for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)
+ {
+ mp = vm->processing_rpc_requests[i];
+ vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+ }
+ return 0;
+}
+
+int
vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node,
u32 reg_index)
{
diff --git a/src/vlibmemory/memory_api.h b/src/vlibmemory/memory_api.h
index 4cda04b3d9f..f6580067d46 100644
--- a/src/vlibmemory/memory_api.h
+++ b/src/vlibmemory/memory_api.h
@@ -32,6 +32,8 @@ void vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm,
int vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node);
int vl_mem_api_handle_msg_private (vlib_main_t * vm,
vlib_node_runtime_t * node, u32 reg_index);
+int vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node);
+
vl_api_registration_t *vl_mem_api_client_index_to_registration (u32 handle);
void vl_mem_api_enable_disable (vlib_main_t * vm, int yesno);
u32 vl_api_memclnt_create_internal (char *, svm_queue_t *);
diff --git a/src/vlibmemory/vlib_api.c b/src/vlibmemory/vlib_api.c
index b72f1335c29..16e6402d9f0 100644
--- a/src/vlibmemory/vlib_api.c
+++ b/src/vlibmemory/vlib_api.c
@@ -346,7 +346,8 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
start_time = vlib_time_now (vm);
while (1)
{
- if (vl_mem_api_handle_msg_main (vm, node))
+ if (vl_mem_api_handle_rpc (vm, node)
+ || vl_mem_api_handle_msg_main (vm, node))
{
vm->api_queue_nonempty = 0;
VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
@@ -564,36 +565,14 @@ vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
void
vl_api_send_pending_rpc_requests (vlib_main_t * vm)
{
- api_main_t *am = &api_main;
- vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
- svm_queue_t *q;
- int i;
-
- /*
- * Use the "normal" control-plane mechanism for the main thread.
- * Well, almost. if the main input queue is full, we cannot
- * block. Otherwise, we can expect a barrier sync timeout.
- */
- q = shmem_hdr->vl_input_queue;
+ vlib_main_t *vm_global = &vlib_global_main;
- for (i = 0; i < vec_len (vm->pending_rpc_requests); i++)
- {
- while (pthread_mutex_trylock (&q->mutex))
- vlib_worker_thread_barrier_check ();
+ ASSERT (vm != vm_global);
- while (PREDICT_FALSE (svm_queue_is_full (q)))
- {
- pthread_mutex_unlock (&q->mutex);
- vlib_worker_thread_barrier_check ();
- while (pthread_mutex_trylock (&q->mutex))
- vlib_worker_thread_barrier_check ();
- }
-
- vl_msg_api_send_shmem_nolock (q, (u8 *) (vm->pending_rpc_requests + i));
-
- pthread_mutex_unlock (&q->mutex);
- }
- _vec_len (vm->pending_rpc_requests) = 0;
+ clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
+ vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
+ vec_reset_length (vm->pending_rpc_requests);
+ clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
always_inline void
@@ -601,6 +580,7 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
u8 force_rpc)
{
vl_api_rpc_call_t *mp;
+ vlib_main_t *vm_global = &vlib_global_main;
vlib_main_t *vm = vlib_get_main ();
/* Main thread and not a forced RPC: call the function directly */
@@ -626,7 +606,12 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
mp->function = pointer_to_uword (fp);
mp->need_barrier_sync = 1;
+ /* Add to the pending vector. Thread 0 requires locking. */
+ if (vm == vm_global)
+ clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
vec_add1 (vm->pending_rpc_requests, (uword) mp);
+ if (vm == vm_global)
+ clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
/*