diff options
Diffstat (limited to 'src/vnet/session/session_node.c')
-rw-r--r-- | src/vnet/session/session_node.c | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index e15625e37ca..4f2cae4d196 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -142,10 +142,14 @@ session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt) session_worker_stat_error_inc (wrk, rv, 1); app_wrk = application_get_worker (app, mp->wrk_index); - mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv); + app_worker_listened_notify (app_wrk, a->handle, mp->context, rv); if (mp->ext_config) session_mq_free_ext_config (app, mp->ext_config); + + /* Make sure events are flushed before releasing barrier, to avoid + * potential race with accept. */ + app_wrk_flush_wrk_events (app_wrk, 0); } static void @@ -170,7 +174,8 @@ session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt) rv = vnet_bind_uri (a); app_wrk = application_get_worker (app, 0); - mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv); + app_worker_listened_notify (app_wrk, a->handle, mp->context, rv); + app_wrk_flush_wrk_events (app_wrk, 0); } static void @@ -215,7 +220,7 @@ session_mq_connect_one (session_connect_msg_t *mp) wrk = session_main_get_worker (vlib_get_thread_index ()); session_worker_stat_error_inc (wrk, rv, 1); app_wrk = application_get_worker (app, mp->wrk_index); - mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv); + app_worker_connect_notify (app_wrk, 0, rv, mp->context); } if (mp->ext_config) @@ -324,7 +329,7 @@ session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt) { session_worker_stat_error_inc (wrk, rv, 1); app_wrk = application_get_worker (app, 0 /* default wrk only */ ); - mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv); + app_worker_connect_notify (app_wrk, 0, rv, mp->context); } } @@ -410,7 +415,7 @@ session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt) if (!app_wrk) return; - mq_send_unlisten_reply (app_wrk, sh, mp->context, rv); + app_worker_unlisten_reply (app_wrk, sh, mp->context, rv); } static void @@ -466,7 +471,7 @@ session_mq_accepted_reply_handler (session_worker_t *wrk, session_set_state (s, SESSION_STATE_READY); if (!svm_fifo_is_empty_prod (s->rx_fifo)) - app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, s); /* Closed while waiting for app to reply. Resend disconnect */ if (old_state >= SESSION_STATE_TRANSPORT_CLOSING) @@ -669,7 +674,7 @@ session_mq_worker_update_handler (void *data) session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX); if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo)) - app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, s); if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) app_worker_close_notify (app_wrk, s); @@ -1790,7 +1795,7 @@ session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node, break; svm_fifo_unset_event (s->rx_fifo); app_wrk = app_worker_get (s->app_wrk_index); - app_worker_builtin_rx (app_wrk, s); + app_worker_rx_notify (app_wrk, s); break; case SESSION_IO_EVT_TX_MAIN: s = session_get_if_valid (e->session_index, 0 /* main thread */); |