diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/application.c | 3 | ||||
-rw-r--r-- | src/vnet/session/session.c | 30 | ||||
-rw-r--r-- | src/vnet/session/session.h | 30 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 117 |
4 files changed, 176 insertions, 4 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 16acc9c997d..a93e4b9dbbf 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -497,6 +497,9 @@ VLIB_NODE_FN (appsl_rx_mqs_input_node) if (aw->pending_rx_mqs) vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index); + if (n_msgs && wrk->state == SESSION_WRK_INTERRUPT) + vlib_node_set_interrupt_pending (vm, session_queue_node.index); + return n_msgs; } diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index c24a95fd9a6..7513aa32ed8 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -28,11 +28,12 @@ static inline int session_send_evt_to_thread (void *data, void *args, u32 thread_index, session_evt_type_t evt_type) { + session_worker_t *wrk = session_main_get_worker (thread_index); session_event_t *evt; svm_msg_q_msg_t msg; svm_msg_q_t *mq; - mq = session_main_get_vpp_event_queue (thread_index); + mq = wrk->vpp_event_queue; if (PREDICT_FALSE (svm_msg_q_lock (mq))) return -1; if (PREDICT_FALSE (svm_msg_q_is_full (mq) @@ -72,6 +73,10 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, evt->event_type = evt_type; svm_msg_q_add_and_unlock (mq, &msg); + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); + return 0; } @@ -121,19 +126,20 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) void session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) { - session_t *s; + session_t *s = session_get (tc->s_index, tc->thread_index); - s = session_get (tc->s_index, tc->thread_index); ASSERT (s->thread_index == vlib_get_thread_index ()); ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED); + if (!(s->flags & SESSION_F_CUSTOM_TX)) { s->flags |= SESSION_F_CUSTOM_TX; if (svm_fifo_set_event (s->tx_fifo) || transport_connection_is_descheduled (tc)) { - session_worker_t *wrk; session_evt_elt_t *elt; + session_worker_t *wrk; + wrk = session_main_get_worker (tc->thread_index); if (has_prio) elt = session_evt_alloc_new (wrk); @@ -142,6 +148,10 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) elt->evt.session_index = tc->s_index; elt->evt.event_type = SESSION_IO_EVT_TX; tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED; + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, + session_queue_node.index); } } } @@ -157,6 +167,9 @@ sesssion_reschedule_tx (transport_connection_t * tc) elt = session_evt_alloc_new (wrk); elt->evt.session_index = tc->s_index; elt->evt.event_type = SESSION_IO_EVT_TX; + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); } static void @@ -175,6 +188,9 @@ session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt) clib_memset (&elt->evt, 0, sizeof (session_event_t)); elt->evt.session_handle = session_handle (s); elt->evt.event_type = evt; + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); } else session_send_ctrl_evt_to_thread (s, evt); @@ -1693,6 +1709,9 @@ session_manager_main_enable (vlib_main_t * vm) if (num_threads > 1) clib_rwlock_init (&smm->wrk[i].peekers_rw_locks); + + if (!smm->no_adaptive && smm->use_private_rx_mqs) + session_wrk_enable_adaptive_mode (wrk); } /* Allocate vpp event queues segment and queue */ @@ -1817,6 +1836,7 @@ session_main_init (vlib_main_t * vm) smm->session_enable_asap = 0; smm->poll_main = 0; smm->use_private_rx_mqs = 0; + smm->no_adaptive = 0; smm->session_baseva = HIGH_SEGMENT_BASEVA; #if (HIGH_SEGMENT_BASEVA > (4ULL << 30)) @@ -1938,6 +1958,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input) smm->poll_main = 1; else if (unformat (input, "use-private-rx-mqs")) smm->use_private_rx_mqs = 1; + else if (unformat (input, "no-adaptive")) + smm->no_adaptive = 1; else return clib_error_return (0, "unknown input `%U'", format_unformat_error, input); diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 55863163a89..93278d6c3e1 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -69,6 +69,18 @@ typedef struct session_ctrl_evt_data_ u8 data[SESSION_CTRL_MSG_MAX_SIZE]; } session_evt_ctrl_data_t; +typedef enum session_wrk_state_ +{ + SESSION_WRK_POLLING, + SESSION_WRK_INTERRUPT, + SESSION_WRK_IDLE, +} __clib_packed session_wrk_state_t; + +typedef enum session_wrk_flags_ +{ + SESSION_WRK_F_ADAPTIVE = 1 << 0, +} __clib_packed session_wrk_flag_t; + typedef struct session_worker_ { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -91,6 +103,15 @@ typedef struct session_worker_ /** Per-proto vector of sessions to enqueue */ u32 **session_to_enqueue; + /** Timerfd used to periodically signal wrk session queue node */ + u32 timerfd; + + /** Worker flags */ + session_wrk_flag_t flags; + + /** Worker state */ + session_wrk_state_t state; + /** Context for session tx */ session_tx_context_t ctx; @@ -121,6 +142,9 @@ typedef struct session_worker_ /** Vector of nexts for the pending tx buffers */ u16 *pending_tx_nexts; + /** Clib file for timerfd. Used only if adaptive mode is on */ + uword timerfd_file; + #if SESSION_DEBUG /** last event poll time by thread */ clib_time_type_t last_event_poll; @@ -177,6 +201,9 @@ typedef struct session_main_ /** Allocate private rx mqs for external apps */ u8 use_private_rx_mqs; + /** Do not enable session queue node adaptive mode */ + u8 no_adaptive; + /** vpp fifo event queue configured length */ u32 configured_event_queue_length; @@ -682,6 +709,8 @@ session_add_pending_tx_buffer (u32 thread_index, u32 bi, u32 next_node) session_worker_t *wrk = session_main_get_worker (thread_index); vec_add1 (wrk->pending_tx_buffers, bi); vec_add1 (wrk->pending_tx_nexts, next_node); + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); } always_inline void @@ -691,6 +720,7 @@ session_wrk_update_time (session_worker_t *wrk, f64 now) wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ; } +void session_wrk_enable_adaptive_mode (session_worker_t *wrk); fifo_segment_t *session_main_get_evt_q_segment (void); void session_node_enable_disable (u8 is_en); clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index f8157cc6214..d7adbb5fd06 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -24,6 +24,7 @@ #include <vnet/session/application_local.h> #include <vnet/session/session_debug.h> #include <svm/queue.h> +#include <sys/timerfd.h> #define app_check_thread_and_barrier(_fn, _arg) \ if (!vlib_thread_is_main_w_barrier ()) \ @@ -1418,6 +1419,79 @@ session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq) return n_to_dequeue; } +static void +session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns) +{ + struct itimerspec its; + + its.it_value.tv_sec = 0; + its.it_value.tv_nsec = time_ns; + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = its.it_value.tv_nsec; + + if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1) + clib_warning ("timerfd_settime"); +} + +always_inline u64 +session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index) +{ + if (state == SESSION_WRK_INTERRUPT) + return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6; + else if (state == SESSION_WRK_IDLE) + return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8; + else + return 0; +} + +static inline void +session_wrk_state_update (session_worker_t *wrk, session_wrk_state_t state) +{ + u64 time_ns; + + wrk->state = state; + time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index); + session_wrk_timerfd_update (wrk, time_ns); +} + +static void +session_wrk_update_state (session_worker_t *wrk) +{ + vlib_main_t *vm = wrk->vm; + + if (wrk->state == SESSION_WRK_POLLING) + { + if (pool_elts (wrk->event_elts) == 3 && + vlib_last_vectors_per_main_loop (vm) < 1) + { + session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT); + vlib_node_set_state (vm, session_queue_node.index, + VLIB_NODE_STATE_INTERRUPT); + } + } + else if (wrk->state == SESSION_WRK_INTERRUPT) + { + if (pool_elts (wrk->event_elts) > 3 || + vlib_last_vectors_per_main_loop (vm) > 1) + { + session_wrk_state_update (wrk, SESSION_WRK_POLLING); + vlib_node_set_state (vm, session_queue_node.index, + VLIB_NODE_STATE_POLLING); + } + else if (PREDICT_FALSE (!pool_elts (wrk->sessions))) + { + session_wrk_state_update (wrk, SESSION_WRK_IDLE); + } + } + else + { + if (pool_elts (wrk->event_elts)) + { + session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT); + } + } +} + static uword session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) @@ -1513,6 +1587,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets); + if (wrk->flags & SESSION_WRK_F_ADAPTIVE) + session_wrk_update_state (wrk); + return n_tx_packets; } @@ -1531,6 +1608,46 @@ VLIB_REGISTER_NODE (session_queue_node) = /* *INDENT-ON* */ static clib_error_t * +session_wrk_tfd_read_ready (clib_file_t *cf) +{ + session_worker_t *wrk = session_main_get_worker (cf->private_data); + u64 buf; + int rv; + + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); + rv = read (wrk->timerfd, &buf, sizeof (buf)); + if (rv < 0 && errno != EAGAIN) + clib_unix_warning ("failed"); + return 0; +} + +static clib_error_t * +session_wrk_tfd_write_ready (clib_file_t *cf) +{ + return 0; +} + +void +session_wrk_enable_adaptive_mode (session_worker_t *wrk) +{ + u32 thread_index = wrk->vm->thread_index; + clib_file_t template = { 0 }; + + if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0) + clib_warning ("timerfd_create"); + + template.read_function = session_wrk_tfd_read_ready; + template.write_function = session_wrk_tfd_write_ready; + template.file_descriptor = wrk->timerfd; + template.private_data = thread_index; + template.polling_thread_index = thread_index; + template.description = format (0, "session-wrk-tfd-%u", thread_index); + + wrk->timerfd_file = clib_file_add (&file_main, &template); + wrk->flags |= SESSION_WRK_F_ADAPTIVE; +} + +static clib_error_t * session_queue_exit (vlib_main_t * vm) { if (vlib_get_n_threads () < 2) |