author     Florin Coras <fcoras@cisco.com>      2024-11-02 16:27:53 -0400
committer  Dave Wallace <dwallacelf@gmail.com>  2025-01-07 20:25:22 +0000
commit     6102d81832ed93469de64222657a69dcd4e0af5e (patch)
tree       a92ef7507b0d004084ddff6abdfd33bb61f0bbc5
parent     3a3476fddb79ed1eaf277bb66c0641933906ef3d (diff)
vcl: support pre/post cb before mq wait

Allow vls to register cb functions with vcl pre/post mq sleep. These can
be used to drop/reacquire locks prior to/after waiting on vcl mq events.

This then allows multi-thread, as opposed to multi-worker, applications
to share sessions between threads without deadlocking, e.g., multiple
threads trying to read/write/close non-blocking sessions.

Caveat: connects still need to be improved.

Type: improvement

Change-Id: I589aa9dfd0553b0fad54f02ed16c3cda9761a83d
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
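
A minimal standalone sketch of the pre/post wait-callback pattern this
commit adds. All names below are illustrative, not the VCL API: the
pre-wait hook drops the mq lock before the thread sleeps on queue events
and the post-wait hook regrabs it, so sibling threads can keep using
shared sessions in the meantime.

#include <pthread.h>
#include <stdio.h>

typedef void (*wait_fn) (int session);

typedef struct worker_
{
  pthread_mutex_t mq_lock; /* stands in for the vls mq mutex */
  wait_fn pre_wait;	   /* called right before blocking on mq events */
  wait_fn post_wait;	   /* called right after waking up */
} worker_t;

static worker_t wrk = { .mq_lock = PTHREAD_MUTEX_INITIALIZER };

static void
pre_wait_cb (int session)
{
  (void) session;
  pthread_mutex_unlock (&wrk.mq_lock); /* let other threads in */
}

static void
post_wait_cb (int session)
{
  (void) session;
  pthread_mutex_lock (&wrk.mq_lock); /* regrab before resuming io path */
}

static void
worker_wait_mq (int session)
{
  if (wrk.pre_wait)
    wrk.pre_wait (session);
  /* ... block on message-queue events here ... */
  if (wrk.post_wait)
    wrk.post_wait (session);
}

int
main (void)
{
  wrk.pre_wait = pre_wait_cb;
  wrk.post_wait = post_wait_cb;
  pthread_mutex_lock (&wrk.mq_lock);
  worker_wait_mq (0);
  pthread_mutex_unlock (&wrk.mq_lock);
  puts ("slept without holding the mq lock");
  return 0;
}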
-rw-r--r--  src/vcl/ldp.c           14
-rw-r--r--  src/vcl/vcl_locked.c   184
-rw-r--r--  src/vcl/vcl_locked.h     5
-rw-r--r--  src/vcl/vcl_private.c   46
-rw-r--r--  src/vcl/vcl_private.h   16
-rw-r--r--  src/vcl/vppcom.c        73
-rw-r--r--  test/asf/test_vcl.py     6
7 files changed, 284 insertions, 60 deletions
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c
index 023a0eb0184..d9f45b2bce9 100644
--- a/src/vcl/ldp.c
+++ b/src/vcl/ldp.c
@@ -645,6 +645,7 @@ ldp_select_init_maps (fd_set * __restrict original,
u32 n_bytes, uword * si_bits, uword * libc_bits)
{
uword si_bits_set, libc_bits_set;
+ u32 session_index, wrk_index;
vls_handle_t vlsh;
int fd;
@@ -660,10 +661,14 @@ ldp_select_init_maps (fd_set * __restrict original,
vlsh = ldp_fd_to_vlsh (fd);
if (vlsh == VLS_INVALID_HANDLE)
clib_bitmap_set_no_check (*libcb, fd, 1);
- else if (vlsh_to_worker_index (vlsh) != vppcom_worker_index ())
- clib_warning ("migration currently not supported");
else
- *vclb = clib_bitmap_set (*vclb, vlsh_to_session_index (vlsh), 1);
+ {
+ vlsh_to_session_and_worker_index (vlsh, &session_index, &wrk_index);
+ if (wrk_index != vppcom_worker_index ())
+ clib_warning ("migration currently not supported");
+ else
+ *vclb = clib_bitmap_set (*vclb, session_index, 1);
+ }
}
si_bits_set = clib_bitmap_last_set (*vclb) + 1;
@@ -686,7 +691,8 @@ ldp_select_vcl_map_to_libc (clib_bitmap_t * vclb, fd_set * __restrict libcb)
clib_bitmap_foreach (si, vclb) {
vlsh = vls_session_index_to_vlsh (si);
- ASSERT (vlsh != VLS_INVALID_HANDLE);
+ if (vlsh == VLS_INVALID_HANDLE)
+ continue;
fd = ldp_vlsh_to_fd (vlsh);
if (PREDICT_FALSE (fd < 0))
{
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index f8a306c1ba1..a9992e9317c 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -94,6 +94,22 @@ typedef struct vls_shared_data_
clib_bitmap_t *listeners; /**< bitmap of wrks actively listening */
} vls_shared_data_t;
+#define foreach_vls_flag _ (APP_CLOSED, "app closed")
+
+enum vls_flags_bits_
+{
+#define _(sym, str) VLS_FLAG_BIT_##sym,
+ foreach_vls_flag
+#undef _
+};
+
+typedef enum vls_flags_
+{
+#define _(sym, str) VLS_F_##sym = 1 << VLS_FLAG_BIT_##sym,
+ foreach_vls_flag
+#undef _
+} vls_flags_t;
+
typedef struct vcl_locked_session_
{
clib_spinlock_t lock; /**< vls lock when in use */
@@ -104,6 +120,7 @@ typedef struct vcl_locked_session_
u32 owner_vcl_wrk_index; /**< vcl wrk of the vls wrk at alloc */
uword *vcl_wrk_index_to_session_index; /**< map vcl wrk to session */
int libc_epfd; /**< epoll fd for libc epoll */
+ vls_flags_t flags; /**< vls flags */
} vcl_locked_session_t;
typedef struct vls_worker_
@@ -303,19 +320,22 @@ vls_mt_add (void)
vcl_set_worker_index (vlsl->vls_wrk_index);
/* Only allow new pthread to be canceled in vls_mt_mq_lock */
- int old_state;
if (vlsl->vls_mt_n_threads >= 2)
- pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_state);
+ pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
}
static inline void
vls_mt_mq_lock (void)
{
- /* Allow controlled cancelation of thread before grabbing mutex */
- pthread_testcancel ();
pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
}
+static inline int
+vls_mt_mq_trylock (void)
+{
+ return pthread_mutex_trylock (&vlsl->vls_mt_mq_mlock);
+}
+
static inline void
vls_mt_mq_unlock (void)
{
@@ -354,6 +374,14 @@ vls_lock (vcl_locked_session_t * vls)
clib_spinlock_lock (&vls->lock);
}
+static inline int
+vls_trylock (vcl_locked_session_t *vls)
+{
+ if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
+ return !clib_spinlock_trylock (&vls->lock);
+ return 0;
+}
+
static inline void
vls_unlock (vcl_locked_session_t * vls)
{
@@ -559,28 +587,30 @@ vlsh_to_sh (vls_handle_t vlsh)
return rv;
}
-vcl_session_handle_t
-vlsh_to_session_index (vls_handle_t vlsh)
-{
- vcl_session_handle_t sh;
- sh = vlsh_to_sh (vlsh);
- return vppcom_session_index (sh);
-}
-
-int
-vlsh_to_worker_index (vls_handle_t vlsh)
+void
+vlsh_to_session_and_worker_index (vls_handle_t vlsh, u32 *session_index,
+ u32 *wrk_index)
{
vcl_locked_session_t *vls;
- u32 wrk_index;
- vls = vls_get_w_dlock (vlsh);
+ vls_mt_pool_rlock ();
+
+ /* Do not lock vls because for mt apps that use select this could
+ * deadlock if multiple threads select on the same vlsh */
+ vls = vls_get (vlsh);
+
if (!vls)
- wrk_index = INVALID_SESSION_ID;
+ {
+ *session_index = INVALID_SESSION_ID;
+ *wrk_index = INVALID_SESSION_ID;
+ }
else
- wrk_index = vls->vcl_wrk_index;
- vls_dunlock (vls);
+ {
+ *session_index = vls->session_index;
+ *wrk_index = vls->vcl_wrk_index;
+ }
- return wrk_index;
+ vls_mt_pool_runlock ();
}
vls_handle_t
@@ -977,6 +1007,19 @@ vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
vls_share_sessions (vls_parent_wrk, vls_wrk);
}
+static inline u8
+vcl_session_is_write_nonblk (vcl_session_t *s)
+{
+ int rv = vcl_session_write_ready (s);
+
+ if (!s->is_dgram)
+ return rv != 0;
+
+ /* Probably not common, but without knowing the actual write size, this is
+ * the only way we can guarantee the write won't block */
+ return rv < 0 ? 1 : (rv > (64 << 10));
+}
+
static void
vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
{
@@ -995,23 +1038,25 @@ vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
switch (op)
{
case VLS_MT_OP_READ:
- if (!is_nonblk)
- is_nonblk = vcl_session_read_ready (s) != 0;
- if (!is_nonblk)
+ is_nonblk = is_nonblk ?: vcl_session_read_ready (s) != 0;
+ while (!is_nonblk && vls_mt_mq_trylock ())
{
- vls_mt_mq_lock ();
- *locks_acq |= VLS_MT_LOCK_MQ;
+ /* might get data while waiting for lock */
+ is_nonblk = vcl_session_read_ready (s) != 0;
}
+ if (!is_nonblk)
+ *locks_acq |= VLS_MT_LOCK_MQ;
break;
case VLS_MT_OP_WRITE:
ASSERT (s);
- if (!is_nonblk)
- is_nonblk = vcl_session_write_ready (s) != 0;
- if (!is_nonblk)
+ is_nonblk = is_nonblk ?: vcl_session_is_write_nonblk (s);
+ while (!is_nonblk && vls_mt_mq_trylock ())
{
- vls_mt_mq_lock ();
- *locks_acq |= VLS_MT_LOCK_MQ;
+ /* might get space while waiting for lock */
+ is_nonblk = vcl_session_is_write_nonblk (s);
}
+ if (!is_nonblk)
+ *locks_acq |= VLS_MT_LOCK_MQ;
break;
case VLS_MT_OP_XPOLL:
vls_mt_mq_lock ();
@@ -1442,15 +1487,23 @@ vls_close (vls_handle_t vlsh)
int rv;
vls_mt_detect ();
- vls_mt_pool_wlock ();
- vls = vls_get_and_lock (vlsh);
+ /* Notify vcl while holding a reader lock. Allows other threads to
+ * regrab vls and unlock it if needed. */
+ vls_mt_pool_rlock ();
+
+ vls = vls_get (vlsh);
if (!vls)
{
- vls_mt_pool_wunlock ();
+ vls_mt_pool_runlock ();
return VPPCOM_EBADFD;
}
+ /* Notify other threads, if any, that app closed. Do it before
+ * grabbing lock as vls might already be locked */
+ vls->flags |= VLS_F_APP_CLOSED;
+ vls_lock (vls);
+
vls_mt_guard (vls, VLS_MT_OP_SPOOL);
if (vls_is_shared (vls))
@@ -1461,8 +1514,23 @@ vls_close (vls_handle_t vlsh)
if (vls_mt_wrk_supported ())
vls_mt_session_cleanup (vls);
- vls_free (vls);
vls_mt_unguard ();
+ vls_unlock (vls);
+ vls_mt_pool_runlock ();
+
+ /* Drop mt reader lock on pool and acquire writer lock */
+ vls_mt_pool_wlock ();
+
+ vls = vls_get (vlsh);
+
+ /* Other threads might be still using the session */
+ while (vls_trylock (vls))
+ {
+ vls_mt_pool_wunlock ();
+ vls_mt_pool_wlock ();
+ }
+
+ vls_free (vls);
vls_mt_pool_wunlock ();
@@ -2002,6 +2070,51 @@ vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
}
+static inline void
+vls_mt_mq_wait_lock (vcl_session_handle_t vcl_sh)
+{
+ vcl_locked_session_t *vls;
+ vls_worker_t *wrk;
+ uword *vlshp;
+
+ /* If mt wrk supported or single threaded just return */
+ if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+ return;
+
+ wrk = vls_worker_get_current ();
+ /* Expect current thread to have dropped lock before calling vcl */
+ vls_mt_pool_rlock ();
+
+ vlshp = vls_sh_to_vlsh_table_get (wrk, vcl_sh);
+ if (vlshp)
+ {
+ vls = vls_get (*vlshp);
+ /* Handle the case where other threads might've closed the session */
+ if (vls->flags & VLS_F_APP_CLOSED)
+ {
+ vcl_session_t *s;
+ s = vcl_session_get_w_handle (vcl_worker_get_current (), vcl_sh);
+ s->flags |= VCL_SESSION_F_APP_CLOSING;
+ vls_mt_pool_runlock ();
+ return;
+ }
+ }
+
+ vls_mt_mq_unlock ();
+}
+
+static inline void
+vls_mt_mq_wait_unlock (vcl_session_handle_t vcl_sh)
+{
+ if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+ return;
+
+ vls_mt_mq_lock ();
+
+ /* writers can grab lock now */
+ vls_mt_pool_runlock ();
+}
+
int
vls_app_create (char *app_name)
{
@@ -2025,6 +2138,9 @@ vls_app_create (char *app_name)
clib_rwlock_init (&vlsl->vls_pool_lock);
vls_mt_locks_init ();
vcm->wrk_rpc_fn = vls_rpc_handler;
+ /* For multi-threaded apps where sessions are implicitly shared, ask vcl
+ * to use these callbacks prior to and after blocking on io operations */
+ vcl_worker_set_wait_mq_fns (vls_mt_mq_wait_lock, vls_mt_mq_wait_unlock);
return VPPCOM_OK;
}
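
The trylock loops added to vls_mt_acq_locks above are the heart of this
file's change. A standalone sketch of that pattern, assuming plain
pthreads (names are illustrative, not the VCL API): instead of blocking
on the mq mutex while the session might turn readable, keep retrying the
lock and re-polling readiness, and skip the lock entirely once data
shows up.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t mq_mlock = PTHREAD_MUTEX_INITIALIZER;
static volatile int bytes_pending; /* stands in for rx fifo occupancy */

/* stands in for vcl_session_read_ready () != 0 */
static bool
session_read_ready (void)
{
  return bytes_pending != 0;
}

/* Returns true if the caller now holds mq_mlock and must wait on mq
 * events; false if the session became readable and no lock is needed. */
static bool
acquire_mq_lock_unless_ready (void)
{
  bool is_nonblk = session_read_ready ();

  /* trylock returns non-zero (EBUSY) while another thread holds the
   * lock; data may arrive while we spin, making the lock unnecessary */
  while (!is_nonblk && pthread_mutex_trylock (&mq_mlock))
    is_nonblk = session_read_ready ();

  return !is_nonblk;
}

int
main (void)
{
  if (acquire_mq_lock_unless_ready ())
    {
      /* would block on mq events here, then ... */
      pthread_mutex_unlock (&mq_mlock);
      puts ("no data yet: waited under the mq lock");
    }
  else
    puts ("data arrived while spinning: skipped the mq lock");
  return 0;
}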
diff --git a/src/vcl/vcl_locked.h b/src/vcl/vcl_locked.h
index 0e747acdea0..98a1c542e4a 100644
--- a/src/vcl/vcl_locked.h
+++ b/src/vcl/vcl_locked.h
@@ -50,8 +50,9 @@ int vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
vcl_si_set * except_map, double wait_for_time);
int vls_poll (vcl_poll_t *vp, uint32_t n_sids, double wait_for_time);
vcl_session_handle_t vlsh_to_sh (vls_handle_t vlsh);
-vcl_session_handle_t vlsh_to_session_index (vls_handle_t vlsh);
-int vlsh_to_worker_index (vls_handle_t vlsh);
+void vlsh_to_session_and_worker_index (vls_handle_t vlsh,
+ uint32_t *session_index,
+ uint32_t *wrk_index);
vls_handle_t vls_session_index_to_vlsh (uint32_t session_index);
int vls_app_create (char *app_name);
unsigned char vls_use_eventfd (void);
diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c
index 4af79e9ebb0..d3ad2331827 100644
--- a/src/vcl/vcl_private.c
+++ b/src/vcl/vcl_private.c
@@ -75,6 +75,8 @@ vcl_mq_epoll_add_evfd (vcl_worker_t * wrk, svm_msg_q_t * mq)
mqc->mq_fd = mq_fd;
mqc->mq = mq;
+ fcntl (mq_fd, F_SETFL, O_NONBLOCK);
+
e.events = EPOLLIN;
e.data.u32 = mqc_index;
if (epoll_ctl (wrk->mqs_epfd, EPOLL_CTL_ADD, mq_fd, &e) < 0)
@@ -228,6 +230,15 @@ vcl_worker_detach_sessions (vcl_worker_t *wrk)
hash_free (seg_indices_map);
}
+void
+vcl_worker_set_wait_mq_fns (vcl_worker_wait_mq_fn pre_wait,
+ vcl_worker_wait_mq_fn post_wait)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ wrk->pre_wait_fn = pre_wait;
+ wrk->post_wait_fn = post_wait;
+}
+
vcl_worker_t *
vcl_worker_alloc_and_init ()
{
@@ -336,7 +347,7 @@ vcl_session_read_ready (vcl_session_t * s)
max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo);
if (max_deq <= SESSION_CONN_HDR_LEN)
return 0;
- if (svm_fifo_peek (s->rx_fifo, 0, sizeof (ph), (u8 *) & ph) < 0)
+ if (svm_fifo_peek (s->rx_fifo, 0, sizeof (ph), (u8 *) &ph) < 0)
return 0;
if (ph.data_length + SESSION_CONN_HDR_LEN > max_deq)
return 0;
@@ -358,6 +369,39 @@ vcl_session_read_ready (vcl_session_t * s)
}
}
+/**
+ * Used as an alternative to vcl_session_read_ready to avoid peeking udp sessions.
+ * Multi-threaded applications could select the same session from multiple
+ * threads */
+int
+vcl_session_read_ready2 (vcl_session_t *s)
+{
+ if (vcl_session_is_open (s))
+ {
+ if (vcl_session_is_ct (s))
+ return svm_fifo_max_dequeue_cons (s->ct_rx_fifo);
+
+ if (s->is_dgram)
+ {
+ if (svm_fifo_max_dequeue_cons (s->rx_fifo) <= SESSION_CONN_HDR_LEN)
+ return 0;
+
+ /* Return 1 even if not yet sure if a full datagram was received */
+ return 1;
+ }
+
+ return svm_fifo_max_dequeue_cons (s->rx_fifo);
+ }
+ else if (s->session_state == VCL_STATE_LISTEN)
+ {
+ return clib_fifo_elts (s->accept_evts_fifo);
+ }
+ else
+ {
+ return 1;
+ }
+}
+
int
vcl_session_write_ready (vcl_session_t * s)
{
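
The new vcl_session_read_ready2 above skips the header peek for datagram
sessions because a peek is racy when several threads select the same
session. A sketch of the idea under simplified assumptions (the fifo
type and header size below are illustrative, not svm_fifo's API):
compare queued bytes against the per-datagram header size, which needs
only two atomic loads, and accept an occasional spurious ready.

#include <stdatomic.h>
#include <stdbool.h>

#define DGRAM_HDR_LEN 16 /* assumed header size; stands in for
			    SESSION_CONN_HDR_LEN */

typedef struct
{
  _Atomic unsigned long enq_bytes; /* producer cursor */
  _Atomic unsigned long deq_bytes; /* consumer cursor */
} dgram_fifo_t;

static bool
dgram_maybe_readable (dgram_fifo_t *f)
{
  unsigned long pending =
    atomic_load (&f->enq_bytes) - atomic_load (&f->deq_bytes);
  /* Report ready once more than a header is queued, even though the
   * full datagram may not have landed yet; a spurious wakeup is cheaper
   * than racing a header peek across threads. */
  return pending > DGRAM_HDR_LEN;
}

int
main (void)
{
  dgram_fifo_t f = { 0, 0 };
  atomic_store (&f.enq_bytes, DGRAM_HDR_LEN + 1);
  return !dgram_maybe_readable (&f); /* exits 0: reported readable */
}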
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 7e72b29ba25..c98e1cde9b1 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -143,8 +143,16 @@ typedef enum vcl_session_flags_
VCL_SESSION_F_PENDING_DISCONNECT = 1 << 6,
VCL_SESSION_F_PENDING_FREE = 1 << 7,
VCL_SESSION_F_PENDING_LISTEN = 1 << 8,
+ VCL_SESSION_F_APP_CLOSING = 1 << 9,
} __clib_packed vcl_session_flags_t;
+typedef enum vcl_worker_wait_
+{
+ VCL_WRK_WAIT_CTRL,
+ VCL_WRK_WAIT_IO_RX,
+ VCL_WRK_WAIT_IO_TX,
+} vcl_worker_wait_type_t;
+
typedef struct vcl_session_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -229,6 +237,7 @@ typedef struct vcl_mq_evt_conn_
int mq_fd;
} vcl_mq_evt_conn_t;
+typedef void (*vcl_worker_wait_mq_fn) (u32 vcl_sh);
typedef struct vcl_worker_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -312,6 +321,10 @@ typedef struct vcl_worker_
/** vcl needs next epoll_create to go to libc_epoll */
u8 vcl_needs_real_epoll;
volatile int rpc_done;
+
+ /* functions to be called pre/post wait if vcl managed by vls */
+ vcl_worker_wait_mq_fn pre_wait_fn;
+ vcl_worker_wait_mq_fn post_wait_fn;
} vcl_worker_t;
STATIC_ASSERT (sizeof (session_disconnected_msg_t) <= 16,
@@ -724,6 +737,7 @@ u32 vcl_segment_table_lookup (u64 segment_handle);
void vcl_segment_table_del (u64 segment_handle);
int vcl_session_read_ready (vcl_session_t * session);
+int vcl_session_read_ready2 (vcl_session_t *s);
int vcl_session_write_ready (vcl_session_t * session);
int vcl_session_alloc_ext_cfg (vcl_session_t *s,
transport_endpt_ext_cfg_type_t type, u32 len);
@@ -783,6 +797,8 @@ svm_fifo_chunk_t *vcl_segment_alloc_chunk (uword segment_handle,
int vcl_session_share_fifos (vcl_session_t *s, svm_fifo_t *rxf,
svm_fifo_t *txf);
void vcl_worker_detach_sessions (vcl_worker_t *wrk);
+void vcl_worker_set_wait_mq_fns (vcl_worker_wait_mq_fn pre_wait,
+ vcl_worker_wait_mq_fn post_wait);
/*
* VCL Binary API
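
For completeness, this is roughly how the hook exported above is meant
to be consumed; vls_app_create does exactly this in vcl_locked.c. A
usage sketch assuming the VPP build environment, with hypothetical
callback names:

#include <vcl/vcl_private.h>

/* hypothetical callbacks: drop/regrab app-level locks around mq sleep */
static void
my_pre_wait (u32 vcl_sh)
{
  (void) vcl_sh;
  /* release locks other threads need while this one sleeps */
}

static void
my_post_wait (u32 vcl_sh)
{
  (void) vcl_sh;
  /* reacquire them before resuming the blocking io path */
}

static void
register_wait_hooks (void)
{
  /* same registration vls_app_create performs with its own fns */
  vcl_worker_set_wait_mq_fns (my_pre_wait, my_post_wait);
}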
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index d084c09e1da..0b2141614ee 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -1263,6 +1263,51 @@ vcl_flush_mq_events (void)
vcl_worker_flush_mq_events (vcl_worker_get_current ());
}
+static inline void
+vcl_worker_wait_mq (vcl_worker_t *wrk, u32 session_handle,
+ vcl_worker_wait_type_t wait)
+{
+ vcl_session_t *s = 0;
+ u32 sleeps = 0;
+
+ if (wrk->pre_wait_fn)
+ wrk->pre_wait_fn (session_handle);
+
+ if (session_handle != VCL_INVALID_SESSION_INDEX)
+ {
+ s = vcl_session_get_w_handle (wrk, session_handle);
+ /* Session might've been closed by another thread if multi-threaded
+ * as opposed to multi-worker app */
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return;
+ }
+
+ /* Short sleeps waiting on mq notifications. Note that we drop mq lock for
+ * multi-thread apps so we may be getting spurious notifications. Not ideal,
+ * as we'd like to only be woken up on events, but since multiple threads may
+ * be waiting on the same mq, events may be missed. If waiting on session io
+ * events, return if session is ready. Otherwise return if mq has event */
+ while (svm_msg_q_timedwait (wrk->app_event_queue, 1e-3))
+ {
+ if (s)
+ {
+ if ((wait == VCL_WRK_WAIT_IO_RX && vcl_session_read_ready (s)) ||
+ (wait == VCL_WRK_WAIT_IO_TX && vcl_session_write_ready (s)))
+ break;
+ }
+ else
+ {
+ if (!svm_msg_q_is_empty (wrk->app_event_queue))
+ break;
+ }
+ if (++sleeps > 200)
+ break;
+ }
+
+ if (wrk->post_wait_fn)
+ wrk->post_wait_fn (session_handle);
+}
+
static int
vppcom_session_unbind (u32 session_handle)
{
@@ -1831,7 +1876,7 @@ again:
if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
return VPPCOM_EAGAIN;
- svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, -1, VCL_WRK_WAIT_CTRL);
vcl_worker_flush_mq_events (wrk);
goto again;
}
@@ -2047,7 +2092,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
session_event_t *e;
- svm_msg_q_t *mq;
u8 is_ct;
if (PREDICT_FALSE (!buf))
@@ -2079,7 +2123,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
- mq = wrk->app_event_queue;
rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
@@ -2098,12 +2141,14 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
{
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
if (is_ct)
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2188,7 +2233,6 @@ vppcom_session_read_segments (uint32_t session_handle,
int n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
- svm_msg_q_t *mq;
u8 is_ct;
s = vcl_session_get_w_handle (wrk, session_handle);
@@ -2200,7 +2244,6 @@ vppcom_session_read_segments (uint32_t session_handle,
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
- mq = wrk->app_event_queue;
rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
@@ -2219,12 +2262,14 @@ vppcom_session_read_segments (uint32_t session_handle,
{
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
if (is_ct)
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2289,7 +2334,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
int n_write, is_nonblocking;
session_evt_type_t et;
svm_fifo_t *tx_fifo;
- svm_msg_q_t *mq;
u8 is_ct;
/* Accept zero length writes but just return */
@@ -2326,7 +2370,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
- mq = wrk->app_event_queue;
if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
{
if (is_nonblocking)
@@ -2338,8 +2381,10 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, vcl_session_handle (s), VCL_WRK_WAIT_IO_TX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2383,7 +2428,6 @@ vppcom_session_write_segments (uint32_t session_handle,
int n_write = 0, n_bytes = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *tx_fifo;
- svm_msg_q_t *mq;
u8 is_ct;
u32 i;
@@ -2406,7 +2450,6 @@ vppcom_session_write_segments (uint32_t session_handle,
is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
is_ct = vcl_session_is_ct (s);
- mq = wrk->app_event_queue;
tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
for (i = 0; i < n_segments; i++)
@@ -2423,8 +2466,10 @@ vppcom_session_write_segments (uint32_t session_handle,
svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
+ if (s->flags & VCL_SESSION_F_APP_CLOSING)
+ return vcl_session_closed_error (s);
- svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_TX);
vcl_worker_flush_mq_events (wrk);
}
}
@@ -2843,7 +2888,7 @@ check_rd:
continue;
}
- if (vcl_session_read_ready (s))
+ if (vcl_session_read_ready2 (s))
{
clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
bits_set++;
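
The vcl_worker_wait_mq loop above trades one blocking wait for short
1 ms slices because, with the mq lock dropped, several threads can sleep
on the same queue and a wakeup can be missed. A condvar analog of that
loop, as a sketch assuming plain pthreads rather than the svm_msg_q API:
wait in slices, re-test the readiness predicate on each timeout, and
give up after a cap of ~200 slices.

#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Caller holds m, as pthread_cond_timedwait requires. Returns once
 * ready() holds, the condvar is signaled, or ~200 slices elapse. */
static void
timed_wait_loop (pthread_cond_t *cv, pthread_mutex_t *m,
		 bool (*ready) (void *), void *arg)
{
  int sleeps = 0;
  struct timespec ts;

  for (;;)
    {
      clock_gettime (CLOCK_REALTIME, &ts);
      ts.tv_nsec += 1000000; /* one 1 ms slice, like the 1e-3 timedwait */
      if (ts.tv_nsec >= 1000000000)
	{
	  ts.tv_sec += 1;
	  ts.tv_nsec -= 1000000000;
	}
      if (pthread_cond_timedwait (cv, m, &ts) != ETIMEDOUT)
	break; /* woken by an actual signal */
      if (ready (arg) || ++sleeps > 200)
	break; /* data showed up while polling, or cap hit */
    }
}

static int polls;

static bool
ready_after_three (void *arg)
{
  (void) arg;
  return ++polls > 3; /* simulate data arriving without a signal */
}

int
main (void)
{
  pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
  pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

  pthread_mutex_lock (&m);
  timed_wait_loop (&cv, &m, ready_after_three, 0);
  pthread_mutex_unlock (&m);
  return 0;
}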
diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py
index 124ea14089b..143b46c22ee 100644
--- a/test/asf/test_vcl.py
+++ b/test/asf/test_vcl.py
@@ -7,7 +7,7 @@ import subprocess
import signal
import glob
from config import config
-from asfframework import VppAsfTestCase, VppTestRunner, Worker, tag_fixme_ubuntu2404
+from asfframework import VppAsfTestCase, VppTestRunner, Worker
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
iperf3 = "/usr/bin/iperf3"
@@ -311,7 +311,6 @@ class VCLTestCase(VppAsfTestCase):
self.assert_equal(worker_client.result, 0, "Binary test return code")
-@tag_fixme_ubuntu2404
class LDPCutThruTestCase(VCLTestCase):
"""LDP Cut Thru Tests"""
@@ -1024,7 +1023,6 @@ class VCLThruHostStackNsock(VCLTestCase):
)
-@tag_fixme_ubuntu2404
class LDPThruHostStackIperf(VCLTestCase):
"""LDP Thru Host Stack Iperf"""
@@ -1072,7 +1070,6 @@ class LDPThruHostStackIperf(VCLTestCase):
)
-@tag_fixme_ubuntu2404
class LDPThruHostStackIperfUdp(VCLTestCase):
"""LDP Thru Host Stack Iperf UDP"""
@@ -1118,7 +1115,6 @@ class LDPThruHostStackIperfUdp(VCLTestCase):
)
-@tag_fixme_ubuntu2404
class LDPIpv6CutThruTestCase(VCLTestCase):
"""LDP IPv6 Cut Thru Tests"""