diff options
Diffstat (limited to 'src/vcl/vcl_locked.c')
-rw-r--r-- | src/vcl/vcl_locked.c | 242 |
1 files changed, 236 insertions, 6 deletions
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c index 6254bad09b6..c6c79cdd470 100644 --- a/src/vcl/vcl_locked.c +++ b/src/vcl/vcl_locked.c @@ -18,11 +18,12 @@ typedef struct vcl_locked_session_ { + clib_spinlock_t lock; u32 session_index; u32 worker_index; u32 vls_index; u32 flags; - clib_spinlock_t lock; + u32 *workers_subscribed; } vcl_locked_session_t; typedef struct vcl_main_ @@ -62,7 +63,7 @@ vls_table_wunlock (void) static inline vcl_session_handle_t vls_to_sh (vcl_locked_session_t * vls) { - return vppcom_session_handle (vls->session_index); + return vcl_session_handle_from_index (vls->session_index); } static inline vcl_session_handle_t @@ -164,6 +165,86 @@ vls_get_and_free (vls_handle_t vlsh) vls_table_wunlock (); } +u8 +vls_is_shared (vcl_locked_session_t * vls) +{ + return vec_len (vls->workers_subscribed); +} + +int +vls_unshare_session (vcl_locked_session_t * vls) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + vcl_session_t *s; + int i; + + for (i = 0; i < vec_len (vls->workers_subscribed); i++) + { + if (vls->workers_subscribed[i] != wrk->wrk_index) + continue; + + s = vcl_session_get (wrk, vls->session_index); + if (s->rx_fifo) + { + svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index); + svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index); + } + vec_del1 (vls->workers_subscribed, i); + vcl_session_cleanup (wrk, s, vcl_session_handle (s), + 0 /* do_disconnect */ ); + return 0; + } + + /* Assumption is that unshare is only called if session is shared. + * So shared_workers must be non-empty if the worker is the owner */ + if (vls->worker_index == wrk->wrk_index) + { + s = vcl_session_get (wrk, vls->session_index); + vls->worker_index = vls->workers_subscribed[0]; + vec_del1 (vls->workers_subscribed, 0); + vcl_send_session_worker_update (wrk, s, vls->worker_index); + if (vec_len (vls->workers_subscribed)) + clib_warning ("more workers need to be updated"); + } + + return 0; +} + +void +vls_share_vcl_session (vcl_worker_t * wrk, vcl_session_t * s) +{ + vcl_locked_session_t *vls; + + vls = vls_get_w_dlock (vls_session_index_to_vlsh (s->session_index)); + if (!vls) + return; + vec_add1 (vls->workers_subscribed, wrk->wrk_index); + if (s->rx_fifo) + { + svm_fifo_add_subscriber (s->rx_fifo, wrk->vpp_wrk_index); + svm_fifo_add_subscriber (s->tx_fifo, wrk->vpp_wrk_index); + } + vls_dunlock (vls); +} + +void +vls_worker_copy_on_fork (vcl_worker_t * parent_wrk) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + vcl_session_t *s; + + wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues); + wrk->sessions = pool_dup (parent_wrk->sessions); + wrk->session_index_by_vpp_handles = + hash_dup (parent_wrk->session_index_by_vpp_handles); + + /* *INDENT-OFF* */ + pool_foreach (s, wrk->sessions, ({ + vls_share_vcl_session (wrk, s); + })); + /* *INDENT-ON* */ +} + int vls_write (vls_handle_t vlsh, void *buf, size_t nbytes) { @@ -325,13 +406,19 @@ vls_close (vls_handle_t vlsh) { vcl_locked_session_t *vls; vcl_session_handle_t sh; - int rv, refcnt; + int rv; if (!(vls = vls_get_w_dlock (vlsh))) return VPPCOM_EBADFD; + if (vls_is_shared (vls)) + { + vls_unshare_session (vls); + vls_dunlock (vls); + return VPPCOM_OK; + } + sh = vls_to_sh (vls); - refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0); if ((rv = vppcom_session_close (sh))) { vls_dunlock (vls); @@ -339,8 +426,7 @@ vls_close (vls_handle_t vlsh) } vls_dunlock (vls); - if (refcnt <= 1) - vls_get_and_free (vlsh); + vls_get_and_free (vlsh); return rv; } @@ -438,6 +524,148 @@ vls_session_index_to_vlsh (uint32_t session_index) return vlsh; } +static void +vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk) +{ + vcl_worker_t *sub_child; + int tries = 0; + + if (child_wrk->forked_child != ~0) + { + sub_child = vcl_worker_get_if_valid (child_wrk->forked_child); + if (sub_child) + { + /* Wait a bit, maybe the process is going away */ + while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50) + usleep (1e3); + if (kill (sub_child->current_pid, 0) < 0) + vls_cleanup_forked_child (child_wrk, sub_child); + } + } + vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ ); + VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index); + wrk->forked_child = ~0; +} + +static struct sigaction old_sa; + +static void +vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc) +{ + vcl_worker_t *wrk, *child_wrk; + + if (vcl_get_worker_index () == ~0) + return; + + if (sigaction (SIGCHLD, &old_sa, 0)) + { + VERR ("couldn't restore sigchld"); + exit (-1); + } + + wrk = vcl_worker_get_current (); + if (wrk->forked_child == ~0) + return; + + child_wrk = vcl_worker_get_if_valid (wrk->forked_child); + if (!child_wrk) + goto done; + + if (si && si->si_pid != child_wrk->current_pid) + { + VDBG (0, "unexpected child pid %u", si->si_pid); + goto done; + } + vls_cleanup_forked_child (wrk, child_wrk); + +done: + if (old_sa.sa_flags & SA_SIGINFO) + { + void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction; + fn (signum, si, uc); + } + else + { + void (*fn) (int) = old_sa.sa_handler; + if (fn) + fn (signum); + } +} + +static void +vls_incercept_sigchld () +{ + struct sigaction sa; + clib_memset (&sa, 0, sizeof (sa)); + sa.sa_sigaction = vls_intercept_sigchld_handler; + sa.sa_flags = SA_SIGINFO; + if (sigaction (SIGCHLD, &sa, &old_sa)) + { + VERR ("couldn't intercept sigchld"); + exit (-1); + } +} + +static void +vls_app_pre_fork (void) +{ + vls_incercept_sigchld (); + vcl_flush_mq_events (); +} + +static void +vls_app_fork_child_handler (void) +{ + vcl_worker_t *parent_wrk; + int rv, parent_wrk_index; + u8 *child_name; + + parent_wrk_index = vcl_get_worker_index (); + VDBG (0, "initializing forked child %u with parent wrk %u", getpid (), + parent_wrk_index); + + /* + * Allocate worker + */ + vcl_set_worker_index (~0); + if (!vcl_worker_alloc_and_init ()) + VERR ("couldn't allocate new worker"); + + /* + * Attach to binary api + */ + child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0); + vcl_cleanup_bapi (); + vppcom_api_hookup (); + vcm->app_state = STATE_APP_START; + rv = vppcom_connect_to_vpp ((char *) child_name); + vec_free (child_name); + if (rv) + { + VERR ("couldn't connect to VPP!"); + return; + } + + /* + * Register worker with vpp and share sessions + */ + vcl_worker_register_with_vpp (); + parent_wrk = vcl_worker_get (parent_wrk_index); + vls_worker_copy_on_fork (parent_wrk); + parent_wrk->forked_child = vcl_get_worker_index (); + + VDBG (0, "forked child main worker initialized"); + vcm->forking = 0; +} + +static void +vls_app_fork_parent_handler (void) +{ + vcm->forking = 1; + while (vcm->forking) + ; +} + int vls_app_create (char *app_name) { @@ -445,6 +673,8 @@ vls_app_create (char *app_name) if ((rv = vppcom_app_create (app_name))) return rv; clib_rwlock_init (&vlsm->vls_table_lock); + pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler, + vls_app_fork_child_handler); return VPPCOM_OK; } |