From 2d675d72d378466d1d71a4323891071038ba21d9 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Mon, 28 Jan 2019 15:54:27 -0800 Subject: vls: support passive listeners If an application worker calls listen on a session, vpp registers the worker to the listener's work load balance group and, as new connections are accepted, it may potentially push accept notifications to it. There are however applications, like nginx, that on some workers may never accept new connections on a session they've started listening on. To avoid accumulating accept events on such workers, this patch adds support for passive listeners. That is, workers that have started listening on a session but then never call accept or epoll/select on that listener. Change-Id: I007e6dcb54fc88a0e3aab3c6e2a3d1ef135cbd58 Signed-off-by: Florin Coras --- src/vcl/vcl_bapi.c | 3 +- src/vcl/vcl_locked.c | 306 ++++++++++++++++++++++++++++++++++++++------------ src/vcl/vcl_private.h | 27 +++-- src/vcl/vppcom.c | 14 ++- 4 files changed, 265 insertions(+), 85 deletions(-) (limited to 'src/vcl') diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index 5b9a9d5d3ce..debfb8ff4c3 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -594,9 +594,8 @@ vppcom_send_bind_sock (vcl_session_t * session) } void -vppcom_send_unbind_sock (u64 vpp_handle) +vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle) { - vcl_worker_t *wrk = vcl_worker_get_current (); vl_api_unbind_sock_t *ump; ump = vl_msg_api_alloc (sizeof (*ump)); diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c index 8a8d7d9d66b..f5892c17e73 100644 --- a/src/vcl/vcl_locked.c +++ b/src/vcl/vcl_locked.c @@ -22,19 +22,31 @@ typedef struct vcl_locked_session_ u32 session_index; u32 worker_index; u32 vls_index; - u32 flags; u32 *workers_subscribed; + clib_bitmap_t *listeners; } vcl_locked_session_t; -typedef struct vcl_main_ +typedef struct vls_local_ +{ + int vls_wrk_index; + volatile int vls_mt_n_threads; + pthread_mutex_t vls_mt_mq_mlock; + pthread_mutex_t vls_mt_spool_mlock; + volatile u8 select_mp_check; + volatile u8 epoll_mp_check; +} vls_process_local_t; + +static vls_process_local_t vls_local; +static vls_process_local_t *vlsl = &vls_local; + +typedef struct vls_main_ { vcl_locked_session_t *vls_pool; clib_rwlock_t vls_table_lock; uword *session_index_to_vlsh_table; } vls_main_t; -vls_main_t vls_main; -vls_main_t *vlsm = &vls_main; +vls_main_t *vlsm; static inline void vls_table_rlock (void) @@ -74,40 +86,42 @@ typedef enum VLS_MT_LOCK_SPOOL = 1 << 1 } vls_mt_lock_type_t; -static int vls_wrk_index = ~0; -static volatile int vls_mt_n_threads; -static pthread_mutex_t vls_mt_mq_mlock = PTHREAD_MUTEX_INITIALIZER; -static pthread_mutex_t vls_mt_spool_mlock = PTHREAD_MUTEX_INITIALIZER; - static void vls_mt_add (void) { - vls_mt_n_threads += 1; - vcl_set_worker_index (vls_wrk_index); + vlsl->vls_mt_n_threads += 1; + vcl_set_worker_index (vlsl->vls_wrk_index); } static inline void vls_mt_mq_lock (void) { - pthread_mutex_lock (&vls_mt_mq_mlock); + pthread_mutex_lock (&vlsl->vls_mt_mq_mlock); } static inline void vls_mt_mq_unlock (void) { - pthread_mutex_unlock (&vls_mt_mq_mlock); + pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock); } static inline void vls_mt_spool_lock (void) { - pthread_mutex_lock (&vls_mt_spool_mlock); + pthread_mutex_lock (&vlsl->vls_mt_spool_mlock); } static inline void vls_mt_create_unlock (void) { - pthread_mutex_unlock (&vls_mt_spool_mlock); + pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock); +} + +static void +vls_mt_locks_init (void) +{ + pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL); + pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL); } static inline vcl_session_handle_t @@ -181,6 +195,12 @@ vls_get_w_dlock (vls_handle_t vlsh) return vls; } +static inline void +vls_lock (vcl_locked_session_t * vls) +{ + clib_spinlock_lock (&vls->lock); +} + static inline void vls_unlock (vcl_locked_session_t * vls) { @@ -204,6 +224,48 @@ vls_dunlock (vcl_locked_session_t * vls) vls_table_runlock (); } +vcl_session_handle_t +vlsh_to_sh (vls_handle_t vlsh) +{ + vcl_locked_session_t *vls; + int rv; + + vls = vls_get_w_dlock (vlsh); + if (!vls) + return INVALID_SESSION_ID; + rv = vls_to_sh (vls); + vls_dunlock (vls); + return rv; +} + +vcl_session_handle_t +vlsh_to_session_index (vls_handle_t vlsh) +{ + vcl_session_handle_t sh; + sh = vlsh_to_sh (vlsh); + return vppcom_session_index (sh); +} + +vls_handle_t +vls_si_to_vlsh (u32 session_index) +{ + uword *vlshp; + vlshp = hash_get (vlsm->session_index_to_vlsh_table, session_index); + return vlshp ? *vlshp : VLS_INVALID_HANDLE; +} + +vls_handle_t +vls_session_index_to_vlsh (uint32_t session_index) +{ + vls_handle_t vlsh; + + vls_table_rlock (); + vlsh = vls_si_to_vlsh (session_index); + vls_table_runlock (); + + return vlsh; +} + u8 vls_is_shared (vcl_locked_session_t * vls) { @@ -220,26 +282,63 @@ vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index) return 0; } +static void +vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active) +{ + clib_bitmap_set (vls->listeners, wrk_index, is_active); +} + +static u8 +vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index) +{ + return (clib_bitmap_get (vls->listeners, wrk_index) == 1); +} + +static void +vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index) +{ + vppcom_session_listen (vls_to_sh (vls), ~0); + vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ ); +} + +static void +vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index) +{ + vcl_worker_t *wrk; + vcl_session_t *s; + + wrk = vcl_worker_get (wrk_index); + s = vcl_session_get (wrk, vls->session_index); + if (s->session_state != STATE_LISTEN) + return; + vppcom_send_unbind_sock (wrk, s->vpp_handle); + s->session_state = STATE_LISTEN_NO_MQ; + vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ ); +} + int vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk) { + int i, do_disconnect; vcl_session_t *s; - int i; + + s = vcl_session_get (wrk, vls->session_index); + if (s->session_state == STATE_LISTEN) + vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ ); for (i = 0; i < vec_len (vls->workers_subscribed); i++) { if (vls->workers_subscribed[i] != wrk->wrk_index) continue; - s = vcl_session_get (wrk, vls->session_index); if (s->rx_fifo) { svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index); svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index); } vec_del1 (vls->workers_subscribed, i); - vcl_session_cleanup (wrk, s, vcl_session_handle (s), - 0 /* do_disconnect */ ); + do_disconnect = s->session_state == STATE_LISTEN; + vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect); return 0; } @@ -247,8 +346,6 @@ vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk) if (vls->worker_index != wrk->wrk_index) return 0; - s = vcl_session_get (wrk, vls->session_index); - /* Check if we can change owner or close */ if (vec_len (vls->workers_subscribed)) { @@ -272,16 +369,22 @@ vls_share_vcl_session (vcl_worker_t * wrk, vcl_session_t * s) { vcl_locked_session_t *vls; - vls = vls_get_w_dlock (vls_session_index_to_vlsh (s->session_index)); + vls = vls_get (vls_si_to_vlsh (s->session_index)); if (!vls) return; + vls_lock (vls); vec_add1 (vls->workers_subscribed, wrk->wrk_index); if (s->rx_fifo) { svm_fifo_add_subscriber (s->rx_fifo, wrk->vpp_wrk_index); svm_fifo_add_subscriber (s->tx_fifo, wrk->vpp_wrk_index); } - vls_dunlock (vls); + else if (s->session_state == STATE_LISTEN) + { + s->session_state = STATE_LISTEN_NO_MQ; + } + + vls_unlock (vls); } void @@ -294,12 +397,15 @@ vls_worker_copy_on_fork (vcl_worker_t * parent_wrk) wrk->sessions = pool_dup (parent_wrk->sessions); wrk->session_index_by_vpp_handles = hash_dup (parent_wrk->session_index_by_vpp_handles); + vls_table_wlock (); /* *INDENT-OFF* */ pool_foreach (s, wrk->sessions, ({ vls_share_vcl_session (wrk, s); })); /* *INDENT-ON* */ + + vls_table_wunlock (); } static void @@ -363,7 +469,7 @@ vls_mt_rel_locks (int locks_acq) int _locks_acq = 0; \ if (PREDICT_FALSE (vcl_get_worker_index () == ~0)); \ vls_mt_add (); \ - if (PREDICT_FALSE (vls_mt_n_threads > 1)) \ + if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1)) \ vls_mt_acq_locks (_vls, _op, &_locks_acq); \ #define vls_mt_unguard() \ @@ -505,6 +611,46 @@ vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep) return rv; } +static inline void +vls_mp_checks (vcl_locked_session_t * vls, int is_add) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + vcl_session_t *s; + + s = vcl_session_get (wrk, vls->session_index); + switch (s->session_state) + { + case STATE_LISTEN: + if (is_add) + { + if (vls->worker_index == wrk->wrk_index) + vls_listener_wrk_set (vls, wrk->wrk_index, 1 /* is_active */ ); + break; + } + vls_listener_wrk_stop_listen (vls, vls->worker_index); + break; + case STATE_LISTEN_NO_MQ: + if (!is_add) + break; + + /* Register worker as listener */ + vls_listener_wrk_start_listen (vls, wrk->wrk_index); + + /* If owner worker did not attempt to accept/xpoll on the session, + * force a listen stop for it, since it may not be interested in + * accepting new sessions. + * This is pretty much a hack done to give app workers the illusion + * that it is fine to listen and not accept new sessions for a + * given listener. Without it, we would accumulate unhandled + * accepts on the passive worker message queue. */ + if (!vls_listener_wrk_is_active (vls, vls->worker_index)) + vls_listener_wrk_stop_listen (vls, vls->worker_index); + break; + default: + break; + } +} + vls_handle_t vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags) { @@ -514,6 +660,8 @@ vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags) if (!(vls = vls_get_w_dlock (listener_vlsh))) return VPPCOM_EBADFD; + if (vcl_n_workers () > 1) + vls_mp_checks (vls, 1 /* is_add */ ); vls_mt_guard (vls, VLS_MT_OP_SPOOL); sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags); vls_mt_unguard (); @@ -597,6 +745,22 @@ vls_epoll_create (void) return vlsh; } +static void +vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op) +{ + if (vcl_n_workers () <= 1) + { + vlsl->epoll_mp_check = 1; + return; + } + + if (op == EPOLL_CTL_MOD) + return; + + vlsl->epoll_mp_check = 1; + vls_mp_checks (vls, op == EPOLL_CTL_ADD); +} + int vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh, struct epoll_event *event) @@ -610,6 +774,10 @@ vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh, vls = vls_get_and_lock (vlsh); ep_sh = vls_to_sh (ep_vls); sh = vls_to_sh (vls); + + if (PREDICT_FALSE (!vlsl->epoll_mp_check)) + vls_epoll_ctl_mp_checks (vls, op); + vls_table_runlock (); rv = vppcom_epoll_ctl (ep_sh, op, sh, event); @@ -640,57 +808,50 @@ vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events, return rv; } -int -vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map, - vcl_si_set * except_map, double wait_for_time) -{ - int rv; - vls_mt_guard (0, VLS_MT_OP_XPOLL); - rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time); - vls_mt_unguard (); - return rv; -} - -vcl_session_handle_t -vlsh_to_sh (vls_handle_t vlsh) +static void +vls_select_mp_checks (vcl_si_set * read_map) { vcl_locked_session_t *vls; - int rv; + vcl_worker_t *wrk; + vcl_session_t *s; + u32 si; - vls = vls_get_w_dlock (vlsh); - if (!vls) - return INVALID_SESSION_ID; - rv = vls_to_sh (vls); - vls_dunlock (vls); - return rv; -} + if (vcl_n_workers () <= 1) + { + vlsl->select_mp_check = 1; + return; + } -vcl_session_handle_t -vlsh_to_session_index (vls_handle_t vlsh) -{ - vcl_session_handle_t sh; - sh = vlsh_to_sh (vlsh); - return vppcom_session_index (sh); -} + if (!read_map) + return; -vls_handle_t -vls_si_to_vlsh (u32 session_index) -{ - uword *vlshp; - vlshp = hash_get (vlsm->session_index_to_vlsh_table, session_index); - return vlshp ? *vlshp : VLS_INVALID_HANDLE; + vlsl->select_mp_check = 1; + wrk = vcl_worker_get_current (); + + /* *INDENT-OFF* */ + clib_bitmap_foreach (si, read_map, ({ + s = vcl_session_get (wrk, si); + if (s->session_state == STATE_LISTEN) + { + vls = vls_get (vls_session_index_to_vlsh (si)); + vls_mp_checks (vls, 1 /* is_add */); + } + })); + /* *INDENT-ON* */ } -vls_handle_t -vls_session_index_to_vlsh (uint32_t session_index) +int +vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map, + vcl_si_set * except_map, double wait_for_time) { - vls_handle_t vlsh; - - vls_table_rlock (); - vlsh = vls_si_to_vlsh (session_index); - vls_table_runlock (); + int rv; - return vlsh; + vls_mt_guard (0, VLS_MT_OP_XPOLL); + if (PREDICT_FALSE (!vlsl->select_mp_check)) + vls_select_mp_checks (read_map); + rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time); + vls_mt_unguard (); + return rv; } static void @@ -855,8 +1016,11 @@ vls_app_fork_child_handler (void) parent_wrk->forked_child = vcl_get_worker_index (); /* Reset number of threads and set wrk index */ - vls_mt_n_threads = 0; - vls_wrk_index = vcl_get_worker_index (); + vlsl->vls_mt_n_threads = 0; + vlsl->vls_wrk_index = vcl_get_worker_index (); + vlsl->select_mp_check = 0; + vlsl->epoll_mp_check = 0; + vls_mt_locks_init (); VDBG (0, "forked child main worker initialized"); vcm->forking = 0; @@ -884,12 +1048,14 @@ vls_app_create (char *app_name) if ((rv = vppcom_app_create (app_name))) return rv; - + vlsm = clib_mem_alloc (sizeof (vls_main_t)); + clib_memset (vlsm, 0, sizeof (*vlsm)); clib_rwlock_init (&vlsm->vls_table_lock); pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler, vls_app_fork_child_handler); atexit (vls_app_exit); - vls_wrk_index = vcl_get_worker_index (); + vlsl->vls_wrk_index = vcl_get_worker_index (); + vls_mt_locks_init (); return VPPCOM_OK; } diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 1cde85027a5..c61bb0b9247 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -63,14 +63,15 @@ typedef enum typedef enum { - STATE_START = 0x01, - STATE_CONNECT = 0x02, - STATE_LISTEN = 0x04, - STATE_ACCEPT = 0x08, - STATE_VPP_CLOSING = 0x10, - STATE_DISCONNECT = 0x20, - STATE_FAILED = 0x40, - STATE_UPDATED = 0x80, + STATE_START = 0, + STATE_CONNECT = 0x01, + STATE_LISTEN = 0x02, + STATE_ACCEPT = 0x04, + STATE_VPP_CLOSING = 0x08, + STATE_DISCONNECT = 0x10, + STATE_FAILED = 0x20, + STATE_UPDATED = 0x40, + STATE_LISTEN_NO_MQ = 0x80, } session_state_t; #define SERVER_STATE_OPEN (STATE_ACCEPT|STATE_VPP_CLOSING) @@ -491,7 +492,7 @@ vcl_session_table_lookup_listener (vcl_worker_t * wrk, u64 listener_handle) } session = pool_elt_at_index (wrk->sessions, p[0]); - ASSERT (session->session_state & STATE_LISTEN); + ASSERT (session->session_state & (STATE_LISTEN | STATE_LISTEN_NO_MQ)); return session; } @@ -566,6 +567,12 @@ vcl_worker_get_current (void) return vcl_worker_get (vcl_get_worker_index ()); } +static inline u8 +vcl_n_workers (void) +{ + return pool_elts (vcm->workers); +} + static inline svm_msg_q_t * vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s) { @@ -588,7 +595,7 @@ void vppcom_app_send_detach (void); void vppcom_send_connect_sock (vcl_session_t * session); void vppcom_send_disconnect_session (u64 vpp_handle); void vppcom_send_bind_sock (vcl_session_t * session); -void vppcom_send_unbind_sock (u64 vpp_handle); +void vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle); void vppcom_api_hookup (void); void vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert, u32 cert_len); diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index fc7d194bc12..570e323bb49 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -94,6 +94,14 @@ vppcom_session_state_str (session_state_t state) st = "STATE_FAILED"; break; + case STATE_UPDATED: + st = "STATE_UPDATED"; + break; + + case STATE_LISTEN_NO_MQ: + st = "STATE_LISTEN_NO_MQ"; + break; + default: st = "UNKNOWN_STATE"; break; @@ -818,11 +826,11 @@ vppcom_session_unbind (u32 session_handle) session->vpp_handle = ~0; session->session_state = STATE_DISCONNECT; - VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state" - " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT, + VDBG (1, "vpp handle 0x%llx, sid %u: sending unbind msg! new state" + " 0x%x (%s)", vpp_handle, session_handle, STATE_DISCONNECT, vppcom_session_state_str (STATE_DISCONNECT)); vcl_evt (VCL_EVT_UNBIND, session); - vppcom_send_unbind_sock (vpp_handle); + vppcom_send_unbind_sock (wrk, vpp_handle); return VPPCOM_OK; } -- cgit 1.2.3-korg