summaryrefslogtreecommitdiffstats
path: root/src/vcl/vcl_locked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vcl/vcl_locked.c')
-rw-r--r--src/vcl/vcl_locked.c242
1 files changed, 236 insertions, 6 deletions
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index 6254bad09b6..c6c79cdd470 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -18,11 +18,12 @@
typedef struct vcl_locked_session_
{
+ clib_spinlock_t lock;
u32 session_index;
u32 worker_index;
u32 vls_index;
u32 flags;
- clib_spinlock_t lock;
+ u32 *workers_subscribed;
} vcl_locked_session_t;
typedef struct vcl_main_
@@ -62,7 +63,7 @@ vls_table_wunlock (void)
static inline vcl_session_handle_t
vls_to_sh (vcl_locked_session_t * vls)
{
- return vppcom_session_handle (vls->session_index);
+ return vcl_session_handle_from_index (vls->session_index);
}
static inline vcl_session_handle_t
@@ -164,6 +165,86 @@ vls_get_and_free (vls_handle_t vlsh)
vls_table_wunlock ();
}
+u8
+vls_is_shared (vcl_locked_session_t * vls)
+{
+ return vec_len (vls->workers_subscribed);
+}
+
+int
+vls_unshare_session (vcl_locked_session_t * vls)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *s;
+ int i;
+
+ for (i = 0; i < vec_len (vls->workers_subscribed); i++)
+ {
+ if (vls->workers_subscribed[i] != wrk->wrk_index)
+ continue;
+
+ s = vcl_session_get (wrk, vls->session_index);
+ if (s->rx_fifo)
+ {
+ svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
+ svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
+ }
+ vec_del1 (vls->workers_subscribed, i);
+ vcl_session_cleanup (wrk, s, vcl_session_handle (s),
+ 0 /* do_disconnect */ );
+ return 0;
+ }
+
+ /* Assumption is that unshare is only called if session is shared.
+ * So shared_workers must be non-empty if the worker is the owner */
+ if (vls->worker_index == wrk->wrk_index)
+ {
+ s = vcl_session_get (wrk, vls->session_index);
+ vls->worker_index = vls->workers_subscribed[0];
+ vec_del1 (vls->workers_subscribed, 0);
+ vcl_send_session_worker_update (wrk, s, vls->worker_index);
+ if (vec_len (vls->workers_subscribed))
+ clib_warning ("more workers need to be updated");
+ }
+
+ return 0;
+}
+
+void
+vls_share_vcl_session (vcl_worker_t * wrk, vcl_session_t * s)
+{
+ vcl_locked_session_t *vls;
+
+ vls = vls_get_w_dlock (vls_session_index_to_vlsh (s->session_index));
+ if (!vls)
+ return;
+ vec_add1 (vls->workers_subscribed, wrk->wrk_index);
+ if (s->rx_fifo)
+ {
+ svm_fifo_add_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
+ svm_fifo_add_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
+ }
+ vls_dunlock (vls);
+}
+
+void
+vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *s;
+
+ wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
+ wrk->sessions = pool_dup (parent_wrk->sessions);
+ wrk->session_index_by_vpp_handles =
+ hash_dup (parent_wrk->session_index_by_vpp_handles);
+
+ /* *INDENT-OFF* */
+ pool_foreach (s, wrk->sessions, ({
+ vls_share_vcl_session (wrk, s);
+ }));
+ /* *INDENT-ON* */
+}
+
int
vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
{
@@ -325,13 +406,19 @@ vls_close (vls_handle_t vlsh)
{
vcl_locked_session_t *vls;
vcl_session_handle_t sh;
- int rv, refcnt;
+ int rv;
if (!(vls = vls_get_w_dlock (vlsh)))
return VPPCOM_EBADFD;
+ if (vls_is_shared (vls))
+ {
+ vls_unshare_session (vls);
+ vls_dunlock (vls);
+ return VPPCOM_OK;
+ }
+
sh = vls_to_sh (vls);
- refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0);
if ((rv = vppcom_session_close (sh)))
{
vls_dunlock (vls);
@@ -339,8 +426,7 @@ vls_close (vls_handle_t vlsh)
}
vls_dunlock (vls);
- if (refcnt <= 1)
- vls_get_and_free (vlsh);
+ vls_get_and_free (vlsh);
return rv;
}
@@ -438,6 +524,148 @@ vls_session_index_to_vlsh (uint32_t session_index)
return vlsh;
}
+static void
+vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
+{
+ vcl_worker_t *sub_child;
+ int tries = 0;
+
+ if (child_wrk->forked_child != ~0)
+ {
+ sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
+ if (sub_child)
+ {
+ /* Wait a bit, maybe the process is going away */
+ while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
+ usleep (1e3);
+ if (kill (sub_child->current_pid, 0) < 0)
+ vls_cleanup_forked_child (child_wrk, sub_child);
+ }
+ }
+ vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ );
+ VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index);
+ wrk->forked_child = ~0;
+}
+
+static struct sigaction old_sa;
+
+static void
+vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
+{
+ vcl_worker_t *wrk, *child_wrk;
+
+ if (vcl_get_worker_index () == ~0)
+ return;
+
+ if (sigaction (SIGCHLD, &old_sa, 0))
+ {
+ VERR ("couldn't restore sigchld");
+ exit (-1);
+ }
+
+ wrk = vcl_worker_get_current ();
+ if (wrk->forked_child == ~0)
+ return;
+
+ child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
+ if (!child_wrk)
+ goto done;
+
+ if (si && si->si_pid != child_wrk->current_pid)
+ {
+ VDBG (0, "unexpected child pid %u", si->si_pid);
+ goto done;
+ }
+ vls_cleanup_forked_child (wrk, child_wrk);
+
+done:
+ if (old_sa.sa_flags & SA_SIGINFO)
+ {
+ void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
+ fn (signum, si, uc);
+ }
+ else
+ {
+ void (*fn) (int) = old_sa.sa_handler;
+ if (fn)
+ fn (signum);
+ }
+}
+
+static void
+vls_incercept_sigchld ()
+{
+ struct sigaction sa;
+ clib_memset (&sa, 0, sizeof (sa));
+ sa.sa_sigaction = vls_intercept_sigchld_handler;
+ sa.sa_flags = SA_SIGINFO;
+ if (sigaction (SIGCHLD, &sa, &old_sa))
+ {
+ VERR ("couldn't intercept sigchld");
+ exit (-1);
+ }
+}
+
+static void
+vls_app_pre_fork (void)
+{
+ vls_incercept_sigchld ();
+ vcl_flush_mq_events ();
+}
+
+static void
+vls_app_fork_child_handler (void)
+{
+ vcl_worker_t *parent_wrk;
+ int rv, parent_wrk_index;
+ u8 *child_name;
+
+ parent_wrk_index = vcl_get_worker_index ();
+ VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
+ parent_wrk_index);
+
+ /*
+ * Allocate worker
+ */
+ vcl_set_worker_index (~0);
+ if (!vcl_worker_alloc_and_init ())
+ VERR ("couldn't allocate new worker");
+
+ /*
+ * Attach to binary api
+ */
+ child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
+ vcl_cleanup_bapi ();
+ vppcom_api_hookup ();
+ vcm->app_state = STATE_APP_START;
+ rv = vppcom_connect_to_vpp ((char *) child_name);
+ vec_free (child_name);
+ if (rv)
+ {
+ VERR ("couldn't connect to VPP!");
+ return;
+ }
+
+ /*
+ * Register worker with vpp and share sessions
+ */
+ vcl_worker_register_with_vpp ();
+ parent_wrk = vcl_worker_get (parent_wrk_index);
+ vls_worker_copy_on_fork (parent_wrk);
+ parent_wrk->forked_child = vcl_get_worker_index ();
+
+ VDBG (0, "forked child main worker initialized");
+ vcm->forking = 0;
+}
+
+static void
+vls_app_fork_parent_handler (void)
+{
+ vcm->forking = 1;
+ while (vcm->forking)
+ ;
+}
+
int
vls_app_create (char *app_name)
{
@@ -445,6 +673,8 @@ vls_app_create (char *app_name)
if ((rv = vppcom_app_create (app_name)))
return rv;
clib_rwlock_init (&vlsm->vls_table_lock);
+ pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
+ vls_app_fork_child_handler);
return VPPCOM_OK;
}