aboutsummaryrefslogtreecommitdiffstats
path: root/src/vcl/vppcom.c
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2024-11-02 16:27:53 -0400
committerDave Wallace <dwallacelf@gmail.com>2025-01-07 20:25:22 +0000
commit6102d81832ed93469de64222657a69dcd4e0af5e (patch)
treea92ef7507b0d004084ddff6abdfd33bb61f0bbc5 /src/vcl/vppcom.c
parent3a3476fddb79ed1eaf277bb66c0641933906ef3d (diff)
vcl: support pre/post cb before mq wait
Allow vls to register cb functions with vcl pre/post mq sleep. These can be used to drop/reacquire locks prior/after waiting on vcl mq events. This then allows multi-thread, as opposed to multi-worker, applications to share sessions between threads without deadlocking, e.g., multiple threads trying to read/write/close non-blocking sessions. Caveat: connects still need to be improved. Type: improvement Change-Id: I589aa9dfd0553b0fad54f02ed16c3cda9761a83d Signed-off-by: Florin Coras <fcoras@cisco.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'src/vcl/vppcom.c')
-rw-r--r--src/vcl/vppcom.c73
1 files changed, 59 insertions, 14 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index d084c09e1da..0b2141614ee 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -1263,6 +1263,51 @@ vcl_flush_mq_events (void)
vcl_worker_flush_mq_events (vcl_worker_get_current ());
}
+static inline void
+vcl_worker_wait_mq (vcl_worker_t *wrk, u32 session_handle,
+ vcl_worker_wait_type_t wait)
+{
+ vcl_session_t *s = 0;
+ u32 sleeps = 0;
+
+ if (wrk->pre_wait_fn)
+ wrk->pre_wait_fn (session_handle);
+
+ if (session_handle != VCL_INVALID_SESSION_INDEX)
+ {
+ s = vcl_session_get_w_handle (wrk, session_handle);
+ /* Session might've been closed by another thread if multi-threaded
+ * as opposed to multi-worker app */
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return;
+ }
+
+ /* Short sleeps waiting on mq notifications. Note that we drop mq lock for
+ * multi-thread apps so we may be getting spurious notifications. Not ideal,
+ * as we'd like to only be woken up on events, but since multiple threads may
+ * be waiting on the same mq, events may be missed. If waiting on session io
+ * events, return if session is ready. Otherwise return if mq has event */
+ while (svm_msg_q_timedwait (wrk->app_event_queue, 1e-3))
+ {
+ if (s)
+ {
+ if ((wait == VCL_WRK_WAIT_IO_RX && vcl_session_read_ready (s)) ||
+ (wait == VCL_WRK_WAIT_IO_TX && vcl_session_write_ready (s)))
+ break;
+ }
+ else
+ {
+ if (!svm_msg_q_is_empty (wrk->app_event_queue))
+ break;
+ }
+ if (++sleeps > 200)
+ break;
+ }
+
+ if (wrk->post_wait_fn)
+ wrk->post_wait_fn (session_handle);
+}
+
static int
vppcom_session_unbind (u32 session_handle)
{
@@ -1831,7 +1876,7 @@ again:
if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
return VPPCOM_EAGAIN;
- svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, -1, VCL_WRK_WAIT_CTRL);
vcl_worker_flush_mq_events (wrk);
goto again;
}
@@ -2047,7 +2092,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
session_event_t *e;
- svm_msg_q_t *mq;
u8 is_ct;
if (PREDICT_FALSE (!buf))
@@ -2079,7 +2123,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
- mq = wrk->app_event_queue;
rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
@@ -2098,12 +2141,14 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
{
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
if (is_ct)
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2188,7 +2233,6 @@ vppcom_session_read_segments (uint32_t session_handle,
int n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
- svm_msg_q_t *mq;
u8 is_ct;
s = vcl_session_get_w_handle (wrk, session_handle);
@@ -2200,7 +2244,6 @@ vppcom_session_read_segments (uint32_t session_handle,
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
- mq = wrk->app_event_queue;
rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
@@ -2219,12 +2262,14 @@ vppcom_session_read_segments (uint32_t session_handle,
{
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
if (is_ct)
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2289,7 +2334,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
int n_write, is_nonblocking;
session_evt_type_t et;
svm_fifo_t *tx_fifo;
- svm_msg_q_t *mq;
u8 is_ct;
/* Accept zero length writes but just return */
@@ -2326,7 +2370,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
- mq = wrk->app_event_queue;
if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
{
if (is_nonblocking)
@@ -2338,8 +2381,10 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, vcl_session_handle (s), VCL_WRK_WAIT_IO_TX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2383,7 +2428,6 @@ vppcom_session_write_segments (uint32_t session_handle,
int n_write = 0, n_bytes = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *tx_fifo;
- svm_msg_q_t *mq;
u8 is_ct;
u32 i;
@@ -2406,7 +2450,6 @@ vppcom_session_write_segments (uint32_t session_handle,
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
- mq = wrk->app_event_queue;
tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
for (i = 0; i < n_segments; i++)
@@ -2423,8 +2466,10 @@ vppcom_session_write_segments (uint32_t session_handle,
svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_TX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2843,7 +2888,7 @@ check_rd:
continue;
}
- if (vcl_session_read_ready (s))
+ if (vcl_session_read_ready2 (s))
{
clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
bits_set++;