diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/vlib/main.c | 7 | ||||
-rw-r--r-- | src/vlib/main.h | 4 | ||||
-rw-r--r-- | src/vlib/threads.c | 3 | ||||
-rw-r--r-- | src/vlibmemory/memory_api.c | 34 | ||||
-rw-r--r-- | src/vlibmemory/memory_api.h | 2 | ||||
-rw-r--r-- | src/vlibmemory/vlib_api.c | 43 |
6 files changed, 62 insertions, 31 deletions
diff --git a/src/vlib/main.c b/src/vlib/main.c index a6ad4032dae..df7c2f1e007 100644 --- a/src/vlib/main.c +++ b/src/vlib/main.c @@ -1534,7 +1534,10 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main) vlib_node_runtime_t *n; if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0)) - vl_api_send_pending_rpc_requests (vm); + { + if (!is_main) + vl_api_send_pending_rpc_requests (vm); + } if (!is_main) { @@ -1842,6 +1845,8 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input) vec_validate (vm->pending_rpc_requests, 0); _vec_len (vm->pending_rpc_requests) = 0; + vec_validate (vm->processing_rpc_requests, 0); + _vec_len (vm->processing_rpc_requests) = 0; switch (clib_setjmp (&vm->main_loop_exit, VLIB_MAIN_LOOP_EXIT_NONE)) { diff --git a/src/vlib/main.h b/src/vlib/main.h index 7c34fb6528d..1cf8fe188b1 100644 --- a/src/vlib/main.h +++ b/src/vlib/main.h @@ -207,8 +207,10 @@ typedef struct vlib_main_t /* Earliest barrier can be closed again */ f64 barrier_no_close_before; - /* Vector of pending RPC requests */ + /* RPC requests, main thread only */ uword *pending_rpc_requests; + uword *processing_rpc_requests; + clib_spinlock_t pending_rpc_lock; } vlib_main_t; diff --git a/src/vlib/threads.c b/src/vlib/threads.c index c99458ddaec..7ecfa309378 100644 --- a/src/vlib/threads.c +++ b/src/vlib/threads.c @@ -699,6 +699,9 @@ start_workers (vlib_main_t * vm) vlib_worker_threads->node_reforks_required = clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES); + /* We'll need the rpc vector lock... */ + clib_spinlock_init (&vm->pending_rpc_lock); + /* Ask for an initial barrier sync */ *vlib_worker_threads->workers_at_barrier = 0; *vlib_worker_threads->wait_at_barrier = 1; diff --git a/src/vlibmemory/memory_api.c b/src/vlibmemory/memory_api.c index a444ec785a7..7a7644a0998 100644 --- a/src/vlibmemory/memory_api.c +++ b/src/vlibmemory/memory_api.c @@ -103,6 +103,14 @@ memclnt_queue_callback (vlib_main_t * vm) break; } } + if (vec_len (vm->pending_rpc_requests)) + { + vm->queue_signal_pending = 1; + vm->api_queue_nonempty = 1; + vlib_process_signal_event (vm, vl_api_clnt_node.index, + /* event_type */ QUEUE_SIGNAL_EVENT, + /* event_data */ 0); + } } /* @@ -704,6 +712,32 @@ vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node) } int +vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node) +{ + api_main_t *am = &api_main; + int i; + uword *tmp, mp; + + /* + * Swap pending and processing vectors, then process the RPCs + * Avoid deadlock conditions by construction. + */ + clib_spinlock_lock_if_init (&vm->pending_rpc_lock); + tmp = vm->processing_rpc_requests; + vec_reset_length (tmp); + vm->processing_rpc_requests = vm->pending_rpc_requests; + vm->pending_rpc_requests = tmp; + clib_spinlock_unlock_if_init (&vm->pending_rpc_lock); + + for (i = 0; i < vec_len (vm->processing_rpc_requests); i++) + { + mp = vm->processing_rpc_requests[i]; + vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node); + } + return 0; +} + +int vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node, u32 reg_index) { diff --git a/src/vlibmemory/memory_api.h b/src/vlibmemory/memory_api.h index 4cda04b3d9f..f6580067d46 100644 --- a/src/vlibmemory/memory_api.h +++ b/src/vlibmemory/memory_api.h @@ -32,6 +32,8 @@ void vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, int vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node); int vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node, u32 reg_index); +int vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node); + vl_api_registration_t *vl_mem_api_client_index_to_registration (u32 handle); void vl_mem_api_enable_disable (vlib_main_t * vm, int yesno); u32 vl_api_memclnt_create_internal (char *, svm_queue_t *); diff --git a/src/vlibmemory/vlib_api.c b/src/vlibmemory/vlib_api.c index b72f1335c29..16e6402d9f0 100644 --- a/src/vlibmemory/vlib_api.c +++ b/src/vlibmemory/vlib_api.c @@ -346,7 +346,8 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node, start_time = vlib_time_now (vm); while (1) { - if (vl_mem_api_handle_msg_main (vm, node)) + if (vl_mem_api_handle_rpc (vm, node) + || vl_mem_api_handle_msg_main (vm, node)) { vm->api_queue_nonempty = 0; VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0); @@ -564,36 +565,14 @@ vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp) void vl_api_send_pending_rpc_requests (vlib_main_t * vm) { - api_main_t *am = &api_main; - vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr; - svm_queue_t *q; - int i; - - /* - * Use the "normal" control-plane mechanism for the main thread. - * Well, almost. if the main input queue is full, we cannot - * block. Otherwise, we can expect a barrier sync timeout. - */ - q = shmem_hdr->vl_input_queue; + vlib_main_t *vm_global = &vlib_global_main; - for (i = 0; i < vec_len (vm->pending_rpc_requests); i++) - { - while (pthread_mutex_trylock (&q->mutex)) - vlib_worker_thread_barrier_check (); + ASSERT (vm != vm_global); - while (PREDICT_FALSE (svm_queue_is_full (q))) - { - pthread_mutex_unlock (&q->mutex); - vlib_worker_thread_barrier_check (); - while (pthread_mutex_trylock (&q->mutex)) - vlib_worker_thread_barrier_check (); - } - - vl_msg_api_send_shmem_nolock (q, (u8 *) (vm->pending_rpc_requests + i)); - - pthread_mutex_unlock (&q->mutex); - } - _vec_len (vm->pending_rpc_requests) = 0; + clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock); + vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests); + vec_reset_length (vm->pending_rpc_requests); + clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock); } always_inline void @@ -601,6 +580,7 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length, u8 force_rpc) { vl_api_rpc_call_t *mp; + vlib_main_t *vm_global = &vlib_global_main; vlib_main_t *vm = vlib_get_main (); /* Main thread and not a forced RPC: call the function directly */ @@ -626,7 +606,12 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length, mp->function = pointer_to_uword (fp); mp->need_barrier_sync = 1; + /* Add to the pending vector. Thread 0 requires locking. */ + if (vm == vm_global) + clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock); vec_add1 (vm->pending_rpc_requests, (uword) mp); + if (vm == vm_global) + clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock); } /* |