author    Florin Coras <fcoras@cisco.com>      2024-11-02 16:27:53 -0400
committer Dave Wallace <dwallacelf@gmail.com>  2025-01-07 20:25:22 +0000
commit    6102d81832ed93469de64222657a69dcd4e0af5e (patch)
tree      a92ef7507b0d004084ddff6abdfd33bb61f0bbc5 /src/vcl/vcl_locked.c
parent    3a3476fddb79ed1eaf277bb66c0641933906ef3d (diff)
vcl: support pre/post cb before mq wait
Allow vls to register cb functions with vcl, to be run pre/post mq sleep.
These can be used to drop/reacquire locks before/after waiting on vcl mq
events. This then allows multi-thread, as opposed to multi-worker,
applications to share sessions between threads without deadlocking, e.g.,
multiple threads trying to read/write/close non-blocking sessions.

Caveat: connects still need to be improved.

Type: improvement
Change-Id: I589aa9dfd0553b0fad54f02ed16c3cda9761a83d
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
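The callbacks registered here are invoked by vcl around its message-queue
sleep. That dispatch lives on the vcl side and is not part of this file's
diff; the sketch below uses hypothetical worker fields (pre_wait_fn,
post_wait_fn) to illustrate the intended contract. Only
vcl_worker_set_wait_mq_fns (), shown in the last hunk, is from this patch:

    /* Hypothetical sketch of the vcl side of the contract */
    static void
    vcl_wait_mq_event_sketch (vcl_worker_t *wrk, vcl_session_handle_t sh)
    {
      if (wrk->pre_wait_fn)       /* hypothetical field */
        wrk->pre_wait_fn (sh);    /* vls drops its mt mq lock */

      /* ... block on the vcl message queue until an event arrives ... */

      if (wrk->post_wait_fn)      /* hypothetical field */
        wrk->post_wait_fn (sh);   /* vls reacquires the mt mq lock */
    }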
Diffstat (limited to 'src/vcl/vcl_locked.c')
-rw-r--r--  src/vcl/vcl_locked.c | 184
1 file changed, 150 insertions(+), 34 deletions(-)
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index f8a306c1ba1..a9992e9317c 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -94,6 +94,22 @@ typedef struct vls_shared_data_
clib_bitmap_t *listeners; /**< bitmap of wrks actively listening */
} vls_shared_data_t;
+#define foreach_vls_flag _ (APP_CLOSED, "app closed")
+
+enum vls_flags_bits_
+{
+#define _(sym, str) VLS_FLAG_BIT_##sym,
+ foreach_vls_flag
+#undef _
+};
+
+typedef enum vls_flags_
+{
+#define _(sym, str) VLS_F_##sym = 1 << VLS_FLAG_BIT_##sym,
+ foreach_vls_flag
+#undef _
+} vls_flags_t;
+
typedef struct vcl_locked_session_
{
clib_spinlock_t lock; /**< vls lock when in use */
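For reference, the foreach_vls_flag x-macro above generates one bit index
and one flag value per entry; with the single flag currently defined it
expands to the equivalent of:

    enum vls_flags_bits_
    {
      VLS_FLAG_BIT_APP_CLOSED,	/* = 0 */
    };

    typedef enum vls_flags_
    {
      VLS_F_APP_CLOSED = 1 << VLS_FLAG_BIT_APP_CLOSED,	/* = 0x1 */
    } vls_flags_t;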
@@ -104,6 +120,7 @@ typedef struct vcl_locked_session_
u32 owner_vcl_wrk_index; /**< vcl wrk of the vls wrk at alloc */
uword *vcl_wrk_index_to_session_index; /**< map vcl wrk to session */
int libc_epfd; /**< epoll fd for libc epoll */
+ vls_flags_t flags; /**< vls flags */
} vcl_locked_session_t;
typedef struct vls_worker_
@@ -303,19 +320,22 @@ vls_mt_add (void)
vcl_set_worker_index (vlsl->vls_wrk_index);
  /* Only allow new pthread to be cancelled in vls_mt_mq_lock */
- int old_state;
if (vlsl->vls_mt_n_threads >= 2)
- pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_state);
+ pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
}
static inline void
vls_mt_mq_lock (void)
{
- /* Allow controlled cancelation of thread before grabbing mutex */
- pthread_testcancel ();
pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
}
+static inline int
+vls_mt_mq_trylock (void)
+{
+ return pthread_mutex_trylock (&vlsl->vls_mt_mq_mlock);
+}
+
static inline void
vls_mt_mq_unlock (void)
{
@@ -354,6 +374,14 @@ vls_lock (vcl_locked_session_t * vls)
clib_spinlock_lock (&vls->lock);
}
+static inline int
+vls_trylock (vcl_locked_session_t *vls)
+{
+ if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
+ return !clib_spinlock_trylock (&vls->lock);
+ return 0;
+}
+
static inline void
vls_unlock (vcl_locked_session_t * vls)
{
@@ -559,28 +587,30 @@ vlsh_to_sh (vls_handle_t vlsh)
return rv;
}
-vcl_session_handle_t
-vlsh_to_session_index (vls_handle_t vlsh)
-{
- vcl_session_handle_t sh;
- sh = vlsh_to_sh (vlsh);
- return vppcom_session_index (sh);
-}
-
-int
-vlsh_to_worker_index (vls_handle_t vlsh)
+void
+vlsh_to_session_and_worker_index (vls_handle_t vlsh, u32 *session_index,
+ u32 *wrk_index)
{
vcl_locked_session_t *vls;
- u32 wrk_index;
- vls = vls_get_w_dlock (vlsh);
+ vls_mt_pool_rlock ();
+
+  /* Do not lock vls because, for mt apps that use select, this could
+ * deadlock if multiple threads select on the same vlsh */
+ vls = vls_get (vlsh);
+
if (!vls)
- wrk_index = INVALID_SESSION_ID;
+ {
+ *session_index = INVALID_SESSION_ID;
+ *wrk_index = INVALID_SESSION_ID;
+ }
else
- wrk_index = vls->vcl_wrk_index;
- vls_dunlock (vls);
+ {
+ *session_index = vls->session_index;
+ *wrk_index = vls->vcl_wrk_index;
+ }
- return wrk_index;
+ vls_mt_pool_runlock ();
}
vls_handle_t
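The two single-purpose lookups are folded into one call that resolves both
indices under a single pool reader lock. Caller sketch (the error handling
is an assumption, not from this diff):

    u32 sid, wrk;

    vlsh_to_session_and_worker_index (vlsh, &sid, &wrk);
    if (sid == INVALID_SESSION_ID)
      return VPPCOM_EBADFD; /* stale or unknown vls handle */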
@@ -977,6 +1007,19 @@ vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
vls_share_sessions (vls_parent_wrk, vls_wrk);
}
+static inline u8
+vcl_session_is_write_nonblk (vcl_session_t *s)
+{
+ int rv = vcl_session_write_ready (s);
+
+ if (!s->is_dgram)
+ return rv != 0;
+
+ /* Probably not common, but without knowing the actual write size, this is
+ * the only way we can guarantee the write won't block */
+ return rv < 0 ? 1 : (rv > (64 << 10));
+}
+
static void
vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
{
@@ -995,23 +1038,25 @@ vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
switch (op)
{
case VLS_MT_OP_READ:
- if (!is_nonblk)
- is_nonblk = vcl_session_read_ready (s) != 0;
- if (!is_nonblk)
+ is_nonblk = is_nonblk ?: vcl_session_read_ready (s) != 0;
+ while (!is_nonblk && vls_mt_mq_trylock ())
{
- vls_mt_mq_lock ();
- *locks_acq |= VLS_MT_LOCK_MQ;
+ /* might get data while waiting for lock */
+ is_nonblk = vcl_session_read_ready (s) != 0;
}
+ if (!is_nonblk)
+ *locks_acq |= VLS_MT_LOCK_MQ;
break;
case VLS_MT_OP_WRITE:
ASSERT (s);
- if (!is_nonblk)
- is_nonblk = vcl_session_write_ready (s) != 0;
- if (!is_nonblk)
+ is_nonblk = is_nonblk ?: vcl_session_is_write_nonblk (s);
+ while (!is_nonblk && vls_mt_mq_trylock ())
{
- vls_mt_mq_lock ();
- *locks_acq |= VLS_MT_LOCK_MQ;
+ /* might get space while waiting for lock */
+ is_nonblk = vcl_session_is_write_nonblk (s);
}
+ if (!is_nonblk)
+ *locks_acq |= VLS_MT_LOCK_MQ;
break;
case VLS_MT_OP_XPOLL:
vls_mt_mq_lock ();
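Both the read and write branches replace an unconditional
vls_mt_mq_lock () with the same opportunistic pattern: spin on the trylock
and re-evaluate readiness after every failed attempt, since an event
arriving while another thread holds the mq lock can make the operation
non-blocking and the wait unnecessary. The loop exits either because the
operation became non-blocking (lock not taken, nothing recorded) or
because the trylock succeeded, in which case VLS_MT_LOCK_MQ is recorded in
*locks_acq for release after the blocking call. Note that
is_nonblk = is_nonblk ?: ... uses the GNU ?: extension (a ?: b yields a
when a is nonzero). The general shape, with hypothetical helper names:

    while (!ready && try_lock_busy ())	/* nonzero: lock still busy */
      ready = recheck ();		/* may turn true while spinning */
    if (!ready)
      lock_held = 1;			/* trylock succeeded; we will block */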
@@ -1442,15 +1487,23 @@ vls_close (vls_handle_t vlsh)
int rv;
vls_mt_detect ();
- vls_mt_pool_wlock ();
- vls = vls_get_and_lock (vlsh);
+ /* Notify vcl while holding a reader lock. Allows other threads to
+ * regrab vls and unlock it if needed. */
+ vls_mt_pool_rlock ();
+
+ vls = vls_get (vlsh);
if (!vls)
{
- vls_mt_pool_wunlock ();
+ vls_mt_pool_runlock ();
return VPPCOM_EBADFD;
}
+  /* Notify other threads, if any, that app closed. Do it before
+   * grabbing the lock as vls might already be locked */
+ vls->flags |= VLS_F_APP_CLOSED;
+ vls_lock (vls);
+
vls_mt_guard (vls, VLS_MT_OP_SPOOL);
if (vls_is_shared (vls))
@@ -1461,8 +1514,23 @@ vls_close (vls_handle_t vlsh)
if (vls_mt_wrk_supported ())
vls_mt_session_cleanup (vls);
- vls_free (vls);
vls_mt_unguard ();
+ vls_unlock (vls);
+ vls_mt_pool_runlock ();
+
+ /* Drop mt reader lock on pool and acquire writer lock */
+ vls_mt_pool_wlock ();
+
+ vls = vls_get (vlsh);
+
+ /* Other threads might be still using the session */
+ while (vls_trylock (vls))
+ {
+ vls_mt_pool_wunlock ();
+ vls_mt_pool_wlock ();
+ }
+
+ vls_free (vls);
vls_mt_pool_wunlock ();
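vls_close () now runs in two phases because the pool rwlock offers no
atomic upgrade: a reader phase that flags the close and performs cleanup
while other threads can still reach the vls (a thread parked in
vls_mt_mq_wait_lock () holds the pool reader lock across its sleep, so
taking the writer lock up front could deadlock), then a writer phase that
re-looks up the handle and polls the session spinlock until the last
concurrent user is out. Condensed shape of the writer phase, as in the
hunk above:

    vls_mt_pool_wlock ();
    vls = vls_get (vlsh);	/* re-lookup after dropping the rlock */
    while (vls_trylock (vls))	/* nonzero: some thread still holds it */
      {
        vls_mt_pool_wunlock (); /* let that thread finish and unlock */
        vls_mt_pool_wlock ();
      }
    vls_free (vls);
    vls_mt_pool_wunlock ();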
@@ -2002,6 +2070,51 @@ vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
}
+static inline void
+vls_mt_mq_wait_lock (vcl_session_handle_t vcl_sh)
+{
+ vcl_locked_session_t *vls;
+ vls_worker_t *wrk;
+ uword *vlshp;
+
+ /* If mt wrk supported or single threaded just return */
+ if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+ return;
+
+ wrk = vls_worker_get_current ();
+ /* Expect current thread to have dropped lock before calling vcl */
+ vls_mt_pool_rlock ();
+
+ vlshp = vls_sh_to_vlsh_table_get (wrk, vcl_sh);
+ if (vlshp)
+ {
+ vls = vls_get (*vlshp);
+      /* Handle the case where other threads might've closed the session */
+ if (vls->flags & VLS_F_APP_CLOSED)
+ {
+ vcl_session_t *s;
+ s = vcl_session_get_w_handle (vcl_worker_get_current (), vcl_sh);
+ s->flags |= VCL_SESSION_F_APP_CLOSING;
+ vls_mt_pool_runlock ();
+ return;
+ }
+ }
+
+ vls_mt_mq_unlock ();
+}
+
+static inline void
+vls_mt_mq_wait_unlock (vcl_session_handle_t vcl_sh)
+{
+ if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+ return;
+
+ vls_mt_mq_lock ();
+
+ /* writers can grab lock now */
+ vls_mt_pool_runlock ();
+}
+
int
vls_app_create (char *app_name)
{
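Taken together with vls_close (), the two callbacks make the following
interleaving safe for threads implicitly sharing a session (timeline
reconstructed from this diff):

    /* T1 blocks in a read, T2 closes the same vlsh:
     *
     * T1  vls_mt_mq_wait_lock ():  pool rlock, drop mq lock, sleep on mq
     * T2  vls_close ():            pool rlock (shared with T1's), set
     *                              VLS_F_APP_CLOSED, cleanup, pool runlock
     * T1  wakes; vls_mt_mq_wait_unlock (): retake mq lock, pool runlock
     * T2  pool wlock, spin on vls_trylock () until T1 is out, vls_free ()
     */

If VLS_F_APP_CLOSED is already set when a thread is about to wait, the
pre-wait callback instead marks the vcl session with
VCL_SESSION_F_APP_CLOSING and returns with the mq lock still held,
presumably so vcl can abort the wait rather than sleep on a closing
session.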
@@ -2025,6 +2138,9 @@ vls_app_create (char *app_name)
clib_rwlock_init (&vlsl->vls_pool_lock);
vls_mt_locks_init ();
vcm->wrk_rpc_fn = vls_rpc_handler;
+  /* For multi-threaded apps where sessions are implicitly shared, ask vcl
+   * to use these callbacks before and after blocking on io operations */
+ vcl_worker_set_wait_mq_fns (vls_mt_mq_wait_lock, vls_mt_mq_wait_unlock);
return VPPCOM_OK;
}