-rw-r--r--  src/plugins/hs_apps/echo_client.c      |   6
-rw-r--r--  src/plugins/hs_apps/echo_server.c      |   2
-rw-r--r--  src/plugins/http/http.c                |   9
-rw-r--r--  src/plugins/quic/quic.c                |   7
-rw-r--r--  src/plugins/srtp/srtp.c                |   5
-rw-r--r--  src/plugins/unittest/session_test.c    |  70
-rw-r--r--  src/svm/message_queue.c                |  10
-rw-r--r--  src/svm/message_queue.h                |  11
-rw-r--r--  src/vnet/CMakeLists.txt                |   1
-rw-r--r--  src/vnet/session/application.c         |   8
-rw-r--r--  src/vnet/session/application.h         |  47
-rw-r--r--  src/vnet/session/application_local.c   |  40
-rw-r--r--  src/vnet/session/application_worker.c  | 406
-rw-r--r--  src/vnet/session/segment_manager.c     |   2
-rw-r--r--  src/vnet/session/session.c             | 517
-rw-r--r--  src/vnet/session/session.h             |  25
-rw-r--r--  src/vnet/session/session_api.c         |  50
-rw-r--r--  src/vnet/session/session_input.c       | 296
-rw-r--r--  src/vnet/session/session_node.c        |  21
-rw-r--r--  src/vnet/session/session_types.h       |   3
-rw-r--r--  src/vnet/tcp/tcp_input.c               |  19
-rw-r--r--  src/vnet/tls/tls.c                     |   8
-rw-r--r--  src/vnet/udp/udp_input.c               |  15
23 files changed, 949 insertions, 629 deletions
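
Summary of the change, as reflected in the diff below: transports and proxies stop
locking the app's shared-memory message queue per event (app_worker_lock_and_send_event
is removed) and instead call session_enqueue_notify()/app_worker_rx_notify(), which
append events to per-vpp-worker fifos on the app worker (app_worker_add_event). The new
session_input node drains those fifos, invoking builtin app callbacks directly or
batching messages into the app's mq, and backs off per thread while the mq is congested.
Below is a minimal standalone sketch of that producer/flush pattern; all names in it
(evt_queue_t, add_event, flush_events) are illustrative only, not the VPP API, which
uses clib_fifo, the app_wrks_pending_ntf bitmap and vlib node interrupts.

/* Simplified model of the event-delivery pattern this patch introduces:
 * producers append to a per-thread queue and schedule the consumer only
 * when the queue goes non-empty; the consumer drains in batches and backs
 * off while the app's message queue is congested. Names are illustrative,
 * not the actual VPP API. */
#include <stdio.h>

#define N_THREADS 2
#define Q_CAP	  64
#define BATCH	  16

typedef struct { int session_index; int event_type; } app_evt_t;

typedef struct
{
  app_evt_t evts[Q_CAP];
  int head, tail; /* single producer/consumer ring per thread */
  int scheduled;  /* models the app_wrks_pending_ntf bitmap bit */
} evt_queue_t;

static evt_queue_t wrk_evts[N_THREADS];

/* Roughly what app_worker_add_event does: queue locally, schedule once */
static void
add_event (int thread, int session, int type)
{
  evt_queue_t *q = &wrk_evts[thread];

  if ((q->tail + 1) % Q_CAP == q->head)
    return; /* full; the real code grows a clib_fifo instead */
  q->evts[q->tail] = (app_evt_t){ session, type };
  q->tail = (q->tail + 1) % Q_CAP;
  if (!q->scheduled)
    {
      /* real code: set the bitmap bit and raise a session_input interrupt */
      q->scheduled = 1;
      printf ("thread %d: session_input scheduled\n", thread);
    }
}

/* Roughly what app_worker_flush_events_inline does: drain a batch, stop
 * early and stay scheduled while the app mq is congested */
static void
flush_events (int thread, int mq_has_space)
{
  evt_queue_t *q = &wrk_evts[thread];
  int n = 0;

  while (q->head != q->tail && n < BATCH)
    {
      app_evt_t *e = &q->evts[q->head];

      if (!mq_has_space)
	{
	  printf ("thread %d: mq congested, retrying later\n", thread);
	  return; /* q->scheduled stays set, so the node runs again */
	}
      printf ("thread %d: deliver evt %d for session %d\n", thread,
	      e->event_type, e->session_index);
      q->head = (q->head + 1) % Q_CAP;
      n++;
    }
  if (q->head == q->tail)
    q->scheduled = 0;
}

int
main (void)
{
  add_event (0, 1, 0 /* rx */);
  add_event (0, 2, 0 /* rx */); /* no second schedule, already pending */
  flush_events (0, 1 /* mq has space */);
  return 0;
}

Scheduling the consumer only when the fifo transitions from empty mirrors
session_wrk_program_app_wrk_evts in the diff, which sets a bitmap bit and marks
session_input pending once per app worker rather than once per event.
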
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c index a7358a61952..d449045cf6a 100644 --- a/src/plugins/hs_apps/echo_client.c +++ b/src/plugins/hs_apps/echo_client.c @@ -706,10 +706,8 @@ ec_session_rx_callback (session_t *s) receive_data_chunk (wrk, es); if (svm_fifo_max_dequeue_cons (s->rx_fifo)) - { - if (svm_fifo_set_event (s->rx_fifo)) - session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); - } + session_enqueue_notify (s); + return 0; } diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c index 6a291163bd1..178e9eef4fb 100644 --- a/src/plugins/hs_apps/echo_server.c +++ b/src/plugins/hs_apps/echo_server.c @@ -228,6 +228,8 @@ echo_server_rx_callback (session_t * s) /* Program self-tap to retry */ if (svm_fifo_set_event (rx_fifo)) { + /* TODO should be session_enqueue_notify(s) but quic tests seem + * to fail if that's the case */ if (session_send_io_evt_to_thread (rx_fifo, SESSION_IO_EVT_BUILTIN_RX)) clib_warning ("failed to enqueue self-tap"); diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index d25123a757d..503752c377e 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -503,7 +503,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) hc->http_state = HTTP_STATE_WAIT_APP; app_wrk = app_worker_get_if_valid (as->app_wrk_index); - app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, as); return HTTP_SM_STOP; @@ -777,7 +777,7 @@ state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp) } app_wrk = app_worker_get_if_valid (as->app_wrk_index); - app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, as); return HTTP_SM_STOP; } @@ -808,7 +808,7 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as) app_wrk = app_worker_get_if_valid (as->app_wrk_index); ASSERT (app_wrk); - app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, as); return 1; } @@ -864,8 +864,9 @@ maybe_reschedule: if (hc->rx_buf_offset < vec_len (hc->rx_buf) || svm_fifo_max_dequeue_cons (ts->rx_fifo)) { + /* TODO is the flag really needed? 
*/ if (svm_fifo_set_event (ts->rx_fifo)) - session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); + session_enqueue_notify (ts); } return HTTP_SM_CONTINUE; } diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index c3c4540353b..61380b8048c 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -830,7 +830,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, size_t len) { QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off); - u32 max_enq, rv; + u32 max_enq; quic_ctx_t *sctx; session_t *stream_session; app_worker_t *app_wrk; @@ -895,10 +895,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index); if (PREDICT_TRUE (app_wrk != 0)) { - rv = app_worker_lock_and_send_event (app_wrk, stream_session, - SESSION_IO_EVT_RX); - if (rv) - QUIC_ERR ("Failed to ping app for RX"); + app_worker_rx_notify (app_wrk, stream_session); } quic_ack_rx_data (stream_session); } diff --git a/src/plugins/srtp/srtp.c b/src/plugins/srtp/srtp.c index aacadcee265..8b7c5b6374c 100644 --- a/src/plugins/srtp/srtp.c +++ b/src/plugins/srtp/srtp.c @@ -309,8 +309,7 @@ done: int srtp_add_vpp_q_builtin_rx_evt (session_t *s) { - if (svm_fifo_set_event (s->rx_fifo)) - session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); + session_enqueue_notify (s); return 0; } @@ -320,7 +319,7 @@ srtp_notify_app_enqueue (srtp_tc_t *ctx, session_t *app_session) app_worker_t *app_wrk; app_wrk = app_worker_get_if_valid (app_session->app_wrk_index); if (PREDICT_TRUE (app_wrk != 0)) - app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX); + app_worker_rx_notify (app_wrk, app_session); } static inline int diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c index c4e41c34dd0..70b3b32a2e4 100644 --- a/src/plugins/unittest/session_test.c +++ b/src/plugins/unittest/session_test.c @@ -1771,6 +1771,74 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd) } } +/* Used to be part of application_worker.c prior to adding support for + * async rx + */ +static int +test_mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring, + svm_msg_q_msg_t *msg) +{ + int rv, n_try = 0; + + while (n_try < 75) + { + rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg); + if (!rv) + return 0; + /* + * Break the loop if mq is full, usually this is because the + * app has crashed or is hanging on somewhere. 
+ */ + if (rv != -1) + break; + n_try += 1; + usleep (1); + } + + return -1; +} + +/* Used to be part of application_worker.c prior to adding support for + * async rx and was used for delivering io events over mq + * NB: removed handling of mq congestion + */ +static inline int +test_app_send_io_evt_rx (app_worker_t *app_wrk, session_t *s) +{ + svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg; + session_event_t *evt; + svm_msg_q_t *mq; + u32 app_session; + int rv; + + if (app_worker_application_is_builtin (app_wrk)) + return app_worker_rx_notify (app_wrk, s); + + if (svm_fifo_has_event (s->rx_fifo)) + return 0; + + app_session = s->rx_fifo->shr->client_session_index; + mq = app_wrk->event_queue; + + rv = test_mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg); + + if (PREDICT_FALSE (rv)) + { + clib_warning ("failed to alloc mq message"); + return -1; + } + + evt = svm_msg_q_msg_data (mq, mq_msg); + evt->event_type = SESSION_IO_EVT_RX; + evt->session_index = app_session; + + (void) svm_fifo_set_event (s->rx_fifo); + + svm_msg_q_add_and_unlock (mq, mq_msg); + + return 0; +} + static int session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input) { @@ -1885,7 +1953,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input) { while (svm_fifo_has_event (rx_fifo)) ; - app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX); + test_app_send_io_evt_rx (app_wrk, &s); } } diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index 2880645b427..ab0d230b1f0 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -340,15 +340,15 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) return (dist1 < dist2); } -static void -svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem) +void +svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg) { svm_msg_q_shared_queue_t *sq = mq->q.shr; i8 *tailp; u32 sz; tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail); - clib_memcpy_fast (tailp, elem, sq->elsize); + clib_memcpy_fast (tailp, msg, sq->elsize); sq->tail = (sq->tail + 1) % sq->maxsize; @@ -381,7 +381,7 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait) svm_msg_q_wait_prod (mq); } - svm_msg_q_add_raw (mq, (u8 *) msg); + svm_msg_q_add_raw (mq, msg); svm_msg_q_unlock (mq); @@ -392,7 +392,7 @@ void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { ASSERT (svm_msq_q_msg_is_valid (mq, msg)); - svm_msg_q_add_raw (mq, (u8 *) msg); + svm_msg_q_add_raw (mq, msg); svm_msg_q_unlock (mq); } diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h index 0780cca1c32..4473c44f4e3 100644 --- a/src/svm/message_queue.h +++ b/src/svm/message_queue.h @@ -193,6 +193,17 @@ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); /** * Producer enqueue one message to queue * + * Must be called with mq locked. Prior to calling this, the producer should've + * obtained a message buffer from one of the rings. + * + * @param mq message queue + * @param msg message to be enqueued + */ +void svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg); + +/** + * Producer enqueue one message to queue + * * Prior to calling this, the producer should've obtained a message buffer * from one of the rings by calling @ref svm_msg_q_alloc_msg. 
* diff --git a/src/vnet/CMakeLists.txt b/src/vnet/CMakeLists.txt index 5aada92e8d9..5e913dffdea 100644 --- a/src/vnet/CMakeLists.txt +++ b/src/vnet/CMakeLists.txt @@ -1018,6 +1018,7 @@ list(APPEND VNET_SOURCES session/session_rules_table.c session/session_lookup.c session/session_node.c + session/session_input.c session/transport.c session/application.c session/application_worker.c diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index cf867401ea2..fdd5a0a67cd 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -725,6 +725,12 @@ application_get_if_valid (u32 app_index) return pool_elt_at_index (app_main.app_pool, app_index); } +static int +_null_app_tx_callback (session_t *s) +{ + return 0; +} + static void application_verify_cb_fns (session_cb_vft_t * cb_fns) { @@ -736,6 +742,8 @@ application_verify_cb_fns (session_cb_vft_t * cb_fns) clib_warning ("No session disconnect callback function provided"); if (cb_fns->session_reset_callback == 0) clib_warning ("No session reset callback function provided"); + if (!cb_fns->builtin_app_tx_callback) + cb_fns->builtin_app_tx_callback = _null_app_tx_callback; } /** diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index c3d6180022f..5505d91ea09 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -77,17 +77,17 @@ typedef struct app_worker_ /** Pool of half-open session handles. Tracked in case worker detaches */ session_handle_t *half_open_table; + /* Per vpp worker fifos of events for app worker */ + session_event_t **wrk_evts; + + /* Vector of vpp workers mq congestion flags */ + u8 *wrk_mq_congested; + /** Protects detached seg managers */ clib_spinlock_t detached_seg_managers_lock; /** Vector of detached listener segment managers */ u32 *detached_seg_managers; - - /** Fifo of messages postponed because of mq congestion */ - app_wrk_postponed_msg_t *postponed_mq_msgs; - - /** Lock to add/sub message from ref @postponed_mq_msgs */ - clib_spinlock_t postponed_mq_msgs_lock; } app_worker_t; typedef struct app_worker_map_ @@ -317,6 +317,12 @@ void application_enable_rx_mqs_nodes (u8 is_en); * App worker */ +always_inline u8 +app_worker_mq_is_congested (app_worker_t *app_wrk) +{ + return app_wrk->mq_congested > 0; +} + app_worker_t *app_worker_alloc (application_t * app); int application_alloc_worker_and_init (application_t * app, app_worker_t ** wrk); @@ -331,6 +337,10 @@ session_error_t app_worker_start_listen (app_worker_t *app_wrk, app_listener_t *lstnr); int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al); int app_worker_init_accepted (session_t * s); +int app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh, + u32 opaque, int err); +int app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh, + u32 opaque, session_error_t err); int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s); int app_worker_init_connected (app_worker_t * app_wrk, session_t * s); int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s, @@ -343,13 +353,21 @@ int app_worker_transport_closed_notify (app_worker_t * app_wrk, int app_worker_reset_notify (app_worker_t * app_wrk, session_t * s); int app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s, session_cleanup_ntf_t ntf); +int app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s, + session_cleanup_ntf_t ntf, + void (*cleanup_cb) (session_t *s)); int app_worker_migrate_notify (app_worker_t * app_wrk, 
session_t * s, session_handle_t new_sh); -int app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s); -int app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s); +int app_worker_rx_notify (app_worker_t *app_wrk, session_t *s); int app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s, svm_fifo_t * f, session_ft_action_t act, u32 len); +void app_worker_add_event (app_worker_t *app_wrk, session_t *s, + session_evt_type_t evt_type); +void app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index, + session_event_t *evt); +int app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index); +void app_worker_del_all_events (app_worker_t *app_wrk); segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, session_t *); segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *); @@ -364,9 +382,10 @@ void app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg, u32 msg_len, int fd); void app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg, u32 msg_len); -int app_worker_send_event (app_worker_t * app, session_t * s, u8 evt); -int app_worker_lock_and_send_event (app_worker_t * app, session_t * s, - u8 evt_type); +u8 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index); +void app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index); +void app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, + u32 thread_index); session_t *app_worker_proxy_listener (app_worker_t * app, u8 fib_proto, u8 transport_proto); void app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index); @@ -395,6 +414,12 @@ void sapi_socket_close_w_handle (u32 api_handle); crypto_engine_type_t app_crypto_engine_type_add (void); u8 app_crypto_engine_n_types (void); +static inline u8 +app_worker_application_is_builtin (app_worker_t *app_wrk) +{ + return app_wrk->app_is_builtin; +} + #endif /* SRC_VNET_SESSION_APPLICATION_H_ */ /* diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 6ac4da2c655..192c22b659a 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -1028,6 +1028,17 @@ ct_close_is_reset (ct_connection_t *ct, session_t *s) } static void +ct_session_cleanup_server_session (session_t *s) +{ + ct_connection_t *ct; + + ct = (ct_connection_t *) session_get_transport (s); + ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo); + session_free (s); + ct_connection_free (ct); +} + +static void ct_session_postponed_cleanup (ct_connection_t *ct) { ct_connection_t *peer_ct; @@ -1047,33 +1058,38 @@ ct_session_postponed_cleanup (ct_connection_t *ct) } session_transport_closed_notify (&ct->connection); + /* It would be cleaner to call session_transport_delete_notify + * but then we can't control session cleanup lower */ + session_set_state (s, SESSION_STATE_TRANSPORT_DELETED); + if (app_wrk) + app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT); + if (ct->flags & CT_CONN_F_CLIENT) { - if (app_wrk) - app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT); - /* Normal free for client session as the fifos are allocated through * the connects segment manager in a segment that's not shared with * the server */ ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo); - session_free_w_fifos (s); + session_program_cleanup (s); + ct_connection_free (ct); } else { /* Manual session and fifo segment cleanup to avoid implicit * segment manager cleanups and notifications */ - 
app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (app_wrk) { - app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT); - app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION); + /* Remove custom cleanup notify infra when/if switching to normal + * session cleanup. Note that ct is freed in the cb function */ + app_worker_cleanup_notify_custom (app_wrk, s, + SESSION_CLEANUP_SESSION, + ct_session_cleanup_server_session); + } + else + { + ct_connection_free (ct); } - - ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo); - session_free (s); } - - ct_connection_free (ct); } static void diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index c3941d1fd7b..127963a1eda 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -26,6 +26,7 @@ app_worker_t * app_worker_alloc (application_t * app) { app_worker_t *app_wrk; + pool_get (app_workers, app_wrk); clib_memset (app_wrk, 0, sizeof (*app_wrk)); app_wrk->wrk_index = app_wrk - app_workers; @@ -33,7 +34,8 @@ app_worker_alloc (application_t * app) app_wrk->wrk_map_index = ~0; app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; clib_spinlock_init (&app_wrk->detached_seg_managers_lock); - clib_spinlock_init (&app_wrk->postponed_mq_msgs_lock); + vec_validate (app_wrk->wrk_evts, vlib_num_workers ()); + vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ()); APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index); return app_wrk; } @@ -65,17 +67,25 @@ app_worker_free (app_worker_t * app_wrk) int i; /* + * Cleanup vpp wrk events + */ + app_worker_del_all_events (app_wrk); + for (i = 0; i < vec_len (app_wrk->wrk_evts); i++) + clib_fifo_free (app_wrk->wrk_evts[i]); + + vec_free (app_wrk->wrk_evts); + vec_free (app_wrk->wrk_mq_congested); + + /* * Listener cleanup */ - /* *INDENT-OFF* */ hash_foreach (handle, sm_index, app_wrk->listeners_table, ({ ls = listen_session_get_from_handle (handle); vec_add1 (handles, app_listen_session_handle (ls)); vec_add1 (sm_indices, sm_index); sm = segment_manager_get (sm_index); })); - /* *INDENT-ON* */ for (i = 0; i < vec_len (handles); i++) { @@ -127,7 +137,6 @@ app_worker_free (app_worker_t * app_wrk) } vec_free (app_wrk->detached_seg_managers); clib_spinlock_free (&app_wrk->detached_seg_managers_lock); - clib_spinlock_free (&app_wrk->postponed_mq_msgs_lock); if (CLIB_DEBUG) clib_memset (app_wrk, 0xfe, sizeof (*app_wrk)); @@ -339,7 +348,7 @@ app_worker_init_accepted (session_t * s) listener = listen_session_get_from_handle (s->listener_handle); app_wrk = application_listener_select_worker (listener); - if (PREDICT_FALSE (app_wrk->mq_congested)) + if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk))) return -1; s->app_wrk_index = app_wrk->wrk_index; @@ -355,10 +364,39 @@ app_worker_init_accepted (session_t * s) } int +app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh, + u32 opaque, session_error_t err) +{ + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_BOUND; + evt.session_handle = alsh; + evt.as_u64[1] = (u64) opaque << 32 | err; + + app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt); + + return 0; +} + +int +app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh, + u32 opaque, session_error_t err) +{ + session_event_t evt = {}; + + evt.event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY; + evt.session_handle = sh; + evt.as_u64[1] = (u64) opaque << 32 | (u32) err; + + app_worker_add_event_custom (app_wrk, 0 /* thread 
index */, &evt); + return 0; +} + +int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.session_accept_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED); + return 0; } int @@ -382,9 +420,16 @@ int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s, session_error_t err, u32 opaque) { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, - s, err); + session_event_t evt = {}; + u32 thread_index; + + evt.event_type = SESSION_CTRL_EVT_CONNECTED; + evt.session_index = s ? s->session_index : ~0; + evt.as_u64[1] = (u64) opaque << 32 | (u32) err; + thread_index = s ? s->thread_index : vlib_get_thread_index (); + + app_worker_add_event_custom (app_wrk, thread_index, &evt); + return 0; } int @@ -402,36 +447,28 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh) int app_worker_del_half_open (app_worker_t *app_wrk, session_t *s) { - application_t *app = application_get (app_wrk->app_index); - ASSERT (session_vlib_thread_is_cl_thread ()); - pool_put_index (app_wrk->half_open_table, s->ho_index); - if (app->cb_fns.half_open_cleanup_callback) - app->cb_fns.half_open_cleanup_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP); return 0; } int app_worker_close_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.session_disconnect_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED); return 0; } int app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - if (app->cb_fns.session_transport_closed_callback) - app->cb_fns.session_transport_closed_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED); return 0; } int app_worker_reset_notify (app_worker_t * app_wrk, session_t * s) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.session_reset_callback (s); + app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET); return 0; } @@ -439,29 +476,37 @@ int app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s, session_cleanup_ntf_t ntf) { - application_t *app = application_get (app_wrk->app_index); - if (app->cb_fns.session_cleanup_callback) - app->cb_fns.session_cleanup_callback (s, ntf); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_CLEANUP; + evt.as_u64[0] = (u64) ntf << 32 | s->session_index; + evt.as_u64[1] = pointer_to_uword (session_cleanup); + + app_worker_add_event_custom (app_wrk, s->thread_index, &evt); + return 0; } int -app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s) +app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s, + session_cleanup_ntf_t ntf, + void (*cleanup_cb) (session_t *s)) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.builtin_app_rx_callback (s); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_CLEANUP; + evt.as_u64[0] = (u64) ntf << 32 | s->session_index; + evt.as_u64[1] = pointer_to_uword (cleanup_cb); + + app_worker_add_event_custom (app_wrk, s->thread_index, &evt); + return 0; } int -app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s) +app_worker_rx_notify (app_worker_t *app_wrk, session_t *s) { - application_t *app = application_get (app_wrk->app_index); - - if 
(!app->cb_fns.builtin_app_tx_callback) - return 0; - - app->cb_fns.builtin_app_tx_callback (s); + app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX); return 0; } @@ -469,8 +514,13 @@ int app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s, session_handle_t new_sh) { - application_t *app = application_get (app_wrk->app_index); - app->cb_fns.session_migrate_callback (s, new_sh); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_MIGRATED; + evt.session_index = s->session_index; + evt.as_u64[1] = new_sh; + + app_worker_add_event_custom (app_wrk, s->thread_index, &evt); return 0; } @@ -514,7 +564,7 @@ int app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep, session_handle_t *rsh) { - if (PREDICT_FALSE (app_wrk->mq_congested)) + if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk))) return SESSION_E_REFUSED; sep->app_wrk_index = app_wrk->wrk_index; @@ -601,24 +651,27 @@ app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto, int app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle) { - application_t *app = application_get (app_wrk->app_index); + session_event_t evt; + + evt.event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT; + evt.as_u64[1] = segment_handle; - return app->cb_fns.add_segment_callback (app_wrk->wrk_index, - segment_handle); + app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt); + + return 0; } int app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle) { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.del_segment_callback (app_wrk->wrk_index, - segment_handle); -} + session_event_t evt; -static inline u8 -app_worker_application_is_builtin (app_worker_t * app_wrk) -{ - return app_wrk->app_is_builtin; + evt.event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT; + evt.as_u64[1] = segment_handle; + + app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt); + + return 0; } static int @@ -677,126 +730,38 @@ app_wrk_send_fd (app_worker_t *app_wrk, int fd) return 0; } -static int -mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring, - svm_msg_q_msg_t *msg) -{ - int rv, n_try = 0; - - while (n_try < 75) - { - rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg); - if (!rv) - return 0; - /* - * Break the loop if mq is full, usually this is because the - * app has crashed or is hanging on somewhere. 
- */ - if (rv != -1) - break; - n_try += 1; - usleep (1); - } - - return -1; -} - -typedef union app_wrk_mq_rpc_args_ -{ - struct - { - u32 thread_index; - u32 app_wrk_index; - }; - uword as_uword; -} app_wrk_mq_rpc_ags_t; - -static int -app_wrk_handle_mq_postponed_msgs (void *arg) +void +app_worker_add_event (app_worker_t *app_wrk, session_t *s, + session_evt_type_t evt_type) { - svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg; - app_wrk_postponed_msg_t *pm; - app_wrk_mq_rpc_ags_t args; - u32 max_msg, n_msg = 0; - app_worker_t *app_wrk; session_event_t *evt; - svm_msg_q_t *mq; - - args.as_uword = pointer_to_uword (arg); - app_wrk = app_worker_get_if_valid (args.app_wrk_index); - if (!app_wrk) - return 0; - - mq = app_wrk->event_queue; - - clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock); - - max_msg = clib_min (32, clib_fifo_elts (app_wrk->postponed_mq_msgs)); - - while (n_msg < max_msg) - { - pm = clib_fifo_head (app_wrk->postponed_mq_msgs); - if (mq_try_lock_and_alloc_msg (mq, pm->ring, mq_msg)) - break; - evt = svm_msg_q_msg_data (mq, mq_msg); - clib_memset (evt, 0, sizeof (*evt)); - evt->event_type = pm->event_type; - clib_memcpy_fast (evt->data, pm->data, pm->len); - - if (pm->fd != -1) - app_wrk_send_fd (app_wrk, pm->fd); - - svm_msg_q_add_and_unlock (mq, mq_msg); - - clib_fifo_advance_head (app_wrk->postponed_mq_msgs, 1); - n_msg += 1; - } + ASSERT (s->thread_index == vlib_get_thread_index ()); + clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt); + evt->session_index = s->session_index; + evt->event_type = evt_type; + evt->postponed = 0; - if (!clib_fifo_elts (app_wrk->postponed_mq_msgs)) + /* First event for this app_wrk. Schedule it for handling in session input */ + if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1) { - app_wrk->mq_congested = 0; + session_worker_t *wrk = session_main_get_worker (s->thread_index); + session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index); } - else - { - session_send_rpc_evt_to_thread_force ( - args.thread_index, app_wrk_handle_mq_postponed_msgs, - uword_to_pointer (args.as_uword, void *)); - } - - clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock); - - return 0; } -static void -app_wrk_add_mq_postponed_msg (app_worker_t *app_wrk, session_mq_rings_e ring, - u8 evt_type, void *msg, u32 msg_len, int fd) +void +app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index, + session_event_t *evt) { - app_wrk_postponed_msg_t *pm; - - clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock); + clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt); - app_wrk->mq_congested = 1; - - clib_fifo_add2 (app_wrk->postponed_mq_msgs, pm); - clib_memcpy_fast (pm->data, msg, msg_len); - pm->event_type = evt_type; - pm->ring = ring; - pm->len = msg_len; - pm->fd = fd; - - if (clib_fifo_elts (app_wrk->postponed_mq_msgs) == 1) + /* First event for this app_wrk. 
Schedule it for handling in session input */ + if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1) { - app_wrk_mq_rpc_ags_t args = { .thread_index = vlib_get_thread_index (), - .app_wrk_index = app_wrk->wrk_index }; - - session_send_rpc_evt_to_thread_force ( - args.thread_index, app_wrk_handle_mq_postponed_msgs, - uword_to_pointer (args.as_uword, void *)); + session_worker_t *wrk = session_main_get_worker (thread_index); + session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index); } - - clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock); } always_inline void @@ -806,14 +771,9 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg, svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg; svm_msg_q_t *mq = app_wrk->event_queue; session_event_t *evt; - int rv; - - if (PREDICT_FALSE (app_wrk->mq_congested)) - goto handle_congestion; - rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_CTRL_EVT_RING, mq_msg); - if (PREDICT_FALSE (rv)) - goto handle_congestion; + ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING)); + *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING); evt = svm_msg_q_msg_data (mq, mq_msg); clib_memset (evt, 0, sizeof (*evt)); @@ -823,14 +783,7 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg, if (fd != -1) app_wrk_send_fd (app_wrk, fd); - svm_msg_q_add_and_unlock (mq, mq_msg); - - return; - -handle_congestion: - - app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_CTRL_EVT_RING, evt_type, - msg, msg_len, fd); + svm_msg_q_add_raw (mq, mq_msg); } void @@ -847,109 +800,26 @@ app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg, app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1); } -static inline int -app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s) +u8 +app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index) { - svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg; - session_event_t *evt; - svm_msg_q_t *mq; - u32 app_session; - int rv; - - if (app_worker_application_is_builtin (app_wrk)) - return app_worker_builtin_rx (app_wrk, s); - - if (svm_fifo_has_event (s->rx_fifo)) - return 0; - - app_session = s->rx_fifo->shr->client_session_index; - mq = app_wrk->event_queue; - - if (PREDICT_FALSE (app_wrk->mq_congested)) - goto handle_congestion; - - rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg); - - if (PREDICT_FALSE (rv)) - goto handle_congestion; - - evt = svm_msg_q_msg_data (mq, mq_msg); - evt->event_type = SESSION_IO_EVT_RX; - evt->session_index = app_session; - - (void) svm_fifo_set_event (s->rx_fifo); - - svm_msg_q_add_and_unlock (mq, mq_msg); - - return 0; - -handle_congestion: - - app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING, - SESSION_IO_EVT_RX, &app_session, - sizeof (app_session), -1); - return -1; + return app_wrk->wrk_mq_congested[thread_index] > 0; } -static inline int -app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s) +void +app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index) { - svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg; - session_event_t *evt; - svm_msg_q_t *mq; - u32 app_session; - int rv; - - if (app_worker_application_is_builtin (app_wrk)) - return app_worker_builtin_tx (app_wrk, s); - - app_session = s->tx_fifo->shr->client_session_index; - mq = app_wrk->event_queue; - - if (PREDICT_FALSE (app_wrk->mq_congested)) - goto handle_congestion; - - rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg); - - if (PREDICT_FALSE (rv)) - goto 
handle_congestion; - - evt = svm_msg_q_msg_data (mq, mq_msg); - evt->event_type = SESSION_IO_EVT_TX; - evt->session_index = app_session; - - svm_msg_q_add_and_unlock (mq, mq_msg); - - return 0; - -handle_congestion: - - app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING, - SESSION_IO_EVT_TX, &app_session, - sizeof (app_session), -1); - return -1; + clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1); + ASSERT (thread_index == vlib_get_thread_index ()); + app_wrk->wrk_mq_congested[thread_index] = 1; } -/* *INDENT-OFF* */ -typedef int (app_send_evt_handler_fn) (app_worker_t *app, - session_t *s); -static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = { - app_send_io_evt_rx, - app_send_io_evt_tx, -}; -/* *INDENT-ON* */ - -/** - * Send event to application - * - * Logic from queue perspective is blocking. However, if queue is full, - * we return. - */ -int -app_worker_lock_and_send_event (app_worker_t * app, session_t * s, - u8 evt_type) +void +app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index) { - return app_send_evt_handler_fns[evt_type] (app, s); + clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1); + ASSERT (thread_index == vlib_get_thread_index ()); + app_wrk->wrk_mq_congested[thread_index] = 0; } u8 * diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index ad0ba89a1dd..d459b73650a 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -866,7 +866,7 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo) /* Thread that allocated the fifos must be the one to clean them up */ ASSERT (rx_fifo->master_thread_index == vlib_get_thread_index () || - rx_fifo->refcnt > 1); + rx_fifo->refcnt > 1 || vlib_thread_is_main_w_barrier ()); /* It's possible to have no segment manager if the session was removed * as result of a detach. 
*/ diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 228234ceefc..b494041f9eb 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -240,18 +240,26 @@ session_is_valid (u32 si, u8 thread_index) || s->session_state <= SESSION_STATE_LISTENING) return 1; - if (s->session_state == SESSION_STATE_CONNECTING && + if ((s->session_state == SESSION_STATE_CONNECTING || + s->session_state == SESSION_STATE_TRANSPORT_CLOSED) && (s->flags & SESSION_F_HALF_OPEN)) return 1; tc = session_get_transport (s); - if (s->connection_index != tc->c_index - || s->thread_index != tc->thread_index || tc->s_index != si) + if (s->connection_index != tc->c_index || + s->thread_index != tc->thread_index || tc->s_index != si) return 0; return 1; } +void +session_cleanup (session_t *s) +{ + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); +} + static void session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf) { @@ -259,16 +267,21 @@ session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf) app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (!app_wrk) - return; + { + if (ntf == SESSION_CLEANUP_TRANSPORT) + return; + + session_cleanup (s); + return; + } app_worker_cleanup_notify (app_wrk, s, ntf); } void -session_free_w_fifos (session_t * s) +session_program_cleanup (session_t *s) { + ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED); session_cleanup_notify (s, SESSION_CLEANUP_SESSION); - segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); - session_free (s); } /** @@ -285,7 +298,7 @@ session_delete (session_t * s) if ((rv = session_lookup_del_session (s))) clib_warning ("session %u hash delete rv %d", s->session_index, rv); - session_free_w_fifos (s); + session_program_cleanup (s); } void @@ -307,7 +320,7 @@ session_cleanup_half_open (session_handle_t ho_handle) transport_cleanup (session_get_transport_proto (ho), ho->connection_index, ho->app_index /* overloaded */); } - else + else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED) { /* Cleanup half-open session lookup table if need be */ if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED) @@ -333,7 +346,8 @@ session_half_open_free (session_t *ho) app_wrk = app_worker_get_if_valid (ho->app_wrk_index); if (app_wrk) app_worker_del_half_open (app_wrk, ho); - session_free (ho); + else + session_free (ho); } static void @@ -354,6 +368,7 @@ session_half_open_delete_notify (transport_connection_t *tc) if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP)) session_lookup_del_half_open (tc); } + session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED); /* Notification from ctrl thread accepted without rpc */ if (tc->thread_index == transport_cl_thread ()) @@ -558,10 +573,158 @@ session_fifo_tuning (session_t * s, svm_fifo_t * f, } } +void +session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index) +{ + u8 need_interrupt; + + ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ()); + need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf); + wrk->app_wrks_pending_ntf = + clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1); + + if (need_interrupt) + vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index); +} + +always_inline void +session_program_io_event (app_worker_t *app_wrk, session_t *s, + session_evt_type_t et, u8 is_cl) +{ + if (is_cl) + { + /* Special events for connectionless sessions */ + et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX; + + ASSERT (s->thread_index == 0); + session_event_t evt = { 
+ .event_type = et, + .session_handle = session_handle (s), + }; + + app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt); + } + else + { + app_worker_add_event (app_wrk, s, et); + } +} + +static inline int +session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f, + session_evt_type_t evt_type) +{ + app_worker_t *app_wrk; + application_t *app; + u8 is_cl; + int i; + + app = application_get (app_index); + if (!app) + return -1; + + is_cl = s->thread_index != vlib_get_thread_index (); + for (i = 0; i < f->shr->n_subscribers; i++) + { + app_wrk = application_get_worker (app, f->shr->subscribers[i]); + if (!app_wrk) + continue; + session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0); + } + + return 0; +} + +always_inline int +session_enqueue_notify_inline (session_t *s, u8 is_cl) +{ + app_worker_t *app_wrk; + + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (PREDICT_FALSE (!app_wrk)) + return -1; + + session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl); + + if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo))) + return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo, + SESSION_IO_EVT_RX); + + return 0; +} + +int +session_enqueue_notify (session_t *s) +{ + return session_enqueue_notify_inline (s, 0 /* is_cl */); +} + +int +session_enqueue_notify_cl (session_t *s) +{ + return session_enqueue_notify_inline (s, 1 /* is_cl */); +} + +int +session_dequeue_notify (session_t *s) +{ + app_worker_t *app_wrk; + u8 is_cl; + + /* Unset as soon as event is requested */ + svm_fifo_clear_deq_ntf (s->tx_fifo); + + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (PREDICT_FALSE (!app_wrk)) + return -1; + + is_cl = s->session_state == SESSION_STATE_LISTENING || + s->session_state == SESSION_STATE_OPENED; + session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0); + + if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo))) + return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo, + SESSION_IO_EVT_TX); + + return 0; +} + +/** + * Flushes queue of sessions that are to be notified of new data + * enqueued events. + * + * @param transport_proto transport protocol for which queue to be flushed + * @param thread_index Thread index for which the flush is to be performed. + * @return 0 on success or a positive number indicating the number of + * failures due to API queue being full. + */ +void +session_main_flush_enqueue_events (transport_proto_t transport_proto, + u32 thread_index) +{ + session_worker_t *wrk = session_main_get_worker (thread_index); + session_handle_t *handles; + session_t *s; + u32 i; + + handles = wrk->session_to_enqueue[transport_proto]; + + for (i = 0; i < vec_len (handles); i++) + { + s = session_get_from_handle (handles[i]); + session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, + 0 /* TODO/not needed */); + session_enqueue_notify_inline (s, + s->thread_index != thread_index ? 1 : 0); + } + + vec_reset_length (handles); + wrk->session_to_enqueue[transport_proto] = handles; +} + /* - * Enqueue data for delivery to session peer. Does not notify peer of enqueue - * event but on request can queue notification events for later delivery by - * calling stream_server_flush_enqueue_events(). + * Enqueue data for delivery to app. If requested, it queues app notification + * event for later delivery. 
* * @param tc Transport connection which is to be enqueued data * @param b Buffer to be enqueued @@ -610,15 +773,14 @@ session_enqueue_stream_connection (transport_connection_t * tc, if (queue_event) { - /* Queue RX event on this fifo. Eventually these will need to be flushed - * by calling stream_server_flush_enqueue_events () */ - session_worker_t *wrk; - - wrk = session_main_get_worker (s->thread_index); + /* Queue RX event on this fifo. Eventually these will need to be + * flushed by calling @ref session_main_flush_enqueue_events () */ if (!(s->flags & SESSION_F_RX_EVT)) { + session_worker_t *wrk = session_main_get_worker (s->thread_index); + ASSERT (s->thread_index == vlib_get_thread_index ()); s->flags |= SESSION_F_RX_EVT; - vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index); + vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s)); } session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0); @@ -627,10 +789,11 @@ session_enqueue_stream_connection (transport_connection_t * tc, return enqueued; } -int -session_enqueue_dgram_connection (session_t * s, - session_dgram_hdr_t * hdr, - vlib_buffer_t * b, u8 proto, u8 queue_event) +always_inline int +session_enqueue_dgram_connection_inline (session_t *s, + session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, + u8 queue_event, u32 is_cl) { int rv; @@ -639,12 +802,10 @@ session_enqueue_dgram_connection (session_t * s, if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT))) { - /* *INDENT-OFF* */ svm_fifo_seg_t segs[2] = { { (u8 *) hdr, sizeof (*hdr) }, { vlib_buffer_get_current (b), b->current_length } }; - /* *INDENT-ON* */ rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2, 0 /* allow_partial */ ); @@ -676,15 +837,16 @@ session_enqueue_dgram_connection (session_t * s, if (queue_event && rv > 0) { - /* Queue RX event on this fifo. Eventually these will need to be flushed - * by calling stream_server_flush_enqueue_events () */ - session_worker_t *wrk; - - wrk = session_main_get_worker (s->thread_index); + /* Queue RX event on this fifo. Eventually these will need to be + * flushed by calling @ref session_main_flush_enqueue_events () */ if (!(s->flags & SESSION_F_RX_EVT)) { + u32 thread_index = + is_cl ? 
vlib_get_thread_index () : s->thread_index; + session_worker_t *wrk = session_main_get_worker (thread_index); + ASSERT (s->thread_index == vlib_get_thread_index () || is_cl); s->flags |= SESSION_F_RX_EVT; - vec_add1 (wrk->session_to_enqueue[proto], s->session_index); + vec_add1 (wrk->session_to_enqueue[proto], session_handle (s)); } session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0); @@ -693,6 +855,23 @@ session_enqueue_dgram_connection (session_t * s, } int +session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, u8 queue_event) +{ + return session_enqueue_dgram_connection_inline (s, hdr, b, proto, + queue_event, 0 /* is_cl */); +} + +int +session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, + u8 queue_event) +{ + return session_enqueue_dgram_connection_inline (s, hdr, b, proto, + queue_event, 1 /* is_cl */); +} + +int session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer, u32 offset, u32 max_bytes) { @@ -715,189 +894,6 @@ session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes) return rv; } -static inline int -session_notify_subscribers (u32 app_index, session_t * s, - svm_fifo_t * f, session_evt_type_t evt_type) -{ - app_worker_t *app_wrk; - application_t *app; - int i; - - app = application_get (app_index); - if (!app) - return -1; - - for (i = 0; i < f->shr->n_subscribers; i++) - { - app_wrk = application_get_worker (app, f->shr->subscribers[i]); - if (!app_wrk) - continue; - if (app_worker_lock_and_send_event (app_wrk, s, evt_type)) - return -1; - } - - return 0; -} - -/** - * Notify session peer that new data has been enqueued. - * - * @param s Stream session for which the event is to be generated. - * @param lock Flag to indicate if call should lock message queue. - * - * @return 0 on success or negative number if failed to send notification. - */ -static inline int -session_enqueue_notify_inline (session_t * s) -{ - app_worker_t *app_wrk; - u32 session_index; - u8 n_subscribers; - u32 thread_index; - - session_index = s->session_index; - thread_index = s->thread_index; - n_subscribers = svm_fifo_n_subscribers (s->rx_fifo); - - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (PREDICT_FALSE (!app_wrk)) - { - SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index); - return 0; - } - - SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo)); - - s->flags &= ~SESSION_F_RX_EVT; - - /* Application didn't confirm accept yet */ - if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING)) - return 0; - - if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, - SESSION_IO_EVT_RX))) - return -1; - - if (PREDICT_FALSE (n_subscribers)) - { - s = session_get (session_index, thread_index); - return session_notify_subscribers (app_wrk->app_index, s, - s->rx_fifo, SESSION_IO_EVT_RX); - } - - return 0; -} - -int -session_enqueue_notify (session_t * s) -{ - return session_enqueue_notify_inline (s); -} - -static void -session_enqueue_notify_rpc (void *arg) -{ - u32 session_index = pointer_to_uword (arg); - session_t *s; - - s = session_get_if_valid (session_index, vlib_get_thread_index ()); - if (!s) - return; - - session_enqueue_notify (s); -} - -/** - * Like session_enqueue_notify, but can be called from a thread that does not - * own the session. 
- */ -void -session_enqueue_notify_thread (session_handle_t sh) -{ - u32 thread_index = session_thread_from_handle (sh); - u32 session_index = session_index_from_handle (sh); - - /* - * Pass session index (u32) as opposed to handle (u64) in case pointers - * are not 64-bit. - */ - session_send_rpc_evt_to_thread (thread_index, - session_enqueue_notify_rpc, - uword_to_pointer (session_index, void *)); -} - -int -session_dequeue_notify (session_t * s) -{ - app_worker_t *app_wrk; - - svm_fifo_clear_deq_ntf (s->tx_fifo); - - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (PREDICT_FALSE (!app_wrk)) - return -1; - - if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, - SESSION_IO_EVT_TX))) - return -1; - - if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers)) - return session_notify_subscribers (app_wrk->app_index, s, - s->tx_fifo, SESSION_IO_EVT_TX); - - return 0; -} - -/** - * Flushes queue of sessions that are to be notified of new data - * enqueued events. - * - * @param thread_index Thread index for which the flush is to be performed. - * @return 0 on success or a positive number indicating the number of - * failures due to API queue being full. - */ -int -session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index) -{ - session_worker_t *wrk = session_main_get_worker (thread_index); - session_t *s; - int i, errors = 0; - u32 *indices; - - indices = wrk->session_to_enqueue[transport_proto]; - - for (i = 0; i < vec_len (indices); i++) - { - s = session_get_if_valid (indices[i], thread_index); - if (PREDICT_FALSE (!s)) - { - errors++; - continue; - } - - session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, - 0 /* TODO/not needed */ ); - - if (PREDICT_FALSE (session_enqueue_notify_inline (s))) - errors++; - } - - vec_reset_length (indices); - wrk->session_to_enqueue[transport_proto] = indices; - - return errors; -} - -int -session_main_flush_all_enqueue_events (u8 transport_proto) -{ - vlib_thread_main_t *vtm = vlib_get_thread_main (); - int i, errors = 0; - for (i = 0; i < 1 + vtm->n_threads; i++) - errors += session_main_flush_enqueue_events (transport_proto, i); - return errors; -} - int session_stream_connect_notify (transport_connection_t * tc, session_error_t err) @@ -951,43 +947,20 @@ session_stream_connect_notify (transport_connection_t * tc, return 0; } -typedef union session_switch_pool_reply_args_ -{ - struct - { - u32 session_index; - u16 thread_index; - u8 is_closed; - }; - u64 as_u64; -} session_switch_pool_reply_args_t; - -STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword), - "switch pool reply args size"); - static void -session_switch_pool_reply (void *arg) +session_switch_pool_closed_rpc (void *arg) { - session_switch_pool_reply_args_t rargs; + session_handle_t sh; session_t *s; - rargs.as_u64 = pointer_to_uword (arg); - s = session_get_if_valid (rargs.session_index, rargs.thread_index); + sh = pointer_to_uword (arg); + s = session_get_from_handle_if_valid (sh); if (!s) return; - /* Session closed during migration. 
Clean everything up */ - if (rargs.is_closed) - { - transport_cleanup (session_get_transport_proto (s), s->connection_index, - s->thread_index); - segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); - session_free (s); - return; - } - - /* Notify app that it has data on the new session */ - session_enqueue_notify (s); + transport_cleanup (session_get_transport_proto (s), s->connection_index, + s->thread_index); + session_cleanup (s); } typedef struct _session_switch_pool_args @@ -1005,8 +978,7 @@ static void session_switch_pool (void *cb_args) { session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args; - session_switch_pool_reply_args_t rargs; - session_handle_t new_sh; + session_handle_t sh, new_sh; segment_manager_t *sm; app_worker_t *app_wrk; session_t *s; @@ -1014,37 +986,32 @@ session_switch_pool (void *cb_args) ASSERT (args->thread_index == vlib_get_thread_index ()); s = session_get (args->session_index, args->thread_index); - /* Check if session closed during migration */ - rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING; + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + goto app_closed; - transport_cleanup (session_get_transport_proto (s), s->connection_index, - s->thread_index); + /* Cleanup fifo segment slice state for fifos */ + sm = app_worker_get_connect_segment_manager (app_wrk); + segment_manager_detach_fifo (sm, &s->rx_fifo); + segment_manager_detach_fifo (sm, &s->tx_fifo); - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (app_wrk) - { - /* Cleanup fifo segment slice state for fifos */ - sm = app_worker_get_connect_segment_manager (app_wrk); - segment_manager_detach_fifo (sm, &s->rx_fifo); - segment_manager_detach_fifo (sm, &s->tx_fifo); + /* Check if session closed during migration */ + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + goto app_closed; - /* Notify app, using old session, about the migration event */ - if (!rargs.is_closed) - { - new_sh = session_make_handle (args->new_session_index, - args->new_thread_index); - app_worker_migrate_notify (app_wrk, s, new_sh); - } - } + new_sh = + session_make_handle (args->new_session_index, args->new_thread_index); + app_worker_migrate_notify (app_wrk, s, new_sh); - /* Trigger app read and fifo updates on the new thread */ - rargs.session_index = args->new_session_index; - rargs.thread_index = args->new_thread_index; - session_send_rpc_evt_to_thread (args->new_thread_index, - session_switch_pool_reply, - uword_to_pointer (rargs.as_u64, void *)); + clib_mem_free (cb_args); + return; - session_free (s); +app_closed: + /* Session closed during migration. 
Clean everything up */ + sh = session_handle (s); + session_send_rpc_evt_to_thread (args->new_thread_index, + session_switch_pool_closed_rpc, + uword_to_pointer (sh, void *)); clib_mem_free (cb_args); } @@ -1184,6 +1151,7 @@ session_transport_delete_notify (transport_connection_t * tc) break; case SESSION_STATE_CLOSED: session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT); + session_set_state (s, SESSION_STATE_TRANSPORT_DELETED); session_delete (s); break; default: @@ -1633,7 +1601,7 @@ session_transport_close (session_t * s) session_set_state (s, SESSION_STATE_CLOSED); /* If transport is already deleted, just free the session */ else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED) - session_free_w_fifos (s); + session_program_cleanup (s); return; } @@ -1660,7 +1628,7 @@ session_transport_reset (session_t * s) if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED) session_set_state (s, SESSION_STATE_CLOSED); else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED) - session_free_w_fifos (s); + session_program_cleanup (s); return; } @@ -2157,6 +2125,7 @@ session_node_enable_disable (u8 is_en) if (!sm->poll_main) continue; } + vlib_node_set_state (vm, session_input_node.index, mstate); vlib_node_set_state (vm, session_queue_node.index, state); } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 6b6d1f64499..4de7bb252f0 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -100,8 +100,8 @@ typedef struct session_worker_ /** Convenience pointer to this worker's vlib_main */ vlib_main_t *vm; - /** Per-proto vector of sessions to enqueue */ - u32 **session_to_enqueue; + /** Per-proto vector of session handles to enqueue */ + session_handle_t **session_to_enqueue; /** Timerfd used to periodically signal wrk session queue node */ int timerfd; @@ -148,6 +148,9 @@ typedef struct session_worker_ /** List head for first worker evts pending handling on main */ clib_llist_index_t evts_pending_main; + /** Per-app-worker bitmap of pending notifications */ + uword *app_wrks_pending_ntf; + int config_index; u8 dma_enabled; session_dma_transfer *dma_trans; @@ -275,6 +278,7 @@ typedef struct session_main_ extern session_main_t session_main; extern vlib_node_registration_t session_queue_node; +extern vlib_node_registration_t session_input_node; extern vlib_node_registration_t session_queue_process_node; extern vlib_node_registration_t session_queue_pre_input_node; @@ -358,7 +362,8 @@ int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq); session_t *session_alloc (u32 thread_index); void session_free (session_t * s); -void session_free_w_fifos (session_t * s); +void session_cleanup (session_t *s); +void session_program_cleanup (session_t *s); void session_cleanup_half_open (session_handle_t ho_handle); u8 session_is_valid (u32 si, u8 thread_index); @@ -452,8 +457,9 @@ void session_transport_reset (session_t * s); void session_transport_cleanup (session_t * s); int session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type); -int session_enqueue_notify (session_t * s); +int session_enqueue_notify (session_t *s); int session_dequeue_notify (session_t * s); +int session_enqueue_notify_cl (session_t *s); int session_send_io_evt_to_thread_custom (void *data, u32 thread_index, session_evt_type_t evt_type); void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, @@ -485,6 +491,10 @@ int session_enqueue_dgram_connection (session_t * s, session_dgram_hdr_t * hdr, vlib_buffer_t * b, u8 proto, u8 queue_event); +int 
session_enqueue_dgram_connection_cl (session_t *s, + session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, + u8 queue_event); int session_stream_connect_notify (transport_connection_t * tc, session_error_t err); int session_dgram_connect_notify (transport_connection_t * tc, @@ -502,6 +512,7 @@ int session_stream_accept (transport_connection_t * tc, u32 listener_index, u32 thread_index, u8 notify); int session_dgram_accept (transport_connection_t * tc, u32 listener_index, u32 thread_index); + /** * Initialize session layer for given transport proto and ip version * @@ -765,8 +776,8 @@ do { \ return clib_error_return (0, "session layer is not enabled"); \ } while (0) -int session_main_flush_enqueue_events (u8 proto, u32 thread_index); -int session_main_flush_all_enqueue_events (u8 transport_proto); +void session_main_flush_enqueue_events (transport_proto_t transport_proto, + u32 thread_index); void session_queue_run_on_main_thread (vlib_main_t * vm); /** @@ -799,6 +810,8 @@ fifo_segment_t *session_main_get_wrk_mqs_segment (void); void session_node_enable_disable (u8 is_en); clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en); void session_wrk_handle_evts_main_rpc (void *); +void session_wrk_program_app_wrk_evts (session_worker_t *wrk, + u32 app_wrk_index); session_t *session_alloc_for_connection (transport_connection_t * tc); session_t *session_alloc_for_half_open (transport_connection_t *tc); diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index ff11bcb690a..55fc72ee4c2 100644 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -460,6 +460,52 @@ mq_send_session_cleanup_cb (session_t * s, session_cleanup_ntf_t ntf) app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_CLEANUP, &m, sizeof (m)); } +static int +mq_send_io_rx_event (session_t *s) +{ + session_event_t *mq_evt; + svm_msg_q_msg_t mq_msg; + app_worker_t *app_wrk; + svm_msg_q_t *mq; + + if (svm_fifo_has_event (s->rx_fifo)) + return 0; + + app_wrk = app_worker_get (s->app_wrk_index); + mq = app_wrk->event_queue; + + mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + mq_evt = svm_msg_q_msg_data (mq, &mq_msg); + + mq_evt->event_type = SESSION_IO_EVT_RX; + mq_evt->session_index = s->rx_fifo->shr->client_session_index; + + (void) svm_fifo_set_event (s->rx_fifo); + + svm_msg_q_add_raw (mq, &mq_msg); + + return 0; +} + +static int +mq_send_io_tx_event (session_t *s) +{ + app_worker_t *app_wrk = app_worker_get (s->app_wrk_index); + svm_msg_q_t *mq = app_wrk->event_queue; + session_event_t *mq_evt; + svm_msg_q_msg_t mq_msg; + + mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + mq_evt = svm_msg_q_msg_data (mq, &mq_msg); + + mq_evt->event_type = SESSION_IO_EVT_TX; + mq_evt->session_index = s->tx_fifo->shr->client_session_index; + + svm_msg_q_add_raw (mq, &mq_msg); + + return 0; +} + static session_cb_vft_t session_mq_cb_vft = { .session_accept_callback = mq_send_session_accepted_cb, .session_disconnect_callback = mq_send_session_disconnected_cb, @@ -469,6 +515,8 @@ static session_cb_vft_t session_mq_cb_vft = { .session_cleanup_callback = mq_send_session_cleanup_cb, .add_segment_callback = mq_send_add_segment_cb, .del_segment_callback = mq_send_del_segment_cb, + .builtin_app_rx_callback = mq_send_io_rx_event, + .builtin_app_tx_callback = mq_send_io_tx_event, }; static void @@ -1246,6 +1294,8 @@ static session_cb_vft_t session_mq_sapi_cb_vft = { .session_cleanup_callback = mq_send_session_cleanup_cb, .add_segment_callback = 
diff --git a/src/vnet/session/session_input.c b/src/vnet/session/session_input.c
new file mode 100644
index 00000000000..8c1f11ca96e
--- /dev/null
+++ b/src/vnet/session/session_input.c
@@ -0,0 +1,296 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2023 Cisco Systems, Inc.
+ */
+
+#include <vnet/session/session.h>
+#include <vnet/session/application.h>
+
+static inline int
+mq_try_lock (svm_msg_q_t *mq)
+{
+  int rv, n_try = 0;
+
+  while (n_try < 100)
+    {
+      rv = svm_msg_q_try_lock (mq);
+      if (!rv)
+	return 0;
+      n_try += 1;
+      usleep (1);
+    }
+
+  return -1;
+}
+
+always_inline u8
+mq_event_ring_index (session_evt_type_t et)
+{
+  return (et >= SESSION_CTRL_EVT_RPC ? SESSION_MQ_CTRL_EVT_RING :
+				       SESSION_MQ_IO_EVT_RING);
+}
+
+void
+app_worker_del_all_events (app_worker_t *app_wrk)
+{
+  session_worker_t *wrk;
+  session_event_t *evt;
+  u32 thread_index;
+  session_t *s;
+
+  for (thread_index = 0; thread_index < vec_len (app_wrk->wrk_evts);
+       thread_index++)
+    {
+      while (clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
+	{
+	  clib_fifo_sub2 (app_wrk->wrk_evts[thread_index], evt);
+	  switch (evt->event_type)
+	    {
+	    case SESSION_CTRL_EVT_MIGRATED:
+	      s = session_get (evt->session_index, thread_index);
+	      transport_cleanup (session_get_transport_proto (s),
+				 s->connection_index, s->thread_index);
+	      session_free (s);
+	      break;
+	    case SESSION_CTRL_EVT_CLEANUP:
+	      s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
+	      if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
+		break;
+	      uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
+	      break;
+	    case SESSION_CTRL_EVT_HALF_CLEANUP:
+	      s = ho_session_get (evt->session_index);
+	      pool_put_index (app_wrk->half_open_table, s->ho_index);
+	      session_free (s);
+	      break;
+	    default:
+	      break;
+	    }
+	}
+      wrk = session_main_get_worker (thread_index);
+      clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
+    }
+}
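app_worker_del_all_events() above is the consumer-side cleanup for the per-thread event fifos. The producer side is session_wrk_program_app_wrk_evts(), which is declared in session.h but whose body is not shown in this change; a plausible sketch under that assumption (my_program_app_wrk_evt is illustrative):

+/* Assumed producer-side sketch: queue an event on the app worker's
+ * per-thread fifo, mark the app worker pending on this session worker
+ * and make sure session-input runs. */
+static void
+my_program_app_wrk_evt (session_worker_t *wrk, app_worker_t *app_wrk,
+			session_event_t *evt)
+{
+  u32 thread_index = wrk->vm->thread_index;
+
+  clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
+  wrk->app_wrks_pending_ntf =
+    clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 1);
+  vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
+}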
+
+always_inline int
+app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index,
+				u8 is_builtin)
+{
+  application_t *app = application_get (app_wrk->app_index);
+  svm_msg_q_t *mq = app_wrk->event_queue;
+  session_event_t *evt;
+  u32 n_evts = 128, i;
+  u8 ring_index, mq_is_cong;
+  session_t *s;
+
+  n_evts = clib_min (n_evts, clib_fifo_elts (app_wrk->wrk_evts[thread_index]));
+
+  if (!is_builtin)
+    {
+      mq_is_cong = app_worker_mq_is_congested (app_wrk);
+      if (mq_try_lock (mq))
+	{
+	  app_worker_set_mq_wrk_congested (app_wrk, thread_index);
+	  return 0;
+	}
+    }
+
+  for (i = 0; i < n_evts; i++)
+    {
+      evt = clib_fifo_head (app_wrk->wrk_evts[thread_index]);
+      if (!is_builtin)
+	{
+	  ring_index = mq_event_ring_index (evt->event_type);
+	  if (svm_msg_q_or_ring_is_full (mq, ring_index))
+	    {
+	      app_worker_set_mq_wrk_congested (app_wrk, thread_index);
+	      break;
+	    }
+	}
+
+      switch (evt->event_type)
+	{
+	case SESSION_IO_EVT_RX:
+	  s = session_get (evt->session_index, thread_index);
+	  s->flags &= ~SESSION_F_RX_EVT;
+	  /* Application didn't confirm accept yet */
+	  if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
+	    break;
+	  app->cb_fns.builtin_app_rx_callback (s);
+	  break;
+	/* Handle sessions that might not be on current thread */
+	case SESSION_IO_EVT_BUILTIN_RX:
+	  s = session_get_from_handle_if_valid (evt->session_handle);
+	  if (!s || s->session_state == SESSION_STATE_ACCEPTING)
+	    break;
+	  app->cb_fns.builtin_app_rx_callback (s);
+	  break;
+	case SESSION_IO_EVT_TX:
+	  s = session_get (evt->session_index, thread_index);
+	  app->cb_fns.builtin_app_tx_callback (s);
+	  break;
+	case SESSION_IO_EVT_TX_MAIN:
+	  s = session_get_from_handle_if_valid (evt->session_handle);
+	  if (!s)
+	    break;
+	  app->cb_fns.builtin_app_tx_callback (s);
+	  break;
+	case SESSION_CTRL_EVT_BOUND:
+	  /* No app cb function currently */
+	  if (is_builtin)
+	    break;
+	  mq_send_session_bound_cb (app_wrk->wrk_index, evt->as_u64[1] >> 32,
+				    evt->session_handle,
+				    evt->as_u64[1] & 0xffffffff);
+	  break;
+	case SESSION_CTRL_EVT_ACCEPTED:
+	  s = session_get (evt->session_index, thread_index);
+	  app->cb_fns.session_accept_callback (s);
+	  break;
+	case SESSION_CTRL_EVT_CONNECTED:
+	  if (!(evt->as_u64[1] & 0xffffffff))
+	    s = session_get (evt->session_index, thread_index);
+	  else
+	    s = 0;
+	  app->cb_fns.session_connected_callback (app_wrk->wrk_index,
+						  evt->as_u64[1] >> 32, s,
+						  evt->as_u64[1] & 0xffffffff);
+	  break;
+	case SESSION_CTRL_EVT_DISCONNECTED:
+	  s = session_get (evt->session_index, thread_index);
+	  app->cb_fns.session_disconnect_callback (s);
+	  break;
+	case SESSION_CTRL_EVT_RESET:
+	  s = session_get (evt->session_index, thread_index);
+	  app->cb_fns.session_reset_callback (s);
+	  break;
+	case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+	  if (is_builtin)
+	    break;
+	  mq_send_unlisten_reply (app_wrk, evt->session_handle,
+				  evt->as_u64[1] >> 32,
+				  evt->as_u64[1] & 0xffffffff);
+	  break;
+	case SESSION_CTRL_EVT_MIGRATED:
+	  s = session_get (evt->session_index, thread_index);
+	  app->cb_fns.session_migrate_callback (s, evt->as_u64[1]);
+	  transport_cleanup (session_get_transport_proto (s),
+			     s->connection_index, s->thread_index);
+	  session_free (s);
+	  /* Notify app that it has data on the new session */
+	  s = session_get_from_handle (evt->as_u64[1]);
+	  session_send_io_evt_to_thread (s->rx_fifo,
+					 SESSION_IO_EVT_BUILTIN_RX);
+	  break;
+	case SESSION_CTRL_EVT_TRANSPORT_CLOSED:
+	  s = session_get (evt->session_index, thread_index);
+	  if (app->cb_fns.session_transport_closed_callback)
+	    app->cb_fns.session_transport_closed_callback (s);
+	  break;
+	case SESSION_CTRL_EVT_CLEANUP:
+	  s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
+	  if (app->cb_fns.session_cleanup_callback)
+	    app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32);
+	  if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
+	    break;
+	  uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
+	  break;
+	case SESSION_CTRL_EVT_HALF_CLEANUP:
+	  s = ho_session_get (evt->session_index);
+	  ASSERT (session_vlib_thread_is_cl_thread ());
+	  if (app->cb_fns.half_open_cleanup_callback)
+	    app->cb_fns.half_open_cleanup_callback (s);
+	  pool_put_index (app_wrk->half_open_table, s->ho_index);
+	  session_free (s);
+	  break;
+	case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
+	  app->cb_fns.add_segment_callback (app_wrk->wrk_index,
+					    evt->as_u64[1]);
+	  break;
+	case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
+	  app->cb_fns.del_segment_callback (app_wrk->wrk_index,
+					    evt->as_u64[1]);
+	  break;
+	default:
+	  clib_warning ("unexpected event: %u", evt->event_type);
+	  ASSERT (0);
+	  break;
+	}
+      clib_fifo_advance_head (app_wrk->wrk_evts[thread_index], 1);
+    }
+
+  if (!is_builtin)
+    {
+      svm_msg_q_unlock (mq);
+      if (mq_is_cong && i == n_evts)
+	app_worker_unset_wrk_mq_congested (app_wrk, thread_index);
+    }
+
+  return 0;
+}
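For builtin apps the dispatcher above invokes the vft callbacks directly on the session's owner thread. A minimal echo-style rx callback consistent with that contract (my_builtin_rx_callback is an illustrative name):

+/* Sketch of a builtin app rx callback as dispatched above; runs on
+ * the session's worker thread, so fifo access needs no extra locking. */
+static int
+my_builtin_rx_callback (session_t *s)
+{
+  u8 buf[1024];
+  int n_read;
+
+  n_read = svm_fifo_dequeue (s->rx_fifo, sizeof (buf), buf);
+  if (n_read <= 0)
+    return 0;
+
+  /* Echo back and notify the peer */
+  svm_fifo_enqueue (s->tx_fifo, n_read, buf);
+  if (svm_fifo_set_event (s->tx_fifo))
+    session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
+  return 0;
+}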
+
+int
+app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index)
+{
+  if (app_worker_application_is_builtin (app_wrk))
+    return app_worker_flush_events_inline (app_wrk, thread_index,
+					   1 /* is_builtin */);
+  else
+    return app_worker_flush_events_inline (app_wrk, thread_index,
+					   0 /* is_builtin */);
+}
+
+static inline int
+session_wrk_flush_events (session_worker_t *wrk)
+{
+  app_worker_t *app_wrk;
+  uword app_wrk_index;
+  u32 thread_index;
+
+  thread_index = wrk->vm->thread_index;
+  app_wrk_index = clib_bitmap_first_set (wrk->app_wrks_pending_ntf);
+
+  while (app_wrk_index != ~0)
+    {
+      app_wrk = app_worker_get_if_valid (app_wrk_index);
+      /* app_wrk events are flushed on free, so should be valid here */
+      ASSERT (app_wrk != 0);
+      app_wrk_flush_wrk_events (app_wrk, thread_index);
+
+      if (!clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
+	clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
+
+      app_wrk_index =
+	clib_bitmap_next_set (wrk->app_wrks_pending_ntf, app_wrk_index + 1);
+    }
+
+  if (!clib_bitmap_is_zero (wrk->app_wrks_pending_ntf))
+    vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
+
+  return 0;
+}
+
+VLIB_NODE_FN (session_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+  u32 thread_index = vm->thread_index;
+  session_worker_t *wrk;
+
+  wrk = session_main_get_worker (thread_index);
+  session_wrk_flush_events (wrk);
+
+  return 0;
+}
+
+VLIB_REGISTER_NODE (session_input_node) = {
+  .name = "session-input",
+  .type = VLIB_NODE_TYPE_INPUT,
+  .state = VLIB_NODE_STATE_DISABLED,
+};
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
\ No newline at end of file
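The node is registered disabled and flipped to the session layer's run state by the session_node_enable_disable() hunk in session.c above (mstate). A sketch of enabling it in interrupt mode on every worker, assuming the vlib_get_n_threads()/vlib_get_main_by_index() accessors:

+/* Assumed enable helper; mirrors what session_node_enable_disable
+ * does for session-input above. */
+static void
+my_session_input_enable (void)
+{
+  vlib_main_t *ovm;
+  u32 i;
+
+  for (i = 0; i < vlib_get_n_threads (); i++)
+    {
+      ovm = vlib_get_main_by_index (i);
+      vlib_node_set_state (ovm, session_input_node.index,
+			   VLIB_NODE_STATE_INTERRUPT);
+    }
+}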
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index e15625e37ca..4f2cae4d196 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -142,10 +142,14 @@ session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     session_worker_stat_error_inc (wrk, rv, 1);
 
   app_wrk = application_get_worker (app, mp->wrk_index);
-  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
 
   if (mp->ext_config)
     session_mq_free_ext_config (app, mp->ext_config);
+
+  /* Make sure events are flushed before releasing barrier, to avoid
+   * potential race with accept. */
+  app_wrk_flush_wrk_events (app_wrk, 0);
 }
 
 static void
@@ -170,7 +174,8 @@ session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
   rv = vnet_bind_uri (a);
 
   app_wrk = application_get_worker (app, 0);
-  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
+  app_wrk_flush_wrk_events (app_wrk, 0);
 }
 
 static void
@@ -215,7 +220,7 @@ session_mq_connect_one (session_connect_msg_t *mp)
       wrk = session_main_get_worker (vlib_get_thread_index ());
       session_worker_stat_error_inc (wrk, rv, 1);
       app_wrk = application_get_worker (app, mp->wrk_index);
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     }
 
   if (mp->ext_config)
@@ -324,7 +329,7 @@ session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     {
       session_worker_stat_error_inc (wrk, rv, 1);
       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     }
 }
 
@@ -410,7 +415,7 @@ session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
   if (!app_wrk)
     return;
 
-  mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
+  app_worker_unlisten_reply (app_wrk, sh, mp->context, rv);
 }
 
 static void
@@ -466,7 +471,7 @@ session_mq_accepted_reply_handler (session_worker_t *wrk,
   session_set_state (s, SESSION_STATE_READY);
 
   if (!svm_fifo_is_empty_prod (s->rx_fifo))
-    app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+    app_worker_rx_notify (app_wrk, s);
 
   /* Closed while waiting for app to reply. Resend disconnect */
   if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
@@ -669,7 +674,7 @@ session_mq_worker_update_handler (void *data)
     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
 
   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
-    app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+    app_worker_rx_notify (app_wrk, s);
 
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     app_worker_close_notify (app_wrk, s);
@@ -1790,7 +1795,7 @@ session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
 	break;
       svm_fifo_unset_event (s->rx_fifo);
       app_wrk = app_worker_get (s->app_wrk_index);
-      app_worker_builtin_rx (app_wrk, s);
+      app_worker_rx_notify (app_wrk, s);
       break;
     case SESSION_IO_EVT_TX_MAIN:
      s = session_get_if_valid (e->session_index, 0 /* main thread */);
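Since notifications are now queued rather than sent inline, control-path handlers that run under the worker barrier have to flush explicitly. Condensed, the listen-path pattern from the hunks above is (names as in the diff):

  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
  /* Flush main-thread (index 0) events synchronously so the app sees
   * the bound reply before the barrier is released */
  app_wrk_flush_wrk_events (app_wrk, 0 /* thread_index */);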
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 8755a146bce..4fe0c7cf09a 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -379,6 +379,8 @@ typedef enum
   SESSION_CTRL_EVT_APP_WRK_RPC,
   SESSION_CTRL_EVT_TRANSPORT_ATTR,
   SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY,
+  SESSION_CTRL_EVT_TRANSPORT_CLOSED,
+  SESSION_CTRL_EVT_HALF_CLEANUP,
 } session_evt_type_t;
 
 #define foreach_session_ctrl_evt					\
@@ -437,6 +439,7 @@ typedef struct
     session_handle_t session_handle;
     session_rpc_args_t rpc_args;
     u32 ctrl_data_index;
+    u64 as_u64[2];
     struct
     {
       u8 data[0];
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index f0d039fbbb8..3d8afaaba3d 100644
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -1394,11 +1394,10 @@ always_inline uword
 tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
 			  vlib_frame_t * frame, int is_ip4)
 {
-  u32 thread_index = vm->thread_index, errors = 0;
+  u32 thread_index = vm->thread_index, n_left_from, *from;
   tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
   u16 err_counters[TCP_N_ERROR] = { 0 };
-  u32 n_left_from, *from;
 
   if (node->flags & VLIB_NODE_FLAG_TRACE)
     tcp_established_trace_frame (vm, node, frame, is_ip4);
@@ -1462,9 +1461,7 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
       b += 1;
     }
 
-  errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
-					      thread_index);
-  err_counters[TCP_ERROR_MSG_QUEUE_FULL] = errors;
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
   tcp_store_err_counters (established, err_counters);
   tcp_handle_postponed_dequeues (wrk);
   tcp_handle_disconnects (wrk);
@@ -1746,7 +1743,7 @@ always_inline uword
 tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
 		       vlib_frame_t *frame, int is_ip4)
 {
-  u32 n_left_from, *from, thread_index = vm->thread_index, errors = 0;
+  u32 n_left_from, *from, thread_index = vm->thread_index;
   tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
 
@@ -1981,9 +1978,7 @@ tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
       tcp_inc_counter (syn_sent, error, 1);
     }
 
-  errors =
-    session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
-  tcp_inc_counter (syn_sent, TCP_ERROR_MSG_QUEUE_FULL, errors);
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
   vlib_buffer_free (vm, from, frame->n_vectors);
   tcp_handle_disconnects (wrk);
 
@@ -2058,7 +2053,7 @@ always_inline uword
 tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
 			  vlib_frame_t *frame, int is_ip4)
 {
-  u32 thread_index = vm->thread_index, errors, n_left_from, *from, max_deq;
+  u32 thread_index = vm->thread_index, n_left_from, *from, max_deq;
   tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
 
@@ -2431,9 +2426,7 @@ tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
       tcp_inc_counter (rcv_process, error, 1);
     }
 
-  errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
-					      thread_index);
-  tcp_inc_counter (rcv_process, TCP_ERROR_MSG_QUEUE_FULL, errors);
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
  tcp_handle_postponed_dequeues (wrk);
 tcp_handle_disconnects (wrk);
 vlib_buffer_free (vm, from, frame->n_vectors);
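session_main_flush_enqueue_events() now returns void, so the TCP input nodes above drop their TCP_ERROR_MSG_QUEUE_FULL accounting; mq-full conditions are absorbed by the per-app-worker congestion handling in session_input.c. A conceptual sketch of what the flush is assumed to do after this change, based only on the session.h declarations above (the actual implementation lives in session.c and is not shown here):

  /* Assumed: walk the per-proto vector of queued session handles and
   * program one RX notification per session, then reset the vector. */
  static void
  my_flush_enqueue_events (transport_proto_t tp, u32 thread_index)
  {
    session_worker_t *wrk = session_main_get_worker (thread_index);
    session_handle_t *sh;

    vec_foreach (sh, wrk->session_to_enqueue[tp])
      session_enqueue_notify (session_get_from_handle (*sh));
    vec_reset_length (wrk->session_to_enqueue[tp]);
  }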
diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c
index fb625c841c6..8175e22097a 100644
--- a/src/vnet/tls/tls.c
+++ b/src/vnet/tls/tls.c
@@ -61,8 +61,7 @@ tls_add_vpp_q_rx_evt (session_t * s)
 int
 tls_add_vpp_q_builtin_rx_evt (session_t * s)
 {
-  if (svm_fifo_set_event (s->rx_fifo))
-    session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+  session_enqueue_notify (s);
   return 0;
 }
 
@@ -75,9 +74,10 @@ tls_add_vpp_q_tx_evt (session_t * s)
 }
 
 static inline int
-tls_add_app_q_evt (app_worker_t * app, session_t * app_session)
+tls_add_app_q_evt (app_worker_t *app_wrk, session_t *app_session)
 {
-  return app_worker_lock_and_send_event (app, app_session, SESSION_IO_EVT_RX);
+  app_worker_add_event (app_wrk, app_session, SESSION_IO_EVT_RX);
+  return 0;
 }
 
 u32
diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c
index 6e5ed158d56..33ee2cddefd 100644
--- a/src/vnet/udp/udp_input.c
+++ b/src/vnet/udp/udp_input.c
@@ -149,11 +149,9 @@ udp_connection_enqueue (udp_connection_t * uc0, session_t * s0,
    * enqueue event now while we still have the peeker lock */
   if (s0->thread_index != thread_index)
     {
-      wrote0 = session_enqueue_dgram_connection (s0, hdr0, b,
-						 TRANSPORT_PROTO_UDP,
-						 /* queue event */ 0);
-      if (queue_event && !svm_fifo_has_event (s0->rx_fifo))
-	session_enqueue_notify (s0);
+      wrote0 = session_enqueue_dgram_connection_cl (
+	s0, hdr0, b, TRANSPORT_PROTO_UDP,
+	/* queue event */ queue_event && !svm_fifo_has_event (s0->rx_fifo));
     }
   else
     {
@@ -232,10 +230,9 @@ always_inline uword
 udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
 		    vlib_frame_t * frame, u8 is_ip4)
 {
-  u32 n_left_from, *from, errors, *first_buffer;
+  u32 thread_index = vm->thread_index, n_left_from, *from, *first_buffer;
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
   u16 err_counters[UDP_N_ERROR] = { 0 };
-  u32 thread_index = vm->thread_index;
 
   from = first_buffer = vlib_frame_vector_args (frame);
   n_left_from = frame->n_vectors;
@@ -327,9 +324,7 @@ udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
 
   vlib_buffer_free (vm, first_buffer, frame->n_vectors);
-  errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP,
-					      thread_index);
-  err_counters[UDP_ERROR_MQ_FULL] = errors;
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP, thread_index);
   udp_store_err_counters (vm, is_ip4, err_counters);
   return frame->n_vectors;
 }
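With mq_send_io_rx_event/mq_send_io_tx_event registered as builtin callbacks in session_api.c above, both builtin and external apps are notified through the same session-input dispatch. A builtin app therefore wires into the new path purely through its callback vft; the handler names below are hypothetical and the registration boilerplate (vnet_application_attach, etc.) is elided:

static session_cb_vft_t my_app_cb_vft = {
  .session_accept_callback = my_accept_callback,
  .session_disconnect_callback = my_disconnect_callback,
  .session_connected_callback = my_connected_callback,
  /* After this patch, both may be invoked from the session-input
   * node rather than inline with enqueue */
  .builtin_app_rx_callback = my_builtin_rx_callback,
  .builtin_app_tx_callback = my_builtin_tx_callback,
};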