diff options
author | 2024-11-02 16:27:53 -0400 | |
---|---|---|
committer | 2025-01-07 20:25:22 +0000 | |
commit | 6102d81832ed93469de64222657a69dcd4e0af5e (patch) | |
tree | a92ef7507b0d004084ddff6abdfd33bb61f0bbc5 | |
parent | 3a3476fddb79ed1eaf277bb66c0641933906ef3d (diff) |
vcl: support pre/post cb before mq wait
Allow vls to register cb functions with vcl pre/post mq sleep. These can
be used to drop/reacquire locks prior/after waiting on vcl mq events.
This then allows multi-thread, as opposed to multi-worker, applications
to share sessions between threads without deadlocking, e.g., multiple
threads trying to read/write/close non-blocking sessions. Caveat:
connects still need to be improved.
Type: improvement
Change-Id: I589aa9dfd0553b0fad54f02ed16c3cda9761a83d
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
-rw-r--r-- | src/vcl/ldp.c | 14 | ||||
-rw-r--r-- | src/vcl/vcl_locked.c | 184 | ||||
-rw-r--r-- | src/vcl/vcl_locked.h | 5 | ||||
-rw-r--r-- | src/vcl/vcl_private.c | 46 | ||||
-rw-r--r-- | src/vcl/vcl_private.h | 16 | ||||
-rw-r--r-- | src/vcl/vppcom.c | 73 | ||||
-rw-r--r-- | test/asf/test_vcl.py | 6 |
7 files changed, 284 insertions, 60 deletions
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c index 023a0eb0184..d9f45b2bce9 100644 --- a/src/vcl/ldp.c +++ b/src/vcl/ldp.c @@ -645,6 +645,7 @@ ldp_select_init_maps (fd_set * __restrict original, u32 n_bytes, uword * si_bits, uword * libc_bits) { uword si_bits_set, libc_bits_set; + u32 session_index, wrk_index; vls_handle_t vlsh; int fd; @@ -660,10 +661,14 @@ ldp_select_init_maps (fd_set * __restrict original, vlsh = ldp_fd_to_vlsh (fd); if (vlsh == VLS_INVALID_HANDLE) clib_bitmap_set_no_check (*libcb, fd, 1); - else if (vlsh_to_worker_index (vlsh) != vppcom_worker_index ()) - clib_warning ("migration currently not supported"); else - *vclb = clib_bitmap_set (*vclb, vlsh_to_session_index (vlsh), 1); + { + vlsh_to_session_and_worker_index (vlsh, &session_index, &wrk_index); + if (wrk_index != vppcom_worker_index ()) + clib_warning ("migration currently not supported"); + else + *vclb = clib_bitmap_set (*vclb, session_index, 1); + } } si_bits_set = clib_bitmap_last_set (*vclb) + 1; @@ -686,7 +691,8 @@ ldp_select_vcl_map_to_libc (clib_bitmap_t * vclb, fd_set * __restrict libcb) clib_bitmap_foreach (si, vclb) { vlsh = vls_session_index_to_vlsh (si); - ASSERT (vlsh != VLS_INVALID_HANDLE); + if (vlsh == VLS_INVALID_HANDLE) + continue; fd = ldp_vlsh_to_fd (vlsh); if (PREDICT_FALSE (fd < 0)) { diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c index f8a306c1ba1..a9992e9317c 100644 --- a/src/vcl/vcl_locked.c +++ b/src/vcl/vcl_locked.c @@ -94,6 +94,22 @@ typedef struct vls_shared_data_ clib_bitmap_t *listeners; /**< bitmap of wrks actively listening */ } vls_shared_data_t; +#define foreach_vls_flag _ (APP_CLOSED, "app closed") + +enum vls_flags_bits_ +{ +#define _(sym, str) VLS_FLAG_BIT_##sym, + foreach_vls_flag +#undef _ +}; + +typedef enum vls_flags_ +{ +#define _(sym, str) VLS_F_##sym = 1 << VLS_FLAG_BIT_##sym, + foreach_vls_flag +#undef _ +} vls_flags_t; + typedef struct vcl_locked_session_ { clib_spinlock_t lock; /**< vls lock when in use */ @@ -104,6 +120,7 @@ typedef struct vcl_locked_session_ u32 owner_vcl_wrk_index; /**< vcl wrk of the vls wrk at alloc */ uword *vcl_wrk_index_to_session_index; /**< map vcl wrk to session */ int libc_epfd; /**< epoll fd for libc epoll */ + vls_flags_t flags; /**< vls flags */ } vcl_locked_session_t; typedef struct vls_worker_ @@ -303,19 +320,22 @@ vls_mt_add (void) vcl_set_worker_index (vlsl->vls_wrk_index); /* Only allow new pthread to be cancled in vls_mt_mq_lock */ - int old_state; if (vlsl->vls_mt_n_threads >= 2) - pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_state); + pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); } static inline void vls_mt_mq_lock (void) { - /* Allow controlled cancelation of thread before grabbing mutex */ - pthread_testcancel (); pthread_mutex_lock (&vlsl->vls_mt_mq_mlock); } +static inline int +vls_mt_mq_trylock (void) +{ + return pthread_mutex_trylock (&vlsl->vls_mt_mq_mlock); +} + static inline void vls_mt_mq_unlock (void) { @@ -354,6 +374,14 @@ vls_lock (vcl_locked_session_t * vls) clib_spinlock_lock (&vls->lock); } +static inline int +vls_trylock (vcl_locked_session_t *vls) +{ + if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls)) + return !clib_spinlock_trylock (&vls->lock); + return 0; +} + static inline void vls_unlock (vcl_locked_session_t * vls) { @@ -559,28 +587,30 @@ vlsh_to_sh (vls_handle_t vlsh) return rv; } -vcl_session_handle_t -vlsh_to_session_index (vls_handle_t vlsh) -{ - vcl_session_handle_t sh; - sh = vlsh_to_sh (vlsh); - return vppcom_session_index (sh); -} - -int -vlsh_to_worker_index (vls_handle_t vlsh) +void +vlsh_to_session_and_worker_index (vls_handle_t vlsh, u32 *session_index, + u32 *wrk_index) { vcl_locked_session_t *vls; - u32 wrk_index; - vls = vls_get_w_dlock (vlsh); + vls_mt_pool_rlock (); + + /* Do not lock vls because for mt apps that use select this could + * deadlock if multiple threads select on the same vlsh */ + vls = vls_get (vlsh); + if (!vls) - wrk_index = INVALID_SESSION_ID; + { + *session_index = INVALID_SESSION_ID; + *wrk_index = INVALID_SESSION_ID; + } else - wrk_index = vls->vcl_wrk_index; - vls_dunlock (vls); + { + *session_index = vls->session_index; + *wrk_index = vls->vcl_wrk_index; + } - return wrk_index; + vls_mt_pool_runlock (); } vls_handle_t @@ -977,6 +1007,19 @@ vls_worker_copy_on_fork (vcl_worker_t * parent_wrk) vls_share_sessions (vls_parent_wrk, vls_wrk); } +static inline u8 +vcl_session_is_write_nonblk (vcl_session_t *s) +{ + int rv = vcl_session_write_ready (s); + + if (!s->is_dgram) + return rv != 0; + + /* Probably not common, but without knowing the actual write size, this is + * the only way we can guarantee the write won't block */ + return rv < 0 ? 1 : (rv > (64 << 10)); +} + static void vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq) { @@ -995,23 +1038,25 @@ vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq) switch (op) { case VLS_MT_OP_READ: - if (!is_nonblk) - is_nonblk = vcl_session_read_ready (s) != 0; - if (!is_nonblk) + is_nonblk = is_nonblk ?: vcl_session_read_ready (s) != 0; + while (!is_nonblk && vls_mt_mq_trylock ()) { - vls_mt_mq_lock (); - *locks_acq |= VLS_MT_LOCK_MQ; + /* might get data while waiting for lock */ + is_nonblk = vcl_session_read_ready (s) != 0; } + if (!is_nonblk) + *locks_acq |= VLS_MT_LOCK_MQ; break; case VLS_MT_OP_WRITE: ASSERT (s); - if (!is_nonblk) - is_nonblk = vcl_session_write_ready (s) != 0; - if (!is_nonblk) + is_nonblk = is_nonblk ?: vcl_session_is_write_nonblk (s); + while (!is_nonblk && vls_mt_mq_trylock ()) { - vls_mt_mq_lock (); - *locks_acq |= VLS_MT_LOCK_MQ; + /* might get space while waiting for lock */ + is_nonblk = vcl_session_is_write_nonblk (s); } + if (!is_nonblk) + *locks_acq |= VLS_MT_LOCK_MQ; break; case VLS_MT_OP_XPOLL: vls_mt_mq_lock (); @@ -1442,15 +1487,23 @@ vls_close (vls_handle_t vlsh) int rv; vls_mt_detect (); - vls_mt_pool_wlock (); - vls = vls_get_and_lock (vlsh); + /* Notify vcl while holding a reader lock. Allows other threads to + * regrab vls and unlock it if needed. */ + vls_mt_pool_rlock (); + + vls = vls_get (vlsh); if (!vls) { - vls_mt_pool_wunlock (); + vls_mt_pool_runlock (); return VPPCOM_EBADFD; } + /* Notify other threads, if any, that app closed. Do it before + * grabbing lock as vls might be already locked */ + vls->flags |= VLS_F_APP_CLOSED; + vls_lock (vls); + vls_mt_guard (vls, VLS_MT_OP_SPOOL); if (vls_is_shared (vls)) @@ -1461,8 +1514,23 @@ vls_close (vls_handle_t vlsh) if (vls_mt_wrk_supported ()) vls_mt_session_cleanup (vls); - vls_free (vls); vls_mt_unguard (); + vls_unlock (vls); + vls_mt_pool_runlock (); + + /* Drop mt reader lock on pool and acquire writer lock */ + vls_mt_pool_wlock (); + + vls = vls_get (vlsh); + + /* Other threads might be still using the session */ + while (vls_trylock (vls)) + { + vls_mt_pool_wunlock (); + vls_mt_pool_wlock (); + } + + vls_free (vls); vls_mt_pool_wunlock (); @@ -2002,6 +2070,51 @@ vls_send_session_cleanup_rpc (vcl_worker_t * wrk, dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret); } +static inline void +vls_mt_mq_wait_lock (vcl_session_handle_t vcl_sh) +{ + vcl_locked_session_t *vls; + vls_worker_t *wrk; + uword *vlshp; + + /* If mt wrk supported or single threaded just return */ + if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1)) + return; + + wrk = vls_worker_get_current (); + /* Expect current thread to have dropped lock before calling vcl */ + vls_mt_pool_rlock (); + + vlshp = vls_sh_to_vlsh_table_get (wrk, vcl_sh); + if (vlshp) + { + vls = vls_get (*vlshp); + /* Handle case here other threads might've closed the session */ + if (vls->flags & VLS_F_APP_CLOSED) + { + vcl_session_t *s; + s = vcl_session_get_w_handle (vcl_worker_get_current (), vcl_sh); + s->flags |= VCL_SESSION_F_APP_CLOSING; + vls_mt_pool_runlock (); + return; + } + } + + vls_mt_mq_unlock (); +} + +static inline void +vls_mt_mq_wait_unlock (vcl_session_handle_t vcl_sh) +{ + if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1)) + return; + + vls_mt_mq_lock (); + + /* writers can grab lock now */ + vls_mt_pool_runlock (); +} + int vls_app_create (char *app_name) { @@ -2025,6 +2138,9 @@ vls_app_create (char *app_name) clib_rwlock_init (&vlsl->vls_pool_lock); vls_mt_locks_init (); vcm->wrk_rpc_fn = vls_rpc_handler; + /* For multi threaded apps where sessions are implicitly shared, ask vcl + * to use these callbacks prior and after blocking on io operations */ + vcl_worker_set_wait_mq_fns (vls_mt_mq_wait_lock, vls_mt_mq_wait_unlock); return VPPCOM_OK; } diff --git a/src/vcl/vcl_locked.h b/src/vcl/vcl_locked.h index 0e747acdea0..98a1c542e4a 100644 --- a/src/vcl/vcl_locked.h +++ b/src/vcl/vcl_locked.h @@ -50,8 +50,9 @@ int vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map, vcl_si_set * except_map, double wait_for_time); int vls_poll (vcl_poll_t *vp, uint32_t n_sids, double wait_for_time); vcl_session_handle_t vlsh_to_sh (vls_handle_t vlsh); -vcl_session_handle_t vlsh_to_session_index (vls_handle_t vlsh); -int vlsh_to_worker_index (vls_handle_t vlsh); +void vlsh_to_session_and_worker_index (vls_handle_t vlsh, + uint32_t *session_index, + uint32_t *wrk_index); vls_handle_t vls_session_index_to_vlsh (uint32_t session_index); int vls_app_create (char *app_name); unsigned char vls_use_eventfd (void); diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c index 4af79e9ebb0..d3ad2331827 100644 --- a/src/vcl/vcl_private.c +++ b/src/vcl/vcl_private.c @@ -75,6 +75,8 @@ vcl_mq_epoll_add_evfd (vcl_worker_t * wrk, svm_msg_q_t * mq) mqc->mq_fd = mq_fd; mqc->mq = mq; + fcntl (mq_fd, F_SETFL, O_NONBLOCK); + e.events = EPOLLIN; e.data.u32 = mqc_index; if (epoll_ctl (wrk->mqs_epfd, EPOLL_CTL_ADD, mq_fd, &e) < 0) @@ -228,6 +230,15 @@ vcl_worker_detach_sessions (vcl_worker_t *wrk) hash_free (seg_indices_map); } +void +vcl_worker_set_wait_mq_fns (vcl_worker_wait_mq_fn pre_wait, + vcl_worker_wait_mq_fn post_wait) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + wrk->pre_wait_fn = pre_wait; + wrk->post_wait_fn = post_wait; +} + vcl_worker_t * vcl_worker_alloc_and_init () { @@ -336,7 +347,7 @@ vcl_session_read_ready (vcl_session_t * s) max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo); if (max_deq <= SESSION_CONN_HDR_LEN) return 0; - if (svm_fifo_peek (s->rx_fifo, 0, sizeof (ph), (u8 *) & ph) < 0) + if (svm_fifo_peek (s->rx_fifo, 0, sizeof (ph), (u8 *) &ph) < 0) return 0; if (ph.data_length + SESSION_CONN_HDR_LEN > max_deq) return 0; @@ -358,6 +369,39 @@ vcl_session_read_ready (vcl_session_t * s) } } +/** + * Used as alternative to vcl_session_read_ready to avoid peeking udp sessions. + * Multi-threaded applications could select the same session from multiple + * threads */ +int +vcl_session_read_ready2 (vcl_session_t *s) +{ + if (vcl_session_is_open (s)) + { + if (vcl_session_is_ct (s)) + return svm_fifo_max_dequeue_cons (s->ct_rx_fifo); + + if (s->is_dgram) + { + if (svm_fifo_max_dequeue_cons (s->rx_fifo) <= SESSION_CONN_HDR_LEN) + return 0; + + /* Return 1 even if not yet sure if a full datagram was received */ + return 1; + } + + return svm_fifo_max_dequeue_cons (s->rx_fifo); + } + else if (s->session_state == VCL_STATE_LISTEN) + { + return clib_fifo_elts (s->accept_evts_fifo); + } + else + { + return 1; + } +} + int vcl_session_write_ready (vcl_session_t * s) { diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 7e72b29ba25..c98e1cde9b1 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -143,8 +143,16 @@ typedef enum vcl_session_flags_ VCL_SESSION_F_PENDING_DISCONNECT = 1 << 6, VCL_SESSION_F_PENDING_FREE = 1 << 7, VCL_SESSION_F_PENDING_LISTEN = 1 << 8, + VCL_SESSION_F_APP_CLOSING = 1 << 9, } __clib_packed vcl_session_flags_t; +typedef enum vcl_worker_wait_ +{ + VCL_WRK_WAIT_CTRL, + VCL_WRK_WAIT_IO_RX, + VCL_WRK_WAIT_IO_TX, +} vcl_worker_wait_type_t; + typedef struct vcl_session_ { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -229,6 +237,7 @@ typedef struct vcl_mq_evt_conn_ int mq_fd; } vcl_mq_evt_conn_t; +typedef void (*vcl_worker_wait_mq_fn) (u32 vcl_sh); typedef struct vcl_worker_ { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -312,6 +321,10 @@ typedef struct vcl_worker_ /** vcl needs next epoll_create to go to libc_epoll */ u8 vcl_needs_real_epoll; volatile int rpc_done; + + /* functions to be called pre/post wait if vcl managed by vls */ + vcl_worker_wait_mq_fn pre_wait_fn; + vcl_worker_wait_mq_fn post_wait_fn; } vcl_worker_t; STATIC_ASSERT (sizeof (session_disconnected_msg_t) <= 16, @@ -724,6 +737,7 @@ u32 vcl_segment_table_lookup (u64 segment_handle); void vcl_segment_table_del (u64 segment_handle); int vcl_session_read_ready (vcl_session_t * session); +int vcl_session_read_ready2 (vcl_session_t *s); int vcl_session_write_ready (vcl_session_t * session); int vcl_session_alloc_ext_cfg (vcl_session_t *s, transport_endpt_ext_cfg_type_t type, u32 len); @@ -783,6 +797,8 @@ svm_fifo_chunk_t *vcl_segment_alloc_chunk (uword segment_handle, int vcl_session_share_fifos (vcl_session_t *s, svm_fifo_t *rxf, svm_fifo_t *txf); void vcl_worker_detach_sessions (vcl_worker_t *wrk); +void vcl_worker_set_wait_mq_fns (vcl_worker_wait_mq_fn pre_wait, + vcl_worker_wait_mq_fn post_wait); /* * VCL Binary API diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index d084c09e1da..0b2141614ee 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -1263,6 +1263,51 @@ vcl_flush_mq_events (void) vcl_worker_flush_mq_events (vcl_worker_get_current ()); } +static inline void +vcl_worker_wait_mq (vcl_worker_t *wrk, u32 session_handle, + vcl_worker_wait_type_t wait) +{ + vcl_session_t *s = 0; + u32 sleeps = 0; + + if (wrk->pre_wait_fn) + wrk->pre_wait_fn (session_handle); + + if (session_handle != VCL_INVALID_SESSION_INDEX) + { + s = vcl_session_get_w_handle (wrk, session_handle); + /* Session might've been closed by another thread if multi-threaded + * as opposed to multi-worker app */ + if (s->flags & VCL_SESSION_F_APP_CLOSING) + return; + } + + /* Short sleeps waiting on mq notifications. Note that we drop mq lock for + * multi-thread apps so we may be getting spurious notifications. Not ideal, + * as we'd like to only be woken up on events, but since multiple threads may + * be waiting on the same mq, events may be missed. If waiting on session io + * events, return if session is ready. Otherwise return if mq has event */ + while (svm_msg_q_timedwait (wrk->app_event_queue, 1e-3)) + { + if (s) + { + if ((wait == VCL_WRK_WAIT_IO_RX && vcl_session_read_ready (s)) || + (wait == VCL_WRK_WAIT_IO_TX && vcl_session_write_ready (s))) + break; + } + else + { + if (!svm_msg_q_is_empty (wrk->app_event_queue)) + break; + } + if (++sleeps > 200) + break; + } + + if (wrk->post_wait_fn) + wrk->post_wait_fn (session_handle); +} + static int vppcom_session_unbind (u32 session_handle) { @@ -1831,7 +1876,7 @@ again: if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking) return VPPCOM_EAGAIN; - svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY); + vcl_worker_wait_mq (wrk, -1, VCL_WRK_WAIT_CTRL); vcl_worker_flush_mq_events (wrk); goto again; } @@ -2047,7 +2092,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, vcl_session_t *s = 0; svm_fifo_t *rx_fifo; session_event_t *e; - svm_msg_q_t *mq; u8 is_ct; if (PREDICT_FALSE (!buf)) @@ -2079,7 +2123,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK); is_ct = vcl_session_is_ct (s); - mq = wrk->app_event_queue; rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo; s->flags &= ~VCL_SESSION_F_HAS_RX_EVT; @@ -2098,12 +2141,14 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, { if (vcl_session_is_closing (s)) return vcl_session_closing_error (s); + if (s->flags & VCL_SESSION_F_APP_CLOSING) + return vcl_session_closed_error (s); if (is_ct) svm_fifo_unset_event (s->rx_fifo); svm_fifo_unset_event (rx_fifo); - svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); + vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX); vcl_worker_flush_mq_events (wrk); } } @@ -2188,7 +2233,6 @@ vppcom_session_read_segments (uint32_t session_handle, int n_read = 0, is_nonblocking; vcl_session_t *s = 0; svm_fifo_t *rx_fifo; - svm_msg_q_t *mq; u8 is_ct; s = vcl_session_get_w_handle (wrk, session_handle); @@ -2200,7 +2244,6 @@ vppcom_session_read_segments (uint32_t session_handle, is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK); is_ct = vcl_session_is_ct (s); - mq = wrk->app_event_queue; rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo; s->flags &= ~VCL_SESSION_F_HAS_RX_EVT; @@ -2219,12 +2262,14 @@ vppcom_session_read_segments (uint32_t session_handle, { if (vcl_session_is_closing (s)) return vcl_session_closing_error (s); + if (s->flags & VCL_SESSION_F_APP_CLOSING) + return vcl_session_closed_error (s); if (is_ct) svm_fifo_unset_event (s->rx_fifo); svm_fifo_unset_event (rx_fifo); - svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); + vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX); vcl_worker_flush_mq_events (wrk); } } @@ -2289,7 +2334,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf, int n_write, is_nonblocking; session_evt_type_t et; svm_fifo_t *tx_fifo; - svm_msg_q_t *mq; u8 is_ct; /* Accept zero length writes but just return */ @@ -2326,7 +2370,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf, tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo; is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK); - mq = wrk->app_event_queue; if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram)) { if (is_nonblocking) @@ -2338,8 +2381,10 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf, svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); if (vcl_session_is_closing (s)) return vcl_session_closing_error (s); + if (s->flags & VCL_SESSION_F_APP_CLOSING) + return vcl_session_closed_error (s); - svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); + vcl_worker_wait_mq (wrk, vcl_session_handle (s), VCL_WRK_WAIT_IO_TX); vcl_worker_flush_mq_events (wrk); } } @@ -2383,7 +2428,6 @@ vppcom_session_write_segments (uint32_t session_handle, int n_write = 0, n_bytes = 0, is_nonblocking; vcl_session_t *s = 0; svm_fifo_t *tx_fifo; - svm_msg_q_t *mq; u8 is_ct; u32 i; @@ -2406,7 +2450,6 @@ vppcom_session_write_segments (uint32_t session_handle, is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK); is_ct = vcl_session_is_ct (s); - mq = wrk->app_event_queue; tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo; for (i = 0; i < n_segments; i++) @@ -2423,8 +2466,10 @@ vppcom_session_write_segments (uint32_t session_handle, svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); if (vcl_session_is_closing (s)) return vcl_session_closing_error (s); + if (s->flags & VCL_SESSION_F_APP_CLOSING) + return vcl_session_closed_error (s); - svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY); + vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_TX); vcl_worker_flush_mq_events (wrk); } } @@ -2843,7 +2888,7 @@ check_rd: continue; } - if (vcl_session_read_ready (s)) + if (vcl_session_read_ready2 (s)) { clib_bitmap_set_no_check ((uword *) read_map, sid, 1); bits_set++; diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py index 124ea14089b..143b46c22ee 100644 --- a/test/asf/test_vcl.py +++ b/test/asf/test_vcl.py @@ -7,7 +7,7 @@ import subprocess import signal import glob from config import config -from asfframework import VppAsfTestCase, VppTestRunner, Worker, tag_fixme_ubuntu2404 +from asfframework import VppAsfTestCase, VppTestRunner, Worker from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath iperf3 = "/usr/bin/iperf3" @@ -311,7 +311,6 @@ class VCLTestCase(VppAsfTestCase): self.assert_equal(worker_client.result, 0, "Binary test return code") -@tag_fixme_ubuntu2404 class LDPCutThruTestCase(VCLTestCase): """LDP Cut Thru Tests""" @@ -1024,7 +1023,6 @@ class VCLThruHostStackNsock(VCLTestCase): ) -@tag_fixme_ubuntu2404 class LDPThruHostStackIperf(VCLTestCase): """LDP Thru Host Stack Iperf""" @@ -1072,7 +1070,6 @@ class LDPThruHostStackIperf(VCLTestCase): ) -@tag_fixme_ubuntu2404 class LDPThruHostStackIperfUdp(VCLTestCase): """LDP Thru Host Stack Iperf UDP""" @@ -1118,7 +1115,6 @@ class LDPThruHostStackIperfUdp(VCLTestCase): ) -@tag_fixme_ubuntu2404 class LDPIpv6CutThruTestCase(VCLTestCase): """LDP IPv6 Cut Thru Tests""" |