From 053a0e44edb21713e0825f9c09ba4af12e686b38 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Tue, 13 Nov 2018 15:52:38 -0800 Subject: vcl/session: apps with process workers Allow apps to register child processes as app workers. In particular, on fork vcl now registers the child process with vpp as a new worker. Change-Id: I52a65fbc3292962b1f6e1fe0f6153f739e6e0d4a Signed-off-by: Florin Coras --- src/vcl/vcl_bapi.c | 38 +++++++++++-------------- src/vcl/vcl_cfg.c | 15 +++++++--- src/vcl/vcl_debug.h | 11 ++++++- src/vcl/vcl_private.h | 19 +++++++++++-- src/vcl/vppcom.c | 79 ++++++++++++++++++++++++++++++++++++++++----------- 5 files changed, 118 insertions(+), 44 deletions(-) (limited to 'src/vcl') diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index ea8f2ce5651..7dd601cf737 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -148,17 +148,13 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t * format_api_error, ntohl (mp->retval)); goto failed; } - wrk_index = clib_net_to_host_u32 (mp->wrk_index); - if (mp->context != wrk_index) - { - clib_warning ("VCL<%d>: wrk numbering doesn't match ours: %u, vpp: %u", - getpid (), mp->context, wrk_index); - goto failed; - } + wrk_index = mp->context; + wrk = vcl_worker_get (wrk_index); + wrk->vpp_wrk_index = clib_net_to_host_u32 (mp->wrk_index); + if (!mp->is_add) return; - wrk = vcl_worker_get (wrk_index); wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *); @@ -193,6 +189,7 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t * goto failed; } vcm->app_state = STATE_APP_READY; + VDBG (0, "worker %u vpp-worker %u added", wrk_index, wrk->vpp_wrk_index); return; failed: @@ -432,7 +429,7 @@ vcl_send_app_worker_add_del (u8 is_add) mp->_vl_msg_id = ntohs (VL_API_APP_WORKER_ADD_DEL); mp->client_index = vcm->my_client_index; - mp->app_api_index = clib_host_to_net_u32 (vcm->my_client_index); + mp->app_api_index = clib_host_to_net_u32 (vcm->main_api_client_index); mp->context = wrk_index; mp->is_add = is_add; if (!is_add) @@ -444,6 +441,7 @@ vcl_send_app_worker_add_del (u8 is_add) void vppcom_send_connect_sock (vcl_session_t * session) { + vcl_worker_t *wrk = vcl_worker_get_current (); vl_api_connect_sock_t *cmp; cmp = vl_msg_api_alloc (sizeof (*cmp)); @@ -451,7 +449,7 @@ vppcom_send_connect_sock (vcl_session_t * session) cmp->_vl_msg_id = ntohs (VL_API_CONNECT_SOCK); cmp->client_index = vcm->my_client_index; cmp->context = session->session_index; - cmp->wrk_index = vcl_get_worker_index (); + cmp->wrk_index = wrk->vpp_wrk_index; cmp->is_ip4 = session->transport.is_ip4; clib_memcpy_fast (cmp->ip, &session->transport.rmt_ip, sizeof (cmp->ip)); cmp->port = session->transport.rmt_port; @@ -479,6 +477,7 @@ vppcom_send_disconnect_session (u64 vpp_handle) void vppcom_send_bind_sock (vcl_session_t * session) { + vcl_worker_t *wrk = vcl_worker_get_current (); vl_api_bind_sock_t *bmp; /* Assumes caller has acquired spinlock: vcm->sessions_lockp */ @@ -488,7 +487,7 @@ vppcom_send_bind_sock (vcl_session_t * session) bmp->_vl_msg_id = ntohs (VL_API_BIND_SOCK); bmp->client_index = vcm->my_client_index; bmp->context = session->session_index; - bmp->wrk_index = vcl_get_worker_index (); + bmp->wrk_index = wrk->vpp_wrk_index; bmp->is_ip4 = session->transport.is_ip4; clib_memcpy_fast (bmp->ip, &session->transport.lcl_ip, sizeof (bmp->ip)); bmp->port = session->transport.lcl_port; @@ -500,6 +499,7 @@ vppcom_send_bind_sock (vcl_session_t * session) void vppcom_send_unbind_sock (u64 vpp_handle) { + vcl_worker_t *wrk = vcl_worker_get_current (); vl_api_unbind_sock_t *ump; ump = vl_msg_api_alloc (sizeof (*ump)); @@ -507,7 +507,7 @@ vppcom_send_unbind_sock (u64 vpp_handle) ump->_vl_msg_id = ntohs (VL_API_UNBIND_SOCK); ump->client_index = vcm->my_client_index; - ump->wrk_index = vcl_get_worker_index (); + ump->wrk_index = wrk->vpp_wrk_index; ump->handle = vpp_handle; vl_msg_api_send_shmem (vcm->vl_input_queue, (u8 *) & ump); } @@ -556,15 +556,13 @@ vppcom_connect_to_vpp (char *app_name) if (vl_socket_client_connect ((char *) vcl_cfg->vpp_api_socket_name, app_name, 0 /* default rx/tx buffer */ )) { - clib_warning ("VCL<%d>: app (%s) socket connect failed!", - getpid (), app_name); + VERR ("app (%s) socket connect failed!", app_name); return VPPCOM_ECONNREFUSED; } if (vl_socket_client_init_shm (0)) { - clib_warning ("VCL<%d>: app (%s) init shm failed!", - getpid (), app_name); + VERR ("app (%s) init shm failed!", app_name); return VPPCOM_ECONNREFUSED; } } @@ -573,14 +571,13 @@ vppcom_connect_to_vpp (char *app_name) if (!vcl_cfg->vpp_api_filename) vcl_cfg->vpp_api_filename = format (0, "/vpe-api%c", 0); - VDBG (0, "VCL<%d>: app (%s) connecting to VPP api (%s)...", getpid (), + VDBG (0, "app (%s) connecting to VPP api (%s)...", app_name, vcl_cfg->vpp_api_filename); if (vl_client_connect_to_vlib ((char *) vcl_cfg->vpp_api_filename, app_name, vcm->cfg.vpp_api_q_length) < 0) { - clib_warning ("VCL<%d>: app (%s) connect failed!", getpid (), - app_name); + VERR ("app (%s) connect failed!", app_name); return VPPCOM_ECONNREFUSED; } @@ -590,8 +587,7 @@ vppcom_connect_to_vpp (char *app_name) vcm->my_client_index = (u32) am->my_client_index; vcm->app_state = STATE_APP_CONN_VPP; - VDBG (0, "VCL<%d>: app (%s) is connected to VPP!", getpid (), app_name); - + VDBG (0, "app (%s) is connected to VPP!", app_name); vcl_evt (VCL_EVT_INIT, vcm); return VPPCOM_OK; } diff --git a/src/vcl/vcl_cfg.c b/src/vcl/vcl_cfg.c index e39e5177908..3e92941d720 100644 --- a/src/vcl/vcl_cfg.c +++ b/src/vcl/vcl_cfg.c @@ -22,6 +22,7 @@ static vppcom_main_t _vppcom_main = { .debug = VPPCOM_DEBUG_INIT, .is_init = 0, + .main_api_client_index = ~0, .my_client_index = ~0 }; @@ -170,6 +171,7 @@ vppcom_cfg_heapsize (char *conf_fname) goto defaulted; } } + free (argv[i]); } defaulted: @@ -231,8 +233,7 @@ vppcom_cfg_read_file (char *conf_fname) if (fstat (fd, &s) < 0) { - VCFG_DBG (0, - "VCL<%d>: failed to stat `%s', using default configuration", + VCFG_DBG (0, "VCL<%d>: failed to stat `%s' using default configuration", getpid (), conf_fname); goto file_done; } @@ -254,17 +255,21 @@ vppcom_cfg_read_file (char *conf_fname) if (unformat (line_input, "vcl {")) { vc_cfg_input = 1; + unformat_free (line_input); continue; } if (vc_cfg_input) { - if (unformat (line_input, "heapsize %lu", &vcl_cfg->heapsize)) + if (unformat (line_input, "heapsize %U", unformat_memory_size, + &vcl_cfg->heapsize)) { VCFG_DBG (0, "VCL<%d>: configured heapsize %lu", getpid (), vcl_cfg->heapsize); } - if (unformat (line_input, "max-workers %u", &vcl_cfg->max_workers)) + else + if (unformat + (line_input, "max-workers %u", &vcl_cfg->max_workers)) { VCFG_DBG (0, "VCL<%d>: configured max-workers %u", getpid (), vcl_cfg->max_workers); @@ -490,6 +495,7 @@ vppcom_cfg_read_file (char *conf_fname) vc_cfg_input = 0; VCFG_DBG (0, "VCL<%d>: completed parsing vppcom config!", getpid ()); + unformat_free (line_input); goto input_done; } else @@ -501,6 +507,7 @@ vppcom_cfg_read_file (char *conf_fname) &line_input->buffer[line_input->index]); } } + unformat_free (line_input); } } diff --git a/src/vcl/vcl_debug.h b/src/vcl/vcl_debug.h index 7b284166503..48ff21af1de 100644 --- a/src/vcl/vcl_debug.h +++ b/src/vcl/vcl_debug.h @@ -23,7 +23,16 @@ #define VDBG(_lvl, _fmt, _args...) \ if (VCL_DBG_ON && vcm->debug > _lvl) \ - clib_warning ("vcl: " _fmt, __vcl_worker_index, ##_args) + clib_warning ("vcl<%d:%d>: " _fmt, vcm->current_pid, \ + __vcl_worker_index, ##_args) + +#define VWRN(_fmt, _args...) \ +clib_warning ("vcl<%d:%d>: " _fmt, vcm->current_pid, \ + __vcl_worker_index, ##_args) + +#define VERR(_fmt, _args...) \ + clib_warning ("vcl<%d:%d>: ERROR " _fmt, vcm->current_pid, \ + __vcl_worker_index, ##_args) #define foreach_vcl_dbg_evt \ _(INIT, "vcl init track") \ diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index d1a40b933a7..7bb0dbc5a66 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -219,8 +219,12 @@ typedef struct vcl_worker_ /* Session pool */ vcl_session_t *sessions; + /** Worker/thread index in current process */ u32 wrk_index; + /** Worker index in vpp*/ + u32 vpp_wrk_index; + /** Message queues epoll fd. Initialized only if using mqs with eventfds */ int mqs_epfd; @@ -272,15 +276,26 @@ typedef struct vppcom_main_t_ u32 debug; pthread_t main_cpu; - /** VPP binary api input queue */ - svm_queue_t *vl_input_queue; + /** Main process pid */ + pid_t main_pid; + + /** Current pid, may be different from main_pid if forked child */ + pid_t current_pid; + + /** Main process api client index. It's used by vpp to identify the app */ + u32 main_api_client_index; /** API client handle */ u32 my_client_index; + /** VPP binary api input queue */ + svm_queue_t *vl_input_queue; + /** State of the connection, shared between msg RX thread and main thread */ volatile app_state_t app_state; + u8 *app_name; + /** VCL configuration */ vppcom_cfg_t cfg; diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 99547377523..669f04fe1e3 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -688,6 +688,52 @@ vppcom_session_disconnect (u32 session_handle) return VPPCOM_OK; } +static void +vcl_cleanup_bapi (void) +{ + api_main_t *am = &api_main; + + am->my_client_index = ~0; + am->my_registration = 0; + am->vl_input_queue = 0; + am->msg_index_by_name_and_crc = 0; + + vl_client_api_unmap (); +} + +void +vcl_app_fork_child_handler (void) +{ + u8 *child_name; + int rv; + + vcm->current_pid = getpid (); + vcl_set_worker_index (0); + + VDBG (0, "initializing forked child"); + child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0); + + vcl_cleanup_bapi (); + vppcom_api_hookup (); + vcm->app_state = STATE_APP_START; + rv = vppcom_connect_to_vpp ((char *) child_name); + vec_free (child_name); + if (rv) + { + VERR ("couldn't connect to VPP!"); + return; + } + + vcm->app_state = STATE_APP_ADDING_WORKER; + vcl_send_app_worker_add_del (1 /* is_add */ ); + if (vcl_wait_for_app_state_change (STATE_APP_READY)) + { + VERR ("failed to add worker to vpp"); + return; + } + VDBG (0, "forked child main worker initialized"); +} + /* * VPPCOM Public API functions */ @@ -704,12 +750,15 @@ vppcom_app_create (char *app_name) vcl_cfg = &vcm->cfg; vcm->main_cpu = pthread_self (); + vcm->main_pid = vcm->current_pid = getpid (); + vcm->app_name = format (0, "%s", app_name); vppcom_init_error_string_table (); svm_fifo_segment_main_init (vcl_cfg->segment_baseva, 20 /* timeout in secs */ ); pool_init_fixed (vcm->workers, vcl_cfg->max_workers); clib_spinlock_init (&vcm->workers_lock); vcl_worker_alloc_and_init (); + pthread_atfork (NULL, NULL, vcl_app_fork_child_handler); } if (vcm->my_client_index == ~0) @@ -721,31 +770,28 @@ vppcom_app_create (char *app_name) rv = vppcom_connect_to_vpp (app_name); if (rv) { - clib_warning ("VCL<%d>: ERROR: couldn't connect to VPP!", - getpid ()); + VERR ("couldn't connect to VPP!"); return rv; } - - VDBG (0, "VCL<%d>: sending session enable", getpid ()); + vcm->main_api_client_index = vcm->my_client_index; + VDBG (0, "sending session enable"); rv = vppcom_app_session_enable (); if (rv) { - clib_warning ("VCL<%d>: ERROR: vppcom_app_session_enable() " - "failed!", getpid ()); + VERR ("vppcom_app_session_enable() failed!"); return rv; } - VDBG (0, "VCL<%d>: sending app attach", getpid ()); + VDBG (0, "sending app attach"); rv = vppcom_app_attach (); if (rv) { - clib_warning ("VCL<%d>: ERROR: vppcom_app_attach() failed!", - getpid ()); + VERR ("vppcom_app_attach() failed!"); return rv; } - VDBG (0, "VCL<%d>: app_name '%s', my_client_index %d (0x%x)", - getpid (), app_name, vcm->my_client_index, vcm->my_client_index); + VDBG (0, "app_name '%s', my_client_index %d (0x%x)", + app_name, vcm->my_client_index, vcm->my_client_index); } return VPPCOM_OK; @@ -760,8 +806,8 @@ vppcom_app_destroy (void) if (vcm->my_client_index == ~0) return; - VDBG (0, "VCL<%d>: detaching from VPP, my_client_index %d (0x%x)", - getpid (), vcm->my_client_index, vcm->my_client_index); + VDBG (0, "detaching from VPP, my_client_index %d (0x%x)", + vcm->my_client_index, vcm->my_client_index); vcl_evt (VCL_EVT_DETACH, vcm); vppcom_app_send_detach (); @@ -770,11 +816,12 @@ vppcom_app_destroy (void) rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED); vcm->cfg.app_timeout = orig_app_timeout; if (PREDICT_FALSE (rv)) - VDBG (0, "VCL<%d>: application detach timed out! returning %d (%s)", - getpid (), rv, vppcom_retval_str (rv)); + VDBG (0, "application detach timed out! returning %d (%s)", + rv, vppcom_retval_str (rv)); vcl_elog_stop (vcm); vl_client_disconnect_from_vlib (); + vec_free (vcm->app_name); vcm->my_client_index = ~0; vcm->app_state = STATE_APP_START; } @@ -798,7 +845,7 @@ vppcom_session_create (u8 proto, u8 is_nonblocking) vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state, is_nonblocking, session_index); - VDBG (0, "VCL<%d>: sid %u", getpid (), session->session_index); + VDBG (0, "created sid %u", session->session_index); return vcl_session_handle (session); } -- cgit 1.2.3-korg