author     Florin Coras <fcoras@cisco.com>       2024-11-02 16:27:53 -0400
committer  Dave Wallace <dwallacelf@gmail.com>   2025-01-07 20:25:22 +0000
commit     6102d81832ed93469de64222657a69dcd4e0af5e (patch)
tree       a92ef7507b0d004084ddff6abdfd33bb61f0bbc5 /src/vcl/vcl_locked.c
parent     3a3476fddb79ed1eaf277bb66c0641933906ef3d (diff)
vcl: support pre/post cb before mq wait
Allow vls to register callback functions that vcl invokes before and after
sleeping on mq events. These can be used to drop locks prior to waiting on
vcl mq events and to reacquire them afterwards. This allows multi-threaded,
as opposed to multi-worker, applications to share sessions between threads
without deadlocking, e.g., when multiple threads read/write/close the same
non-blocking sessions. Caveat: connect handling still needs improvement.
Type: improvement
Change-Id: I589aa9dfd0553b0fad54f02ed16c3cda9761a83d
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
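
For readers unfamiliar with the mechanism, the rough shape of the callback
plumbing is sketched below. This is a simplified model, not the actual vcl
implementation: only vcl_worker_set_wait_mq_fns and the two vls callbacks
appear in the patch itself; the storage fields, the wait function, and all
other names here are invented for illustration.

    /* Minimal sketch of pre/post mq-wait callbacks; names other than
     * vcl_worker_set_wait_mq_fns are hypothetical. */
    #include <stdint.h>

    typedef uint32_t vcl_session_handle_t;
    typedef void (*vcl_worker_wait_mq_fn) (vcl_session_handle_t sh);

    static vcl_worker_wait_mq_fn pre_wait_fn;	/* assumed storage */
    static vcl_worker_wait_mq_fn post_wait_fn;	/* assumed storage */

    void
    vcl_worker_set_wait_mq_fns (vcl_worker_wait_mq_fn pre,
				vcl_worker_wait_mq_fn post)
    {
      pre_wait_fn = pre;
      post_wait_fn = post;
    }

    /* Hypothetical blocking wait on mq events: drop app-side locks
     * before sleeping, reacquire them after waking up. */
    static void
    vcl_wait_mq_event (vcl_session_handle_t sh)
    {
      if (pre_wait_fn)
	pre_wait_fn (sh);	/* vls drops the mq mutex so peers can progress */

      /* ... block on vcl mq events here ... */

      if (post_wait_fn)
	post_wait_fn (sh);	/* vls regrabs the mq mutex */
    }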
Diffstat (limited to 'src/vcl/vcl_locked.c')
-rw-r--r--  src/vcl/vcl_locked.c  184
1 file changed, 150 insertions(+), 34 deletions(-)
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index f8a306c1ba1..a9992e9317c 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -94,6 +94,22 @@ typedef struct vls_shared_data_
   clib_bitmap_t *listeners; /**< bitmap of wrks actively listening */
 } vls_shared_data_t;
 
+#define foreach_vls_flag _ (APP_CLOSED, "app closed")
+
+enum vls_flags_bits_
+{
+#define _(sym, str) VLS_FLAG_BIT_##sym,
+  foreach_vls_flag
+#undef _
+};
+
+typedef enum vls_flags_
+{
+#define _(sym, str) VLS_F_##sym = 1 << VLS_FLAG_BIT_##sym,
+  foreach_vls_flag
+#undef _
+} vls_flags_t;
+
 typedef struct vcl_locked_session_
 {
   clib_spinlock_t lock; /**< vls lock when in use */
@@ -104,6 +120,7 @@ typedef struct vcl_locked_session_
   u32 owner_vcl_wrk_index; /**< vcl wrk of the vls wrk at alloc */
   uword *vcl_wrk_index_to_session_index; /**< map vcl wrk to session */
   int libc_epfd; /**< epoll fd for libc epoll */
+  vls_flags_t flags; /**< vls flags */
 } vcl_locked_session_t;
 
 typedef struct vls_worker_
@@ -303,19 +320,22 @@ vls_mt_add (void)
     vcl_set_worker_index (vlsl->vls_wrk_index);
 
   /* Only allow new pthread to be cancled in vls_mt_mq_lock */
-  int old_state;
   if (vlsl->vls_mt_n_threads >= 2)
-    pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_state);
+    pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
 }
 
 static inline void
 vls_mt_mq_lock (void)
 {
-  /* Allow controlled cancelation of thread before grabbing mutex */
-  pthread_testcancel ();
   pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
 }
 
+static inline int
+vls_mt_mq_trylock (void)
+{
+  return pthread_mutex_trylock (&vlsl->vls_mt_mq_mlock);
+}
+
 static inline void
 vls_mt_mq_unlock (void)
 {
@@ -354,6 +374,14 @@ vls_lock (vcl_locked_session_t * vls)
     clib_spinlock_lock (&vls->lock);
 }
 
+static inline int
+vls_trylock (vcl_locked_session_t *vls)
+{
+  if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
+    return !clib_spinlock_trylock (&vls->lock);
+  return 0;
+}
+
 static inline void
 vls_unlock (vcl_locked_session_t * vls)
 {
@@ -559,28 +587,30 @@ vlsh_to_sh (vls_handle_t vlsh)
   return rv;
 }
 
-vcl_session_handle_t
-vlsh_to_session_index (vls_handle_t vlsh)
-{
-  vcl_session_handle_t sh;
-  sh = vlsh_to_sh (vlsh);
-  return vppcom_session_index (sh);
-}
-
-int
-vlsh_to_worker_index (vls_handle_t vlsh)
+void
+vlsh_to_session_and_worker_index (vls_handle_t vlsh, u32 *session_index,
+				  u32 *wrk_index)
 {
   vcl_locked_session_t *vls;
-  u32 wrk_index;
 
-  vls = vls_get_w_dlock (vlsh);
+  vls_mt_pool_rlock ();
+
+  /* Do not lock vls because for mt apps that use select this could
+   * deadlock if multiple threads select on the same vlsh */
+  vls = vls_get (vlsh);
+
   if (!vls)
-    wrk_index = INVALID_SESSION_ID;
+    {
+      *session_index = INVALID_SESSION_ID;
+      *wrk_index = INVALID_SESSION_ID;
+    }
   else
-    wrk_index = vls->vcl_wrk_index;
-  vls_dunlock (vls);
+    {
+      *session_index = vls->session_index;
+      *wrk_index = vls->vcl_wrk_index;
+    }
 
-  return wrk_index;
+  vls_mt_pool_runlock ();
 }
 
 vls_handle_t
@@ -977,6 +1007,19 @@ vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
   vls_share_sessions (vls_parent_wrk, vls_wrk);
 }
 
+static inline u8
+vcl_session_is_write_nonblk (vcl_session_t *s)
+{
+  int rv = vcl_session_write_ready (s);
+
+  if (!s->is_dgram)
+    return rv != 0;
+
+  /* Probably not common, but without knowing the actual write size, this is
+   * the only way we can guarantee the write won't block */
+  return rv < 0 ? 1 : (rv > (64 << 10));
+}
+
 static void
 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
 {
@@ -995,23 +1038,25 @@ vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
   switch (op)
     {
     case VLS_MT_OP_READ:
-      if (!is_nonblk)
-	is_nonblk = vcl_session_read_ready (s) != 0;
-      if (!is_nonblk)
+      is_nonblk = is_nonblk ?: vcl_session_read_ready (s) != 0;
+      while (!is_nonblk && vls_mt_mq_trylock ())
 	{
-	  vls_mt_mq_lock ();
-	  *locks_acq |= VLS_MT_LOCK_MQ;
+	  /* might get data while waiting for lock */
+	  is_nonblk = vcl_session_read_ready (s) != 0;
 	}
+      if (!is_nonblk)
+	*locks_acq |= VLS_MT_LOCK_MQ;
       break;
     case VLS_MT_OP_WRITE:
       ASSERT (s);
-      if (!is_nonblk)
-	is_nonblk = vcl_session_write_ready (s) != 0;
-      if (!is_nonblk)
+      is_nonblk = is_nonblk ?: vcl_session_is_write_nonblk (s);
+      while (!is_nonblk && vls_mt_mq_trylock ())
 	{
-	  vls_mt_mq_lock ();
-	  *locks_acq |= VLS_MT_LOCK_MQ;
+	  /* might get space while waiting for lock */
+	  is_nonblk = vcl_session_is_write_nonblk (s);
 	}
+      if (!is_nonblk)
+	*locks_acq |= VLS_MT_LOCK_MQ;
       break;
     case VLS_MT_OP_XPOLL:
       vls_mt_mq_lock ();
@@ -1442,15 +1487,23 @@ vls_close (vls_handle_t vlsh)
   int rv;
 
   vls_mt_detect ();
-  vls_mt_pool_wlock ();
 
-  vls = vls_get_and_lock (vlsh);
+  /* Notify vcl while holding a reader lock. Allows other threads to
+   * regrab vls and unlock it if needed. */
+  vls_mt_pool_rlock ();
+
+  vls = vls_get (vlsh);
   if (!vls)
     {
-      vls_mt_pool_wunlock ();
+      vls_mt_pool_runlock ();
       return VPPCOM_EBADFD;
     }
 
+  /* Notify other threads, if any, that app closed. Do it before
+   * grabbing lock as vls might be already locked */
+  vls->flags |= VLS_F_APP_CLOSED;
+  vls_lock (vls);
+
   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
 
   if (vls_is_shared (vls))
@@ -1461,8 +1514,23 @@ vls_close (vls_handle_t vlsh)
   if (vls_mt_wrk_supported ())
     vls_mt_session_cleanup (vls);
 
-  vls_free (vls);
   vls_mt_unguard ();
+  vls_unlock (vls);
+  vls_mt_pool_runlock ();
+
+  /* Drop mt reader lock on pool and acquire writer lock */
+  vls_mt_pool_wlock ();
+
+  vls = vls_get (vlsh);
+
+  /* Other threads might be still using the session */
+  while (vls_trylock (vls))
+    {
+      vls_mt_pool_wunlock ();
+      vls_mt_pool_wlock ();
+    }
+
+  vls_free (vls);
 
   vls_mt_pool_wunlock ();
 
@@ -2002,6 +2070,51 @@ vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
 		dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
 }
 
+static inline void
+vls_mt_mq_wait_lock (vcl_session_handle_t vcl_sh)
+{
+  vcl_locked_session_t *vls;
+  vls_worker_t *wrk;
+  uword *vlshp;
+
+  /* If mt wrk supported or single threaded just return */
+  if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+    return;
+
+  wrk = vls_worker_get_current ();
+  /* Expect current thread to have dropped lock before calling vcl */
+  vls_mt_pool_rlock ();
+
+  vlshp = vls_sh_to_vlsh_table_get (wrk, vcl_sh);
+  if (vlshp)
+    {
+      vls = vls_get (*vlshp);
+      /* Handle case here other threads might've closed the session */
+      if (vls->flags & VLS_F_APP_CLOSED)
+	{
+	  vcl_session_t *s;
+	  s = vcl_session_get_w_handle (vcl_worker_get_current (), vcl_sh);
+	  s->flags |= VCL_SESSION_F_APP_CLOSING;
+	  vls_mt_pool_runlock ();
+	  return;
+	}
+    }
+
+  vls_mt_mq_unlock ();
+}
+
+static inline void
+vls_mt_mq_wait_unlock (vcl_session_handle_t vcl_sh)
+{
+  if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+    return;
+
+  vls_mt_mq_lock ();
+
+  /* writers can grab lock now */
+  vls_mt_pool_runlock ();
+}
+
 int
 vls_app_create (char *app_name)
 {
@@ -2025,6 +2138,9 @@ vls_app_create (char *app_name)
   clib_rwlock_init (&vlsl->vls_pool_lock);
   vls_mt_locks_init ();
   vcm->wrk_rpc_fn = vls_rpc_handler;
+  /* For multi threaded apps where sessions are implicitly shared, ask vcl
+   * to use these callbacks prior and after blocking on io operations */
+  vcl_worker_set_wait_mq_fns (vls_mt_mq_wait_lock, vls_mt_mq_wait_unlock);
 
   return VPPCOM_OK;
 }
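
The trylock loops added to vls_mt_acq_locks are worth a closer look: rather
than blocking on the mq mutex while another thread holds it, the caller
re-checks session readiness on every failed trylock, so a session that
becomes readable (or writable) while the lock is contended can proceed
without taking the mq lock at all. A minimal standalone rendering of the
idiom, assuming plain pthreads and a hypothetical session_read_ready()
helper standing in for vcl_session_read_ready():

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t mq_mlock = PTHREAD_MUTEX_INITIALIZER;

    /* Hypothetical stand-in for vcl_session_read_ready(): nonzero when
     * a read would not block. */
    extern int session_read_ready (void *session);

    /* Mirrors the loop shape in vls_mt_acq_locks. pthread_mutex_trylock
     * returns 0 on success and EBUSY while contended, so the body runs
     * only on failed attempts, re-checking readiness each time. Returns
     * true if the caller now holds mq_mlock (and must release it), false
     * if the session is already non-blocking and no lock is needed. */
    static bool
    acquire_mq_lock_unless_ready (void *session)
    {
      int is_nonblk = session_read_ready (session) != 0;

      while (!is_nonblk && pthread_mutex_trylock (&mq_mlock))
	is_nonblk = session_read_ready (session) != 0;

      return !is_nonblk;
    }

The same concern motivates vcl_session_is_write_nonblk: for datagram
sessions the upcoming write size is unknown, so the patch only reports the
session as non-blocking when vcl_session_write_ready indicates an error or
ample space (more than 64 << 10 bytes).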