about summary refs log tree commit diff stats
path: root/src/vcl/vcl_locked.c
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2019-01-28 15:54:27 -0800
committerDamjan Marion <dmarion@me.com>2019-01-30 09:08:11 +0000
commit2d675d72d378466d1d71a4323891071038ba21d9 (patch)
tree75bfc072ed2fc0ec90bb69f0935f1739a69ca704 /src/vcl/vcl_locked.c
parent822f5a498b3a21ee990a39d32b644eea947b31bb (diff)
vls: support passive listeners
If an application worker calls listen on a session, vpp registers the worker to the listener's work load balance group and, as new connections are accepted, it may potentially push accept notifications to it. There are however applications, like nginx, that on some workers may never accept new connections on a session they've started listening on. To avoid accumulating accept events on such workers, this patch adds support for passive listeners. That is, workers that have started listening on a session but then never call accept or epoll/select on that listener. Change-Id: I007e6dcb54fc88a0e3aab3c6e2a3d1ef135cbd58 Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vcl/vcl_locked.c')
-rw-r--r-- src/vcl/vcl_locked.c 306
1 file changed, 236 insertions, 70 deletions
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index 8a8d7d9d66b..f5892c17e73 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -22,19 +22,31 @@ typedef struct vcl_locked_session_
u32 session_index;
u32 worker_index;
u32 vls_index;
- u32 flags;
u32 *workers_subscribed;
+ clib_bitmap_t *listeners;
} vcl_locked_session_t;
-typedef struct vcl_main_
+typedef struct vls_local_
+{
+ int vls_wrk_index;
+ volatile int vls_mt_n_threads;
+ pthread_mutex_t vls_mt_mq_mlock;
+ pthread_mutex_t vls_mt_spool_mlock;
+ volatile u8 select_mp_check;
+ volatile u8 epoll_mp_check;
+} vls_process_local_t;
+
+static vls_process_local_t vls_local;
+static vls_process_local_t *vlsl = &vls_local;
+
+typedef struct vls_main_
{
vcl_locked_session_t *vls_pool;
clib_rwlock_t vls_table_lock;
uword *session_index_to_vlsh_table;
} vls_main_t;
-vls_main_t vls_main;
-vls_main_t *vlsm = &vls_main;
+vls_main_t *vlsm;
static inline void
vls_table_rlock (void)
@@ -74,40 +86,42 @@ typedef enum
VLS_MT_LOCK_SPOOL = 1 << 1
} vls_mt_lock_type_t;
-static int vls_wrk_index = ~0;
-static volatile int vls_mt_n_threads;
-static pthread_mutex_t vls_mt_mq_mlock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t vls_mt_spool_mlock = PTHREAD_MUTEX_INITIALIZER;
-
static void
vls_mt_add (void)
{
- vls_mt_n_threads += 1;
- vcl_set_worker_index (vls_wrk_index);
+ vlsl->vls_mt_n_threads += 1;
+ vcl_set_worker_index (vlsl->vls_wrk_index);
}
static inline void
vls_mt_mq_lock (void)
{
- pthread_mutex_lock (&vls_mt_mq_mlock);
+ pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
}
static inline void
vls_mt_mq_unlock (void)
{
- pthread_mutex_unlock (&vls_mt_mq_mlock);
+ pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
}
static inline void
vls_mt_spool_lock (void)
{
- pthread_mutex_lock (&vls_mt_spool_mlock);
+ pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
}
static inline void
vls_mt_create_unlock (void)
{
- pthread_mutex_unlock (&vls_mt_spool_mlock);
+ pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
+}
+
+static void
+vls_mt_locks_init (void)
+{
+ pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
+ pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
}
static inline vcl_session_handle_t
@@ -182,6 +196,12 @@ vls_get_w_dlock (vls_handle_t vlsh)
}
static inline void
+vls_lock (vcl_locked_session_t * vls)
+{
+ clib_spinlock_lock (&vls->lock);
+}
+
+static inline void
vls_unlock (vcl_locked_session_t * vls)
{
clib_spinlock_unlock (&vls->lock);
@@ -204,6 +224,48 @@ vls_dunlock (vcl_locked_session_t * vls)
vls_table_runlock ();
}
+vcl_session_handle_t
+vlsh_to_sh (vls_handle_t vlsh)
+{
+ vcl_locked_session_t *vls;
+ int rv;
+
+ vls = vls_get_w_dlock (vlsh);
+ if (!vls)
+ return INVALID_SESSION_ID;
+ rv = vls_to_sh (vls);
+ vls_dunlock (vls);
+ return rv;
+}
+
+vcl_session_handle_t
+vlsh_to_session_index (vls_handle_t vlsh)
+{
+ vcl_session_handle_t sh;
+ sh = vlsh_to_sh (vlsh);
+ return vppcom_session_index (sh);
+}
+
+vls_handle_t
+vls_si_to_vlsh (u32 session_index)
+{
+ uword *vlshp;
+ vlshp = hash_get (vlsm->session_index_to_vlsh_table, session_index);
+ return vlshp ? *vlshp : VLS_INVALID_HANDLE;
+}
+
+vls_handle_t
+vls_session_index_to_vlsh (uint32_t session_index)
+{
+ vls_handle_t vlsh;
+
+ vls_table_rlock ();
+ vlsh = vls_si_to_vlsh (session_index);
+ vls_table_runlock ();
+
+ return vlsh;
+}
+
u8
vls_is_shared (vcl_locked_session_t * vls)
{
@@ -220,26 +282,63 @@ vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
return 0;
}
+static void
+vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
+{
+ clib_bitmap_set (vls->listeners, wrk_index, is_active);
+}
+
+static u8
+vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ return (clib_bitmap_get (vls->listeners, wrk_index) == 1);
+}
+
+static void
+vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vppcom_session_listen (vls_to_sh (vls), ~0);
+ vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
+}
+
+static void
+vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vcl_worker_t *wrk;
+ vcl_session_t *s;
+
+ wrk = vcl_worker_get (wrk_index);
+ s = vcl_session_get (wrk, vls->session_index);
+ if (s->session_state != STATE_LISTEN)
+ return;
+ vppcom_send_unbind_sock (wrk, s->vpp_handle);
+ s->session_state = STATE_LISTEN_NO_MQ;
+ vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
+}
+
int
vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
{
+ int i, do_disconnect;
vcl_session_t *s;
- int i;
+
+ s = vcl_session_get (wrk, vls->session_index);
+ if (s->session_state == STATE_LISTEN)
+ vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
for (i = 0; i < vec_len (vls->workers_subscribed); i++)
{
if (vls->workers_subscribed[i] != wrk->wrk_index)
continue;
- s = vcl_session_get (wrk, vls->session_index);
if (s->rx_fifo)
{
svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
}
vec_del1 (vls->workers_subscribed, i);
- vcl_session_cleanup (wrk, s, vcl_session_handle (s),
- 0 /* do_disconnect */ );
+ do_disconnect = s->session_state == STATE_LISTEN;
+ vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
return 0;
}
@@ -247,8 +346,6 @@ vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
if (vls->worker_index != wrk->wrk_index)
return 0;
- s = vcl_session_get (wrk, vls->session_index);
-
/* Check if we can change owner or close */
if (vec_len (vls->workers_subscribed))
{
@@ -272,16 +369,22 @@ vls_share_vcl_session (vcl_worker_t * wrk, vcl_session_t * s)
{
vcl_locked_session_t *vls;
- vls = vls_get_w_dlock (vls_session_index_to_vlsh (s->session_index));
+ vls = vls_get (vls_si_to_vlsh (s->session_index));
if (!vls)
return;
+ vls_lock (vls);
vec_add1 (vls->workers_subscribed, wrk->wrk_index);
if (s->rx_fifo)
{
svm_fifo_add_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
svm_fifo_add_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
}
- vls_dunlock (vls);
+ else if (s->session_state == STATE_LISTEN)
+ {
+ s->session_state = STATE_LISTEN_NO_MQ;
+ }
+
+ vls_unlock (vls);
}
void
@@ -294,12 +397,15 @@ vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
wrk->sessions = pool_dup (parent_wrk->sessions);
wrk->session_index_by_vpp_handles =
hash_dup (parent_wrk->session_index_by_vpp_handles);
+ vls_table_wlock ();
/* *INDENT-OFF* */
pool_foreach (s, wrk->sessions, ({
vls_share_vcl_session (wrk, s);
}));
/* *INDENT-ON* */
+
+ vls_table_wunlock ();
}
static void
@@ -363,7 +469,7 @@ vls_mt_rel_locks (int locks_acq)
int _locks_acq = 0; \
if (PREDICT_FALSE (vcl_get_worker_index () == ~0)); \
vls_mt_add (); \
- if (PREDICT_FALSE (vls_mt_n_threads > 1)) \
+ if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1)) \
vls_mt_acq_locks (_vls, _op, &_locks_acq); \
#define vls_mt_unguard() \
@@ -505,6 +611,46 @@ vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep)
return rv;
}
+static inline void
+vls_mp_checks (vcl_locked_session_t * vls, int is_add)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *s;
+
+ s = vcl_session_get (wrk, vls->session_index);
+ switch (s->session_state)
+ {
+ case STATE_LISTEN:
+ if (is_add)
+ {
+ if (vls->worker_index == wrk->wrk_index)
+ vls_listener_wrk_set (vls, wrk->wrk_index, 1 /* is_active */ );
+ break;
+ }
+ vls_listener_wrk_stop_listen (vls, vls->worker_index);
+ break;
+ case STATE_LISTEN_NO_MQ:
+ if (!is_add)
+ break;
+
+ /* Register worker as listener */
+ vls_listener_wrk_start_listen (vls, wrk->wrk_index);
+
+ /* If owner worker did not attempt to accept/xpoll on the session,
+ * force a listen stop for it, since it may not be interested in
+ * accepting new sessions.
+ * This is pretty much a hack done to give app workers the illusion
+ * that it is fine to listen and not accept new sessions for a
+ * given listener. Without it, we would accumulate unhandled
+ * accepts on the passive worker message queue. */
+ if (!vls_listener_wrk_is_active (vls, vls->worker_index))
+ vls_listener_wrk_stop_listen (vls, vls->worker_index);
+ break;
+ default:
+ break;
+ }
+}
+
vls_handle_t
vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
{
@@ -514,6 +660,8 @@ vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
if (!(vls = vls_get_w_dlock (listener_vlsh)))
return VPPCOM_EBADFD;
+ if (vcl_n_workers () > 1)
+ vls_mp_checks (vls, 1 /* is_add */ );
vls_mt_guard (vls, VLS_MT_OP_SPOOL);
sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
vls_mt_unguard ();
@@ -597,6 +745,22 @@ vls_epoll_create (void)
return vlsh;
}
+static void
+vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
+{
+ if (vcl_n_workers () <= 1)
+ {
+ vlsl->epoll_mp_check = 1;
+ return;
+ }
+
+ if (op == EPOLL_CTL_MOD)
+ return;
+
+ vlsl->epoll_mp_check = 1;
+ vls_mp_checks (vls, op == EPOLL_CTL_ADD);
+}
+
int
vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
struct epoll_event *event)
@@ -610,6 +774,10 @@ vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
vls = vls_get_and_lock (vlsh);
ep_sh = vls_to_sh (ep_vls);
sh = vls_to_sh (vls);
+
+ if (PREDICT_FALSE (!vlsl->epoll_mp_check))
+ vls_epoll_ctl_mp_checks (vls, op);
+
vls_table_runlock ();
rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
@@ -640,57 +808,50 @@ vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events,
return rv;
}
-int
-vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
- vcl_si_set * except_map, double wait_for_time)
-{
- int rv;
- vls_mt_guard (0, VLS_MT_OP_XPOLL);
- rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
- vls_mt_unguard ();
- return rv;
-}
-
-vcl_session_handle_t
-vlsh_to_sh (vls_handle_t vlsh)
+static void
+vls_select_mp_checks (vcl_si_set * read_map)
{
vcl_locked_session_t *vls;
- int rv;
+ vcl_worker_t *wrk;
+ vcl_session_t *s;
+ u32 si;
- vls = vls_get_w_dlock (vlsh);
- if (!vls)
- return INVALID_SESSION_ID;
- rv = vls_to_sh (vls);
- vls_dunlock (vls);
- return rv;
-}
+ if (vcl_n_workers () <= 1)
+ {
+ vlsl->select_mp_check = 1;
+ return;
+ }
-vcl_session_handle_t
-vlsh_to_session_index (vls_handle_t vlsh)
-{
- vcl_session_handle_t sh;
- sh = vlsh_to_sh (vlsh);
- return vppcom_session_index (sh);
-}
+ if (!read_map)
+ return;
-vls_handle_t
-vls_si_to_vlsh (u32 session_index)
-{
- uword *vlshp;
- vlshp = hash_get (vlsm->session_index_to_vlsh_table, session_index);
- return vlshp ? *vlshp : VLS_INVALID_HANDLE;
+ vlsl->select_mp_check = 1;
+ wrk = vcl_worker_get_current ();
+
+ /* *INDENT-OFF* */
+ clib_bitmap_foreach (si, read_map, ({
+ s = vcl_session_get (wrk, si);
+ if (s->session_state == STATE_LISTEN)
+ {
+ vls = vls_get (vls_session_index_to_vlsh (si));
+ vls_mp_checks (vls, 1 /* is_add */);
+ }
+ }));
+ /* *INDENT-ON* */
}
-vls_handle_t
-vls_session_index_to_vlsh (uint32_t session_index)
+int
+vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
+ vcl_si_set * except_map, double wait_for_time)
{
- vls_handle_t vlsh;
-
- vls_table_rlock ();
- vlsh = vls_si_to_vlsh (session_index);
- vls_table_runlock ();
+ int rv;
- return vlsh;
+ vls_mt_guard (0, VLS_MT_OP_XPOLL);
+ if (PREDICT_FALSE (!vlsl->select_mp_check))
+ vls_select_mp_checks (read_map);
+ rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
+ vls_mt_unguard ();
+ return rv;
}
static void
@@ -855,8 +1016,11 @@ vls_app_fork_child_handler (void)
parent_wrk->forked_child = vcl_get_worker_index ();
/* Reset number of threads and set wrk index */
- vls_mt_n_threads = 0;
- vls_wrk_index = vcl_get_worker_index ();
+ vlsl->vls_mt_n_threads = 0;
+ vlsl->vls_wrk_index = vcl_get_worker_index ();
+ vlsl->select_mp_check = 0;
+ vlsl->epoll_mp_check = 0;
+ vls_mt_locks_init ();
VDBG (0, "forked child main worker initialized");
vcm->forking = 0;
@@ -884,12 +1048,14 @@ vls_app_create (char *app_name)
if ((rv = vppcom_app_create (app_name)))
return rv;
-
+ vlsm = clib_mem_alloc (sizeof (vls_main_t));
+ clib_memset (vlsm, 0, sizeof (*vlsm));
clib_rwlock_init (&vlsm->vls_table_lock);
pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
vls_app_fork_child_handler);
atexit (vls_app_exit);
- vls_wrk_index = vcl_get_worker_index ();
+ vlsl->vls_wrk_index = vcl_get_worker_index ();
+ vls_mt_locks_init ();
return VPPCOM_OK;
}